注:筆者環境 ES6.6.二、linux centos6.九、mysql8.0、三個節點、節點內存64G、八核CPU
場景:java
目前Mysql 數據庫數據量約10億,有幾張大表1億左右,直接在Mysql查詢出現各類效率問題,所以想着將數據導一份到ES,從而實現大數據快速檢索的功能。
經過Logstash插件批量導數據,我的感受出現各類奇怪的問題,例如ES 內存暴滿,mysql 所在服務器內存暴,最主要的是在一次導數時不能所導的數據量不能太大。通過一次次試探Logstash與優化Logstash導數的最後,終於仍是動手直接運用ES提供的api進行導數了。
目前直接模擬測試批量導數據,不管是經過Logstash仍是ES 提供的Api峯值均能達到10萬每秒左右。下面上代碼,主要是經過官網提供的api (RestHighLevelClient、BulkProcessor)整理而來。目前因爲Mysql 查詢出來的數據須要進行一些處理,基本可達到3.5萬+每秒。這個速度還有不小的優化空間,好比筆者經過稍微修改下述代碼,啓動幾個線程同時執行bulk多張表,從kibana界面看出速度達到了成倍的提高,由於速度已基本達到筆者所想要的,便不怎麼進行優化代碼了。
1、Maven 配置以下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>mysql
<groupId>ElasticSearchDemo</groupId>
<artifactId>ElasticSearchDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>linux
<name>ElasticSearchDemo</name>
<url>http://maven.apache.org</url>git
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>github
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>sql
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.6.2</version>
</dependency>數據庫
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>apache
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>centos
</dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2、代碼以下
package service;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import utils.DBHelper;
/**
* @author Ye
* @time 2019年3月29日
*
* 類說明:經過BulkProcess批量將Mysql數據導入ElasticSearch中
*/
public class BulkProcessDemo {
private static final Logger logger = LogManager.getLogger(BulkProcessDemo.class);
public static void main(String[] args) {
try {
long startTime = System.currentTimeMillis();
String tableName = "testTable";
createIndex(tableName);
writeMysqlDataToES(tableName);
logger.info(" use time: " + (System.currentTimeMillis() - startTime) / 1000 + "s");
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
/**
* 建立索引
* @param indexName
* @throws IOException
*/
public static void createIndex(String indexName) throws IOException {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("es01", 9200, "http")));
// ES 索引默認須要小寫,故筆者將其轉爲小寫
CreateIndexRequest requestIndex = new CreateIndexRequest(indexName.toLowerCase());
// 注: 設置副本數爲0,索引刷新時間爲-1對大批量索引數據效率的提高有不小的幫助
requestIndex.settings(Settings.builder().put("index.number_of_shards", 5)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "-1"));
// CreateIndexResponse createIndexResponse = client.indices().create(requestIndex, RequestOptions.DEFAULT);
client.close();
}
/**
* 將mysql 數據查出組裝成es須要的map格式,經過批量寫入es中
*
* @param tableName
*/
private static void writeMysqlDataToES(String tableName) {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("eshost", 9200, "http")));// 初始化
BulkProcessor bulkProcessor = getBulkProcessor(client);
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
conn = DBHelper.getConn();
logger.info("Start handle data :" + tableName);
String sql = "SELECT * from " + tableName;
ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
rs = ps.executeQuery();
ResultSetMetaData colData = rs.getMetaData();
ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>();
// bulkProcessor 添加的數據支持的方式並很少,查看其api發現其支持map鍵值對的方式,故筆者在此將查出來的數據轉換成hashMap方式
HashMap<String, String> map = null;
int count = 0;
String c = null;
String v = null;
while (rs.next()) {
count++;
map = new HashMap<String, String>(128);
for (int i = 1; i <= colData.getColumnCount(); i++) {
c = colData.getColumnName(i);
v = rs.getString(c);
map.put(c, v);
}
dataList.add(map);
// 每10萬條寫一次,不足的批次的最後再一併提交
if (count % 100000 == 0) {
logger.info("Mysql handle data number : " + count);
// 將數據添加到 bulkProcessor 中
for (HashMap<String, String> hashMap2 : dataList) {
bulkProcessor.add(new IndexRequest(tableName.toLowerCase(), "gzdc", hashMap2.get("S_GUID"))
.source(hashMap2));
}
// 每提交一次便將map與list清空
map.clear();
dataList.clear();
}
}
// count % 100000 處理未提交的數據
for (HashMap<String, String> hashMap2 : dataList) {
bulkProcessor.add(
new IndexRequest(tableName.toLowerCase(), "gzdc", hashMap2.get("S_GUID")).source(hashMap2));
}
logger.info("-------------------------- Finally insert number total : " + count);
// 將數據刷新到es, 注意這一步執行後並不會當即生效,取決於bulkProcessor設置的刷新時間
bulkProcessor.flush();
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
try {
rs.close();
ps.close();
conn.close();
boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
client.close();
logger.info(terminatedFlag);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
/**
* 建立bulkProcessor並初始化
* @param client
* @return
*/
private static BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor bulkProcessor = null;
try {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Try to insert data number : " + request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("************** Success insert data number : " + request.numberOfActions() + " , id: "
+ executionId);
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
// bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
builder.setConcurrentRequests(10);
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
// 注意點:在這裏感受有點坑,官網樣例並無這一步,而筆者因一時粗心也沒注意,在調試時注意看才發現,上面對builder設置的屬性沒有生效
bulkProcessor = builder.build();
} catch (Exception e) {
e.printStackTrace();
try {
bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
client.close();
} catch (Exception e1) {
logger.error(e1.getMessage());
}
}
return bulkProcessor;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
數據庫鏈接類
package utils;
import java.sql.Connection;
import java.sql.DriverManager;
public class DBHelper {
public static final String url = "jdbc:mysql://xx.xx.xx.xx:3306/xxdemo?useSSL=true";
public static final String name = "com.mysql.cj.jdbc.Driver";
public static final String user = "xxx";
public static final String password = "xxxx";
public static Connection conn = null;
public static Connection getConn() {
try {
Class.forName(name);
conn = DriverManager.getConnection(url, user, password);//獲取鏈接
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
日誌文件配置文件:log4j2.properties
property.filePath=logs
property.filePattern=logs/%d{yyyy}/%d{MM}
#\u8F93\u51FA\u683C\u5F0F
property.layoutPattern=%-d{yyyy-MM-dd HH:mm:ss SSS} [ %p ] [ %c ] %m%n
rootLogger.level = info
appender.console.type = Console
appender.console.name = STDOUT
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = ${layoutPattern}
rootLogger.appenderRef.stdout.ref = STDOUT
appender.I.type = RollingFile
appender.I.name = InfoRollingFile
appender.I.fileName = ${filePath}/es-info.log
appender.I.filePattern = ${filePattern}/es_info.log
appender.I.layout.type = PatternLayout
appender.I.layout.pattern = ${layoutPattern}
appender.I.policies.type = Policies
appender.I.policies.time.type = TimeBasedTriggeringPolicy
appender.I.policies.time.interval = 1
appender.I.policies.time.modulate = true
appender.I.policies.size.type = SizeBasedTriggeringPolicy
appender.I.policies.size.size=20M
appender.I.strategy.type = DefaultRolloverStrategy
appender.I.strategy.max = 100
#\u8FC7\u6EE4INFO\u4EE5\u4E0A\u4FE1\u606F
appender.I.filter.threshold.type = ThresholdFilter
appender.I.filter.threshold.level = WARN
appender.I.filter.threshold.onMatch = DENY
appender.I.filter.threshold.onMisMatch=NEUTRAL
rootLogger.appenderRef.I.ref = InfoRollingFile
rootLogger.appenderRef.I.level=INFO
appender.E.type = RollingFile
appender.E.name = ErrorRollingFile
appender.E.fileName = ${filePath}/es-error.log
appender.E.filePattern = ${filePattern}/es_error.log
appender.E.layout.type = PatternLayout
appender.E.layout.pattern = ${layoutPattern}
appender.E.policies.type = Policies
appender.E.policies.time.type = TimeBasedTriggeringPolicy
appender.E.policies.time.interval = 1
appender.E.policies.time.modulate = true
appender.E.policies.size.type = SizeBasedTriggeringPolicy
appender.E.policies.size.size=20M
appender.E.strategy.type = DefaultRolloverStrategy
appender.E.strategy.max = 100
#\u8FC7\u6EE4ERROR\u4EE5\u4E0A\u4FE1\u606F
appender.E.filter.threshold.type = ThresholdFilter
appender.E.filter.threshold.level = FATAL
appender.E.filter.threshold.onMatch = DENY
appender.E.filter.threshold.onMisMatch=NEUTRAL
rootLogger.appenderRef.E.ref = ErrorRollingFile
rootLogger.appenderRef.E.level=ERROR
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
核心代碼解說:
3、驗證上述代碼結果
一、起初筆者直接在本地Eclipse中運行,因爲電腦配置所限制,寫入es 速度只能達到5000/s左右的樣子。
二、直接將代碼打包成可運行的jar,經過java -jar xxx.jar 方式在linux中運行,基本能夠達到2.5萬/s,結果以下圖所示:
注:筆者下圖是將一張具備5400萬的數據,一次性導入ES,該表60多個字段
運行的部分日誌以下:
2019-03-29 17:31:34 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2679
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Mysql handle data number : 13500000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2681
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2683
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2682
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2685
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2686
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2687
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2684
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2688
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2689
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2691
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2693
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2692
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2690
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2696
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2694
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2695
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2697
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2698
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2699
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2700
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
簡單修改下相應參數進行bulk,查看效率
修改下每一個批次上傳 10000條數據,setBulkSize 設置成300M,數據處理批次修改爲20萬(if (count % 200000 == 0) ),其它配置同樣,處理另外一張大表,效果以下:
該表數據量:該表27個字段
SELECT count(1) from xxxxxxxxx; – 171769567
全量bulk進es用時:5113s
2019-04-01 11:39:04.453 INFO 28080 — [nio-8080-exec-1] s.ElasticServiceImpl : use time: 5113s
es 監控以下圖
4、小結
筆者我的不深刻的學習感受 Logstash 對於這種須要一次性大批量將數據導入ES的需求適應性可能不太好。
大批量的數據導入ES 我的推薦BulkProcessor, 不管是什麼數據,只須要將其轉換成Map鍵值對的格式即可運用bulkProcessor實現流式導入。
對於導入效率須要看集羣的環境以及導入批次的設置,還有ES的相關優化配置。
5、遇到的小坑
問題一
因爲表數據量太大,一開始本想以時間段(原數據庫中有時間相關的字段)分批查出數據再將其導入ES(在用Logstash插件導入時就是這麼處理的,但因爲數據在不一樣時間的分佈狀況很不同,在運用Logstash插件導入時常常由於某一時間段數據量太大而致使死機問題)。
解決:JDBC resltset中fetchSize的設置。
詳見:[正確使用MySQL JDBC setFetchSize()方法解決JDBC處理大結果集內在溢出]
問題二
經過官網提供的代碼設置了bulder 相關的屬性,好比每一個批次提交多少數據,但程序運行起來發現這些設置並無生效。
官網代碼以下:
BulkProcessor bulkProcessor =
BulkProcessor.builder(bulkConsumer, listener).build();
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder =
BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
1
2
3
4
5
6
7
8
9
10
11
12
13
解決:其實該坑的出現主要是筆者我的在此太粗心了,還有就是太相信官網了。
在這個問題上稍微留意下上述代碼就能夠看出 builder 雖然設置了,但並無將其設置後的值轉給那個類調用了,所以bulkProcessor仍是使用了默認的配置。
BulkProcessor bulkProcessor =
BulkProcessor.builder(bulkConsumer, listener).build();
其實這一步能夠換成以下(該bulkProcessor 的建立應在builder 屬性設置完成以後)
BulkProcessor bulkProcessor = builder.build();
# 完整代碼以下:
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
builder.setConcurrentRequests(10);
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
bulkProcessor = builder.build();
1
2
3
4
5
6
7
8
9
10
11
12
13
在這裏可能會感受如何將bulkProcessor 轉給監聽者(listener),查看builder.build(); 瞄了下底層會發現其實該方法最終是調用以下方法的:
/**
* Builds a new bulk processor.
*/
public BulkProcessor build() {
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
}
1
2
3
4
5
6
7
完整代碼見:https://github.com/yechunbo/BigdataSearchPro.git
參考文檔:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.6/java-rest-high-document-bulk.htmlhttps://www.elastic.co/guide/en/elasticsearch/client/java-api/6.6/_log4j_2_logger.htmlhttps://docs.oracle.com/cd/E11882_01/java.112/e16548/resltset.htm#JJDBC28622正確使用MySQL JDBC setFetchSize()方法解決JDBC處理大結果集內在溢出JDBC讀取數據優化-fetch size————————————————版權聲明:本文爲CSDN博主「在屋頂聽歌」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連接及本聲明。原文連接:https://blog.csdn.net/u013850277/article/details/88904303