問題描述:用 Elasticsearch 消費 kafka 中的數據。kafka broker 版本不變,Elasticsearch 版本不變,Elasticsearch 的客戶端(transport API)版本不變。kafka client 版本發生變化,因爲以前使用 apache kafka 原生API 消費,消費模型爲 一個 kafka consumer 對應多個 partition;現使用 spring kafka 重構,保證 一個partition 至少對應一個 kafka consumer。現有9個 topic,合計27個 partition。html
java.util.concurrent.ExecutionException: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update]]; nested: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update[s]]]; nested: VersionConflictEngineException[[query][c3b8666b43b7cdab0450b0620b7f5e5df0633b35]: version conflict, document already exists (current version [1])];
複製代碼
groupId
保持一致的狀況下,纔不會出現一個 partition 被多個 kafka consumer 消費的狀況結論:kafka client group id 未與以前保持一致java
Tips:node
Elasticsearch 是分佈式的。當建立、更新或刪除文檔時,必須將文檔的新版本複製到集羣中的其餘節點。Elasticsearch也是異步和併發的,這意味着這些複製請求是並行發送的,而且到達目的地的順序可能不一致。Elasticsearch須要確保文檔的舊版本不會覆蓋新版本。git
爲了確保文檔的舊版本不會覆蓋新版本,對文檔執行的每一個操做都由協調該更改的主分片分配一個序列號。序列號隨着每一個操做的增長而增長,所以新操做的序列號必定比舊操做的序列號高。而後,Elasticsearch可使用操做的序列號來確保新文檔版本不會被分配了較小序列號的更改覆蓋。spring
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
複製代碼
經排查測試apache
以前消費這默認是 RangeAssignor
緩存
partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
複製代碼
如今優化爲微信
partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
複製代碼
因此會讓 kafka 沒法合理分配 partition網絡
node.data: false
禁止。node.master: false
禁止。節點類型 | 配置參數 | 默認值 |
---|---|---|
master eligible | node.master |
true |
data | node.data |
true |
ingest | node.ingest |
true |
coordinating only | 無 | 設置上面三個參數所有爲false |
machine learning | node.ml |
true(須要enable x-pack) |
Elasticsearch 在建立,更新甚至刪除的時候會更改 document version。架構
Index buffer
(內存) 和 Transaction log
(磁盤) 中,即使內存數據丟失,也可讀取磁盤中的 Transaction log
。refresh
操做將 Index buffer
中的數據寫入 segments
(內存),此時數據可查詢。flush
操做,將 segments
寫入磁盤,同時清空 Transaction log
;若 Transaction log
滿(默認512M),也會執行此操做。merge
操做,按期合併 segment
Elasticsearch 中的每一個索引操做首先使用路由解析到一個副本組,一般基於文檔ID。一旦肯定了副本組,操做將在內部轉發到組的當前主分片。主分片負責驗證數據格式並將其轉發到其餘副本。因爲副本能夠由主分片異步複製,因此不須要主副本複製到全部副本。相反,Elasticsearch 維護一個應該接收操做的副本分片列表。這個列表稱爲同步副本,由主節點維護。顧名思義,這些是一組保證處理了全部已向用戶確認的索引操做和刪除操做的分片副本。主分片負責維護,所以必須將全部操做複製到這個集合中的每一個副本分片。
主分片遵循如下基本流程:
fsync
操做,此操做不會將數據寫入磁盤。index.refresh_interval
配置。Refresh 後,數據就能夠被搜索到了。這也是爲何 Elasticsearch 被稱爲近實時搜索。ES Flush & Lucene Commit
Elasticsearch 使用主備模型。主備份模型的一個優勢是,主分片和其全部副本分片存有相同的數據。所以,一個同步副本就足以知足讀請求。
Elasticsearch 中的讀取能夠直接使用 document ID,也能夠是很是複雜的搜索,包含複雜的聚合,這個操做會佔用大量CPU資源。
當節點接收到讀請求時,該節點負責將其轉發給包含相關分片的節點、整合全部分片的返回值並響應客戶端(相似於一個MapReduce)。咱們將該節點稱爲請求的協調節點。基本流程以下:
本文由 發給官兵 創做,採用 CC BY 3.0 CN協議 進行許可。 可自由轉載、引用,但需署名做者且註明文章出 處。如轉載至微信公衆號,請在文末添加做者公衆號二維碼。