kafka2x-Elasticsearch 數據同步工具demo

Bboss is a good elasticsearch Java rest client. It operates and accesses elasticsearch in a way similar to mybatis.html

BBoss Environmental requirements

JDK requirement: JDK 1.7+linux

Elasticsearch version requirements: 1.x,2.X,5.X,6.X,+git

Spring boot: 1.x,2.x,+github

kafka2x-Elasticsearch 數據同步工具demo

兼容 kafka_2.12-2.3.0 系列版本 ,使用本demo所帶的應用程序運行容器環境,能夠快速編寫,打包發佈可運行的數據導入工具windows

支持的 kafka_2.12-2.3.0 系列版本 到elasticsearch數據同步微信

kafka低版本(kafka_2.12-0.10.2.0系列版本)同步工具案例地址:https://gitee.com/bbossgroups/kafka1x-elasticsearchmybatis

支持的Elasticsearch版本: 1.x,2.x,5.x,6.x,7.x,+app

支持海量PB級數據同步導入功能eclipse

使用參考文檔jvm

導入maven座標

<dependency>  <groupId>com.bbossgroups.plugins</groupId>  <artifactId>bboss-elasticsearch-rest-kafka2x</artifactId>  <version>5.9.9</version>  <scope>compile</scope> </dependency>

構建部署

準備工做

須要經過gradle構建發佈版本,gradle安裝配置參考文檔:

https://esdoc.bbossgroups.com/#/bboss-build

下載源碼工程-基於gradle

https://github.com/bbossgroups/kafka2x-elasticsearch

從上面的地址下載源碼工程,而後導入idea或者eclipse,根據本身的需求,修改導入程序邏輯

org.frameworkset.elasticsearch.imp.Kafka2ESdemo

若是須要測試和調試導入功能,運行Kafka2ESdemo的main方法便可便可:

public class Dbdemo {  public static void main(String args[]){  Kafka2ESdemo dbdemo = new Kafka2ESdemo();  boolean dropIndice = true;//CommonLauncher.getBooleanAttribute("dropIndice",false);//同時指定了默認值   dbdemo.scheduleTimestampImportData(dropIndice);  }  ..... }

修改es配置-kafka2x-elasticsearch\src\main\resources\application.properties

修改完畢配置後,就能夠進行功能調試了。

測試調試經過後,就能夠構建發佈可運行的版本了:進入命令行模式,在源碼工程根目錄kafka2x-elasticsearch 下運行如下gradle指令打包發佈版本

release.bat

運行做業

gradle構建成功後,在build/distributions目錄下會生成能夠運行的zip包,解壓運行導入程序

linux:

chmod +x restart.sh

./restart.sh

windows: restart.bat

做業jvm配置

修改jvm.options,設置內存大小和其餘jvm參數

-Xms1g

-Xmx1g

做業參數配置

在使用kafka2x-elasticsearch 時,爲了不調試過程當中不斷打包發佈數據同步工具,能夠將部分控制參數配置到啓動配置文件resources/application.properties中,而後在代碼中經過如下方法獲取配置的參數:

#工具主程序 mainclass=org.frameworkset.elasticsearch.imp.Kafka2ESdemo  # 參數配置 # 在代碼中獲取方法:CommonLauncher.getBooleanAttribute("dropIndice",false);//同時指定了默認值false dropIndice=false

在代碼中獲取參數dropIndice方法:

boolean dropIndice = CommonLauncher.getBooleanAttribute("dropIndice",false);//同時指定了默認值false

另外能夠在resources/application.properties配置控制做業執行的一些參數,例如工做線程數,等待隊列數,批處理size等等:

queueSize=50 workThreads=10 batchSize=20

在做業執行方法中獲取並使用上述參數:

int batchSize = CommonLauncher.getIntProperty("batchSize",10);//同時指定了默認值 int queueSize = CommonLauncher.getIntProperty("queueSize",50);//同時指定了默認值 int workThreads = CommonLauncher.getIntProperty("workThreads",10);//同時指定了默認值 importBuilder.setBatchSize(batchSize); importBuilder.setQueue(queueSize);//設置批量導入線程池等待隊列長度 importBuilder.setThreadCount(workThreads);//設置批量導入線程池工做線程數量

elasticsearch技術交流羣:166471282

elasticsearch微信公衆號:bbossgroup

碼雲項目:kafka2x-elasticsearch

 

磁力搜索網站導航2020更新

https://www.cnblogs.com/cilisousuo/p/12099547.html

相關文章
相關標籤/搜索