初次接觸es,可能對增刪改查很熟悉,覺得能爲駕輕就熟,本次應用場景爲 數據庫變動一條記錄,會觸發更新es中的數據,每秒併發大概30條左右,測試環境一切工做正常(數據量較少),上線後發現日誌中不少相似於下面的錯誤:java
隊列滿了數據庫
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 200) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60 at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372) at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509) at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441) at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68) at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171) at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153) at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52) at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
...
更新版本錯誤:併發
Caused by: org.elasticsearch.index.engine.VersionConflictEngineException: [kpi][4] [opportunity][1442415600000]: version conflict, current [5933], provided [5932] at org.elasticsearch.index.engine.internal.InternalEngine.innerIndex(InternalEngine.java:582) [elasticsearch-1.4.4.jar:] at org.elasticsearch.index.engine.internal.InternalEngine.index(InternalEngine.java:522) [elasticsearch-1.4.4.jar:] at org.elasticsearch.index.shard.service.InternalIndexShard.index(InternalIndexShard.java:425) [elasticsearch-1.4.4.jar:] at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:193) [elasticsearch-1.4.4.jar:]
通過高手指點,從單次的實時操做改成批量操做,這樣作的好處有,減小網路開銷,從消息大小,時間,消息數量三個維度來衡量 批量操做的維度,若是數據不是要求很是實時的操做(很是實時的存儲應該也不會選擇es),改成批量操做後,錯誤均修復,大概配置以下。elasticsearch
private BulkProcessor bulkProcessor; @PostConstruct public void init() { this.bulkProcessor = BulkProcessor.builder( esTransportMainClient.getClient(), new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.info("---嘗試插入{}條數據---", request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { logger.info("---嘗試插入{}條數據成功---", request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("[es錯誤]---嘗試插入數據失敗---", failure); } }) .setBulkActions(1000) .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(2) .build(); }
我這裏全局保持一個bulkProcesser就能夠維持正常業務了。ide
每次使用的方法:測試
bulkProcessor.add(updateRequestBuilder.request());
此processer的含義爲若是消息數量到達1000 或者消息大小到大5M 或者時間達到5s 任意條件知足,客戶端就會把當前的數據提交到服務端處理。效率很高。ui