Bboss is a good elasticsearch Java rest client. It operates and accesses elasticsearch in a way similar to mybatis.html
BBoss Environmental requirements
JDK requirement: JDK 1.7+linux
Elasticsearch version requirements: 1.x,2.X,5.X,6.X,+git
Spring boot: 1.x,2.x,+github
kafka2x-Elasticsearch 數據同步工具demo
兼容 kafka_2.12-2.3.0 系列版本 ,使用本demo所帶的應用程序運行容器環境,能夠快速編寫,打包發佈可運行的數據導入工具windows
支持的 kafka_2.12-2.3.0 系列版本 到elasticsearch數據同步微信
kafka低版本(kafka_2.12-0.10.2.0系列版本)同步工具案例地址:https://gitee.com/bbossgroups/kafka1x-elasticsearchmybatis
支持的Elasticsearch版本: 1.x,2.x,5.x,6.x,7.x,+app
支持海量PB級數據同步導入功能eclipse
使用參考文檔jvm
導入maven座標
<dependency> <groupId>com.bbossgroups.plugins</groupId> <artifactId>bboss-elasticsearch-rest-kafka2x</artifactId> <version>5.9.9</version> <scope>compile</scope> </dependency>
構建部署
準備工做
須要經過gradle構建發佈版本,gradle安裝配置參考文檔:
https://esdoc.bbossgroups.com/#/bboss-build
下載源碼工程-基於gradle
https://github.com/bbossgroups/kafka2x-elasticsearch
從上面的地址下載源碼工程,而後導入idea或者eclipse,根據本身的需求,修改導入程序邏輯
org.frameworkset.elasticsearch.imp.Kafka2ESdemo
若是須要測試和調試導入功能,運行Kafka2ESdemo的main方法便可便可:
public class Dbdemo { public static void main(String args[]){ Kafka2ESdemo dbdemo = new Kafka2ESdemo(); boolean dropIndice = true;//CommonLauncher.getBooleanAttribute("dropIndice",false);//同時指定了默認值 dbdemo.scheduleTimestampImportData(dropIndice); } ..... }
修改es配置-kafka2x-elasticsearch\src\main\resources\application.properties
修改完畢配置後,就能夠進行功能調試了。
測試調試經過後,就能夠構建發佈可運行的版本了:進入命令行模式,在源碼工程根目錄kafka2x-elasticsearch 下運行如下gradle指令打包發佈版本
release.bat
運行做業
gradle構建成功後,在build/distributions目錄下會生成能夠運行的zip包,解壓運行導入程序
linux:
chmod +x restart.sh
./restart.sh
windows: restart.bat
做業jvm配置
修改jvm.options,設置內存大小和其餘jvm參數
-Xms1g
-Xmx1g
做業參數配置
在使用kafka2x-elasticsearch 時,爲了不調試過程當中不斷打包發佈數據同步工具,能夠將部分控制參數配置到啓動配置文件resources/application.properties中,而後在代碼中經過如下方法獲取配置的參數:
#工具主程序 mainclass=org.frameworkset.elasticsearch.imp.Kafka2ESdemo # 參數配置 # 在代碼中獲取方法:CommonLauncher.getBooleanAttribute("dropIndice",false);//同時指定了默認值false dropIndice=false
在代碼中獲取參數dropIndice方法:
boolean dropIndice = CommonLauncher.getBooleanAttribute("dropIndice",false);//同時指定了默認值false
另外能夠在resources/application.properties配置控制做業執行的一些參數,例如工做線程數,等待隊列數,批處理size等等:
queueSize=50 workThreads=10 batchSize=20
在做業執行方法中獲取並使用上述參數:
int batchSize = CommonLauncher.getIntProperty("batchSize",10);//同時指定了默認值 int queueSize = CommonLauncher.getIntProperty("queueSize",50);//同時指定了默認值 int workThreads = CommonLauncher.getIntProperty("workThreads",10);//同時指定了默認值 importBuilder.setBatchSize(batchSize); importBuilder.setQueue(queueSize);//設置批量導入線程池等待隊列長度 importBuilder.setThreadCount(workThreads);//設置批量導入線程池工做線程數量
elasticsearch技術交流羣:166471282
elasticsearch微信公衆號:bbossgroup
碼雲項目:kafka2x-elasticsearch