Importing Hundreds of Billions of Records into Elasticsearch with Java Multithreading

I recently took on a task to rework MTE (mysqlToEs), a small tool that imports data from MySQL into Elasticsearch. The original tool was single-threaded, and importing hundreds of billions of records took about two weeks, which was far too slow. So I spent three days rewriting MTE around the FixedThreadPool pool from Java's Executors framework; on a single server the import is now more than ten times faster (tune the thread count sensibly and it goes higher still).

 


 


If you need to import data into ES soon, you can download the tool from GitHub.

 

Link:

 

https://github.com/dunzung/mtesql

Key technology stack

Elasticsearch

JDBC

ExecutorService / Thread

Tool description

Maven dependencies

 

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>

Java thread pool setup

The pool size defaults to 21 threads and is adjustable. POR is the pool that processes handled-item (已辦) data, and ROR is the pool that processes read-item (已閱) data.

 


 

private static int THREADS = 21;
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
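A minimal, self-contained sketch of how two fixed pools like POR and ROR can be fed work and then shut down cleanly (the class and task here are illustrative, not MTE code; MTE keeps its pools as static fields):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSketch {
    private static final int THREADS = 21;

    public static int runBatches(int batches) throws InterruptedException {
        // local pools so the sketch is re-runnable; MTE uses static POR/ROR fields
        ExecutorService por = Executors.newFixedThreadPool(THREADS);
        ExecutorService ror = Executors.newFixedThreadPool(THREADS);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < batches; i++) {
            por.submit(done::incrementAndGet); // stands in for "handled" work
            ror.submit(done::incrementAndGet); // stands in for "read" work
        }
        // stop accepting new tasks, then wait for the queued ones to finish
        por.shutdown();
        ror.shutdown();
        por.awaitTermination(1, TimeUnit.MINUTES);
        ror.awaitTermination(1, TimeUnit.MINUTES);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBatches(100)); // 200: each batch feeds both pools
    }
}
```

The shutdown/awaitTermination pair matters in a batch importer: exiting the JVM while tasks are still queued would silently drop un-indexed records.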

Define the handled-item and read-item producer threads: ZlPendProducer / ZlReadProducer

 

public class ZlPendProducer implements Runnable {
    ...
    @Override
    public void run() {
        System.out.println(threadName + "::started...");
        for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++)
            try {
                ....
                int size = 1000;
                for (int i = 0; i < count; i += size) {
                    if (i + size > count) {
                        // fewer than size rows remain on the last page, so shrink size to the remainder
                        size = count - i;
                    }
                    String sql = "select * from " + tableName + " limit " + i + ", " + size;
                    System.out.println(tableName + "::sql::" + sql);
                    rs = statement.executeQuery(sql);
                    List<HistPendingEntity> lst = new ArrayList<>();
                    while (rs.next()) {
                        HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
                        lst.add(p);
                    }
                    MteExecutor.POR.submit(new ZlPendConsumer(lst));
                    Thread.sleep(2000);
                }
                ....
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
}

public class ZlReadProducer implements Runnable {
    ... // the read-item producer logic mirrors the handled-item producer
}
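The tail-page adjustment in the producer loop above (shrinking size on the last page) can be isolated into a tiny helper that is easy to test on its own. This is an illustrative refactoring, not code from MTE:

```java
import java.util.ArrayList;
import java.util.List;

public class PageSizes {
    /** Row count of each LIMIT page when paging `count` rows with page size `size`. */
    public static List<Integer> pageSizes(int count, int size) {
        List<Integer> pages = new ArrayList<>();
        for (int i = 0; i < count; i += size) {
            // the last page may hold fewer than `size` rows
            pages.add(Math.min(size, count - i));
        }
        return pages;
    }

    public static void main(String[] args) {
        System.out.println(pageSizes(2500, 1000)); // [1000, 1000, 500]
    }
}
```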

Define the handled-item and read-item consumer threads: ZlPendConsumer / ZlReadConsumer

 

public class ZlPendConsumer implements Runnable {
    private String threadName;
    private List<HistPendingEntity> lst;

    public ZlPendConsumer(List<HistPendingEntity> lst) {
        this.lst = lst;
    }

    @Override
    public void run() {
        ...
        lst.forEach(v -> {
            try {
                String json = new Gson().toJson(v);
                EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
                Const.COUNTER.LD_P.incrementAndGet();
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("err::PendingId::" + v.getPendingId());
            }
        });
        ...
    }
}

public class ZlReadConsumer implements Runnable {
    // the read-item consumer logic mirrors the handled-item consumer
}
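The consumers report progress through shared counters (Const.COUNTER.LD_P above). A self-contained sketch of why such a counter must be atomic when many pool threads increment it concurrently (class and variable names here are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CounterSketch {
    public static long countWithPool(int tasks, int perTask) throws InterruptedException {
        AtomicLong imported = new AtomicLong(); // plays the role of Const.COUNTER.LD_P
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int t = 0; t < tasks; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perTask; i++) {
                    imported.incrementAndGet(); // atomic: no increments lost across threads
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return imported.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithPool(100, 1000)); // always 100000 with AtomicLong
    }
}
```

With a plain `long imported++` the same run would intermittently lose updates, so the monitor's TPS numbers would be wrong.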

Define the Elasticsearch import monitor thread: Monitor

 

The Monitor thread tallies how many records are imported into Elasticsearch per minute. Watching its output, you can adjust the thread pool sizes so the worker threads import data as fast as possible.

 

public void monitorToES() {
    new Thread(() -> {
        while (true) {
            StringBuilder sb = new StringBuilder();
            sb.append("handled tables::").append(Const.TBL.TBL_PEND_COUNT)
                .append("::handled total::").append(Const.COUNTER.LD_P_TOTAL)
                .append("::handled imported::").append(Const.COUNTER.LD_P);
            sb.append("~~~~read tables::").append(Const.TBL.TBL_READ_COUNT);
            sb.append("::read total::").append(Const.COUNTER.LD_R_TOTAL)
                .append("::read imported::").append(Const.COUNTER.LD_R);
            if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
                ldPrevPendCount = Const.COUNTER.LD_P.get();
                ldPrevReadCount = Const.COUNTER.LD_R.get();
                start = System.currentTimeMillis();
            } else {
                long end = System.currentTimeMillis();
                if ((end - start) / 1000 >= 60) {
                    start = end;
                    sb.append("\n#########################################\n");
                    sb.append("handled per-minute TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + " records");
                    sb.append("::read per-minute TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + " records");
                    ldPrevPendCount = Const.COUNTER.LD_P.get();
                    ldPrevReadCount = Const.COUNTER.LD_R.get();
                }
            }
            System.out.println(sb.toString());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}
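The per-minute TPS printed above is just the difference between two counter snapshots scaled to one minute. A tiny helper makes that arithmetic explicit (illustrative only, not part of MTE, which compares snapshots on a fixed 60-second boundary):

```java
public class Tps {
    /** Records imported per minute, given two counter snapshots and the elapsed time in ms. */
    public static long perMinute(long prevCount, long currCount, long elapsedMs) {
        if (elapsedMs <= 0) {
            return 0; // no elapsed time yet; avoid division by zero
        }
        return (currCount - prevCount) * 60_000 / elapsedMs;
    }

    public static void main(String[] args) {
        System.out.println(perMinute(0, 30_000, 60_000)); // 30000 records/minute
    }
}
```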

Initialize the Elasticsearch client: EsClient

 

String cName = meta.get("cName"); // ES cluster name
String esNodes = meta.get("esNodes"); // ES cluster node addresses
Settings esSetting = Settings.builder()
    .put("cluster.name", cName)
    .put("client.transport.sniff", true) // enable sniffing so the client discovers the ES cluster nodes
    .put("thread_pool.search.size", 5) // search thread pool size, tentatively set to 5
    .build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
    if (node.length() > 0) {
        String[] hostPort = node.split(":");
        client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
    }
}
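The esNodes value is a comma-separated host:port list; the parsing step can be pulled out and tested without a live cluster. The helper name is an assumption for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class NodeParser {
    /** Splits "host1:port1,host2:port2" into {host, port} pairs, skipping empty entries. */
    public static List<String[]> parse(String esNodes) {
        List<String[]> out = new ArrayList<>();
        for (String node : esNodes.split(",")) {
            if (node.length() > 0) {
                out.add(node.split(":"));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> nodes = parse("192.168.1.10:9300,192.168.1.11:9300");
        // prints: 2 192.168.1.10 9300
        System.out.println(nodes.size() + " " + nodes.get(0)[0] + " " + nodes.get(0)[1]);
    }
}
```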

Initialize the database connection

 

conn = DriverManager.getConnection(url, user, password);

Launch command

 

nohup java -jar mte.jar ES-Cluster2019 192.168.1.10:9300,192.168.1.11:9300,192.168.1.12:9300 root 123456! jdbc:mysql://192.168.1.13:3306/mte 130 130 >> ./mte.log 2>&1 &

Parameter description

ES-Cluster2019 is the Elasticsearch cluster name.

192.168.1.10:9300,192.168.1.11:9300,192.168.1.12:9300 are the ES node addresses.

130 130 are the numbers of handled-item and read-item shard tables.
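Mapping those positional arguments onto a small config object keeps the entry point readable. A hedged sketch; the class and field names are assumptions, not MTE's actual config code:

```java
public class MteArgs {
    final String clusterName, esNodes, dbUser, dbPassword, jdbcUrl;
    final int pendTables, readTables;

    MteArgs(String[] args) {
        // positional order matches the launch command above
        clusterName = args[0];
        esNodes = args[1];
        dbUser = args[2];
        dbPassword = args[3];
        jdbcUrl = args[4];
        pendTables = Integer.parseInt(args[5]);
        readTables = Integer.parseInt(args[6]);
    }

    public static void main(String[] args) {
        MteArgs a = new MteArgs(new String[]{
            "ES-Cluster2019", "192.168.1.10:9300", "root", "123456!",
            "jdbc:mysql://192.168.1.13:3306/mte", "130", "130"});
        System.out.println(a.clusterName + " " + a.pendTables); // ES-Cluster2019 130
    }
}
```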

 

Program entry point: MteMain

 


 

// monitor thread
Monitor monitorService = new Monitor();
monitorService.monitorToES();
// handled-item producer thread
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();
// read-item producer thread
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();

A quick test run

 

