I was recently tasked with reworking MTE (mysqlToEs), a small tool that imports data from MySQL into Elasticsearch. The previous version was single-threaded, so importing hundreds of billions of rows took roughly two weeks, which was far too slow. I spent three days rewriting MTE around the FixedThreadPool from Java's Executors framework, and import throughput on a single server improved more than tenfold (tuning the thread count appropriately pushes it even higher).
Sharing practical tips: importing hundreds of billions of records into Elasticsearch with Java multithreading
If you need to import data into ES, you can grab the tool from GitHub:
https://github.com/dunzung/mtesql
Key technology stack
Elasticsearch
JDBC
ExecutorService / Thread
Tool overview
Maven dependencies
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>
Java thread pool configuration
Each pool defaults to 21 threads and can be adjusted. POR is the pool that processes "pending" (已辦) workflow records, and ROR is the pool that processes "read" (已閱) records.
private static int THREADS = 21;
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
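The producer/consumer hand-off over a fixed pool that the tool relies on can be demonstrated in isolation. This is a minimal sketch, not code from MTE; the class and method names are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolDemo {
    // Mirrors the tool's setup: a fixed-size pool whose thread count can be tuned.
    private static final int THREADS = 4;

    public static int runBatches(int batches) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < batches; i++) {
            pool.submit(processed::incrementAndGet); // stand-in for indexing one batch
        }
        pool.shutdown();                              // stop accepting new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for in-flight batches to finish
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBatches(100)); // prints 100
    }
}
```

The key property is that no more than THREADS batches are in flight at once, so raising or lowering THREADS directly throttles how hard the ES cluster is pushed.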
Defining the pending and read producer threads: ZlPendProducer / ZlReadProducer
public class ZlPendProducer implements Runnable {
    ...
    @Override
    public void run() {
        System.out.println(threadName + "::starting...");
        for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++)
            try {
                ....
                int size = 1000;
                for (int i = 0; i < count; i += size) {
                    if (i + size > count) {
                        // fewer than `size` rows remain, so shrink the last batch to whatever is left
                        size = count - i;
                    }
                    String sql = "select * from " + tableName + " limit " + i + ", " + size;
                    System.out.println(tableName + "::sql::" + sql);
                    rs = statement.executeQuery(sql);
                    List<HistPendingEntity> lst = new ArrayList<>();
                    while (rs.next()) {
                        HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
                        lst.add(p);
                    }
                    MteExecutor.POR.submit(new ZlPendConsumer(lst));
                    Thread.sleep(2000);
                }
                ....
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
}
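The producer's paging arithmetic (shrinking the final page so it only covers the remaining rows) can be isolated into a small helper; `BatchPlanner` is a hypothetical name, not part of MTE:

```java
public class BatchPlanner {
    /** Rows to fetch for the page starting at `offset`, given total row count and nominal page size. */
    public static int pageSize(int offset, int size, int count) {
        return (offset + size > count) ? count - offset : size;
    }

    /** Builds the paging SQL the producer issues, e.g. "select * from t limit 2000, 500". */
    public static String pageSql(String tableName, int offset, int size) {
        return "select * from " + tableName + " limit " + offset + ", " + size;
    }

    public static void main(String[] args) {
        // 2500 total rows, page size 1000: the page at offset 2000 holds only 500 rows.
        System.out.println(pageSql("tbl_pend_0", 2000, pageSize(2000, 1000, 2500)));
    }
}
```

Note that `limit offset, size` paging gets slower as the offset grows, since MySQL still scans the skipped rows; it works here because each table is paged sequentially by a single producer.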
public class ZlReadProducer implements Runnable {
    ... // the read producer's logic mirrors the pending producer
}
Defining the pending and read consumer threads: ZlPendConsumer / ZlReadConsumer
public class ZlPendConsumer implements Runnable {
    private String threadName;
    private List<HistPendingEntity> lst;

    public ZlPendConsumer(List<HistPendingEntity> lst) {
        this.lst = lst;
    }

    @Override
    public void run() {
        ...
        lst.forEach(v -> {
            try {
                String json = new Gson().toJson(v);
                EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
                Const.COUNTER.LD_P.incrementAndGet();
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("err::PendingId::" + v.getPendingId());
            }
        });
        ...
    }
}
public class ZlReadConsumer implements Runnable {
    // the read consumer's logic mirrors the pending consumer
}
Defining the import monitor thread: Monitor
The Monitor thread tallies how many records are imported into Elasticsearch per minute. Watching its output makes it easy to tune the thread pool size so the extra threads actually speed up the import.
public void monitorToES() {
    new Thread(() -> {
        while (true) {
            StringBuilder sb = new StringBuilder();
            sb.append("pend tables::").append(Const.TBL.TBL_PEND_COUNT)
                    .append("::pend total::").append(Const.COUNTER.LD_P_TOTAL)
                    .append("::pend indexed::").append(Const.COUNTER.LD_P);
            sb.append("~~~~read tables::").append(Const.TBL.TBL_READ_COUNT);
            sb.append("::read total::").append(Const.COUNTER.LD_R_TOTAL)
                    .append("::read indexed::").append(Const.COUNTER.LD_R);
            if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
                ldPrevPendCount = Const.COUNTER.LD_P.get();
                ldPrevReadCount = Const.COUNTER.LD_R.get();
                start = System.currentTimeMillis();
            } else {
                long end = System.currentTimeMillis();
                if ((end - start) / 1000 >= 60) {
                    start = end;
                    sb.append("\n#########################################\n");
                    sb.append("pend per minute::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + " records");
                    sb.append("::read per minute::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + " records");
                    ldPrevPendCount = Const.COUNTER.LD_P.get();
                    ldPrevReadCount = Const.COUNTER.LD_R.get();
                }
            }
            System.out.println(sb.toString());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}
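Stripped of the printing and timing loop, the per-minute figure the monitor reports is just the delta between two counter snapshots. A minimal, self-contained version of that bookkeeping (class name is hypothetical, not from MTE):

```java
import java.util.concurrent.atomic.AtomicLong;

public class ThroughputMeter {
    // Consumers increment this counter, like Const.COUNTER.LD_P in the tool.
    private final AtomicLong counter = new AtomicLong();
    private long previous = 0;

    public void record(long n) {
        counter.addAndGet(n);
    }

    /** Records counted since the last call: the monitor's per-interval throughput figure. */
    public long deltaSinceLastCall() {
        long current = counter.get();
        long delta = current - previous;
        previous = current;
        return delta;
    }

    public static void main(String[] args) {
        ThroughputMeter m = new ThroughputMeter();
        m.record(1200);
        System.out.println(m.deltaSinceLastCall()); // prints 1200
        m.record(800);
        System.out.println(m.deltaSinceLastCall()); // prints 800
    }
}
```

If a one-minute delta stops growing when you add threads, the bottleneck has moved elsewhere (ES indexing or MySQL reads), which is exactly the signal the monitor is meant to give.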
Initializing Elasticsearch: EsClient
String cName = meta.get("cName");      // ES cluster name
String esNodes = meta.get("esNodes");  // comma-separated ES node addresses
Settings esSetting = Settings.builder()
        .put("cluster.name", cName)
        .put("client.transport.sniff", true) // enable sniffing so the client discovers the rest of the cluster
        .put("thread_pool.search.size", 5)   // search thread pool size, set to 5 for now
        .build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
    if (node.length() > 0) {
        String[] hostPort = node.split(":");
        client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
    }
}
Initializing the database connection
conn = DriverManager.getConnection(url, user, password);
Launch command
nohup java -jar mte.jar ES-Cluster2019 192.168.1.10:9300,192.168.1.11:9300,192.168.1.12:9300 root 123456! jdbc:mysql://192.168.1.13:3306/mte 130 130 >> ./mte.log 2>&1 &
Parameter description
ES-Cluster2019 is the Elasticsearch cluster name
192.168.1.10:9300,192.168.1.11:9300,192.168.1.12:9300 are the ES node addresses
root and 123456! are the MySQL user and password, and jdbc:mysql://192.168.1.13:3306/mte is the JDBC URL
130 130 are the numbers of pending and read shard tables
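The post does not show how MteMain maps these positional arguments to configuration; a plausible sketch (the class and field names are assumptions, not MTE's actual code) is:

```java
public class MteArgs {
    public final String clusterName, esNodes, user, password, jdbcUrl;
    public final int pendTableCount, readTableCount;

    public MteArgs(String[] args) {
        // Positional order matches the launch command:
        // cluster name, ES nodes, DB user, DB password, JDBC URL, pend table count, read table count
        clusterName    = args[0];
        esNodes        = args[1];
        user           = args[2];
        password       = args[3];
        jdbcUrl        = args[4];
        pendTableCount = Integer.parseInt(args[5]);
        readTableCount = Integer.parseInt(args[6]);
    }

    public static void main(String[] args) {
        MteArgs a = new MteArgs(new String[]{"ES-Cluster2019",
                "192.168.1.10:9300,192.168.1.11:9300", "root", "123456!",
                "jdbc:mysql://192.168.1.13:3306/mte", "130", "130"});
        System.out.println(a.clusterName + "::" + a.pendTableCount); // prints ES-Cluster2019::130
    }
}
```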
Program entry point: MteMain
// monitor thread
Monitor monitorService = new Monitor();
monitorService.monitorToES();
// pending producer thread
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();
// read producer thread
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();
Taking it for a spin