從一次線上問題談談 Elasticsearch 讀寫架構

問題描述:用 Elasticsearch 消費 kafka 中的數據。kafka broker 版本不變,Elasticsearch 版本不變,Elasticsearch 的客戶端(transport API)版本不變。kafka client 版本發生變化,因爲以前使用 apache kafka 原生API 消費,消費模型爲 一個 kafka consumer 對應多個 partition;現使用 spring kafka 重構,保證 一個partition 至少對應一個 kafka consumer。現有9個 topic,合計27個 partition。html

問題一:Elasticsearch 拋出如下異常

java.util.concurrent.ExecutionException: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update]]; nested: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update[s]]]; nested: VersionConflictEngineException[[query][c3b8666b43b7cdab0450b0620b7f5e5df0633b35]: version conflict, document already exists (current version [1])];
複製代碼
  1. Elasticsearch 在寫 document 時是基於樂觀鎖的,因此上述問題發生確定是由於併發寫的問題
  2. 而如今正常狀況下每一個 partition 對應一個 kafka consumer,是不該該出現併發寫(也就是重複消費狀況)的
  3. 上述第二點忽略了一個重要前提,在 groupId 保持一致的狀況下,纔不會出現一個 partition 被多個 kafka consumer 消費的狀況

結論:kafka client group id 未與以前保持一致java

Tips:node

Elasticsearch 是分佈式的。當建立、更新或刪除文檔時,必須將文檔的新版本複製到集羣中的其餘節點。Elasticsearch也是異步和併發的,這意味着這些複製請求是並行發送的,而且到達目的地的順序可能不一致。Elasticsearch須要確保文檔的舊版本不會覆蓋新版本。git

爲了確保文檔的舊版本不會覆蓋新版本,對文檔執行的每一個操做都由協調該更改的主分片分配一個序列號。序列號隨着每一個操做的增長而增長,所以新操做的序列號必定比舊操做的序列號高。而後,Elasticsearch可使用操做的序列號來確保新文檔版本不會被分配了較小序列號的更改覆蓋。spring

問題二:kafka 拋出如下異常

org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
複製代碼

經排查測試apache

以前消費這默認是 RangeAssignor緩存

partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
複製代碼

如今優化爲微信

partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
複製代碼

因此會讓 kafka 沒法合理分配 partition網絡

Elasticsearch 節點類型

Master Node
  • 職責
    1. 處理建立,刪除索引等請求 / 決定分片⽚被分配到哪一個節點 / 負責索引的建立與刪除。
    2. 維護而且更新 Cluster State,且只能由 master node 維護,不然會形成集羣狀態不正常。
  • Master Node 的最佳實踐
    1. Master 節點很是重要,在部署上須要考慮解決單點的問題。
    2. 爲⼀個集羣設置多個 Master 節點 / 每一個節點只承擔 Master 的單⼀角色。
Data Node
  • 職責:保存分片數據。在數據擴展上起到了相當重要的做用(由 Master Node 決定如何把分片分發到數據節點上)。
  • 節點啓動後,默認就是數據節點。能夠設置 node.data: false 禁止。
  • 經過增長數據節點,能夠解決數據水平擴展和解決數據單點問題。
Master Eligible Nodes & 選主流程
  • ⼀個集羣,⽀持配置多個 Master Eligible 節點。這些節點能夠在必要時(如 Master 節點出現故障,網絡故障時)參與選主流程,成爲 Master 節點。
  • 節點啓動後,默認就是⼀個 Master eligible 節點,設置 node.master: false 禁止。
  • 當集羣內第⼀個 Master eligible 節點啓動時候,它會將本身選舉成 Master 節點。
Coordinating Node
  • 處理請求的節點,負責路由請求到正確的節點,如建立索引的請求須要路由到 Master 節點。
  • 全部節點默認都是 Coordinating Node。
  • 經過將其餘類型(data node/master node/master eligible node)設置成 False,使其成爲專門負責的協調的節點。
節點類型總結
節點類型 配置參數 默認值
master eligible node.master true
data node.data true
ingest node.ingest true
coordinating only 設置上面三個參數所有爲false
machine learning node.ml true(須要enable x-pack)

Elasticsearch 寫架構

Elasticsearch 在建立,更新甚至刪除的時候會更改 document version。架構

Elasticsearch 如何作到高可用:
  1. 數據首先寫入到 Index buffer(內存) 和 Transaction log(磁盤) 中,即使內存數據丟失,也可讀取磁盤中的 Transaction log
  2. 默認 1s 一次的 refresh 操做將 Index buffer 中的數據寫入 segments(內存),此時數據可查詢
  3. 默認30分鐘執行一次的 flush 操做,將 segments 寫入磁盤,同時清空 Transaction log;若 Transaction log 滿(默認512M),也會執行此操做。
  4. merge 操做,按期合併 segment

  Elasticsearch 中的每一個索引操做首先使用路由解析到一個副本組,一般基於文檔ID。一旦肯定了副本組,操做將在內部轉發到組的當前主分片。主分片負責驗證數據格式並將其轉發到其餘副本。因爲副本能夠由主分片異步複製,因此不須要主副本複製到全部副本。相反,Elasticsearch 維護一個應該接收操做的副本分片列表。這個列表稱爲同步副本,由主節點維護。顧名思義,這些是一組保證處理了全部已向用戶確認的索引操做和刪除操做的分片副本。主分片負責維護,所以必須將全部操做複製到這個集合中的每一個副本分片。

主分片遵循如下基本流程:

  • 驗證傳入操做並在結構無效時拒絕它(例如:插入時字段格式與 mapping 不匹配)
  • 在本地執行操做,即索引或刪除相關文檔。這還將驗證字段的內容,並在須要時拒絕(例如:關鍵字值太長,沒法在Lucene中進行索引)。
  • 將操做轉發到當前同步複製集中的每一個副本分片。若是有多個副本分片,則並行執行。
  • 一旦全部副本分片都成功地執行了操做並響應了主分片,主副本就會向客戶端確認請求成功完成。
Lucene Index
  • 在 Lucene 中,單個倒排索引⽂件被稱爲Segment。Segment 是⾃包含的,不可變動的,多個 Segments 彙總在⼀起,稱爲 Lucene 的 Index,其對應的就是 ES 中的 Shard(分片)。而後另外使用一個 commit 文件,記錄索引內全部的 segment。
  • 當有新文檔寫⼊時,會生成新 Segment,查詢時會同時查詢全部 Segments,而且對結果彙總。Lucene 中有⼀個⽂件,用來記錄全部 Segments 信息,叫作 Commit Point。
  • 刪除的⽂檔信息,保存在「.del」文件中。
什麼是 Refresh
  • 數據首先寫入到 Index buffer(內存)中,此時數據不可被查詢到。

  • 將 Index buffer 寫入 Segment 的過程叫Refresh。Refresh 不執行 fsync 操做,此操做不會將數據寫入磁盤。

  • Refresh 頻率:默認 1 秒發生⼀次,可經過 index.refresh_interval 配置。Refresh 後,數據就能夠被搜索到了。這也是爲何 Elasticsearch 被稱爲近實時搜索
  • 若是系統有⼤量的數據寫入,那就會產生不少的 SegmentIndex Buffer 被佔滿時,會觸發 Refresh,默認值是 JVM 的 10%。
什麼是 Transaction log

  • Segment 寫⼊磁盤的過程相對耗時,藉助⽂件系統緩存,Refresh 時,先將 Segment 寫入緩存以開放查詢。
  • 爲了保證數據不會丟失。因此在 Index 文檔時,同時寫 Transaction Log,⾼版本開始,Transaction Log 默認落盤。每一個分片有⼀個 Transaction Log。
  • 在 ES Refresh 時,Index Buffer 被清空,Transaction log 不會清空。
什麼是 Flush

ES Flush & Lucene Commit

  • 實際影響:調用 Refresh,Index Buffer 清空,調⽤ fsync,將緩存中的 Segments寫⼊磁盤,清空(刪除)Transaction Log
  • 調用時機:默認 30 分鐘調用⼀次,或者 Transaction Log 滿 (默認 512 MB)
什麼是 Merge

  • Segment 不少,須要被按期被合併
    • 減小 Segments / 刪除已經刪除的文檔
  • ES 和 Lucene 會自動進行 Merge 操做
    • POST my_index/_forcemerge

Elasticsearch 讀流程

  Elasticsearch 使用主備模型。主備份模型的一個優勢是,主分片和其全部副本分片存有相同的數據。所以,一個同步副本就足以知足讀請求。

  Elasticsearch 中的讀取能夠直接使用 document ID,也能夠是很是複雜的搜索,包含複雜的聚合,這個操做會佔用大量CPU資源。

  當節點接收到讀請求時,該節點負責將其轉發給包含相關分片的節點、整合全部分片的返回值並響應客戶端(相似於一個MapReduce)。咱們將該節點稱爲請求的協調節點。基本流程以下:

  • 將讀請求解析到相關分片。注意,因爲大多數搜索將被髮送到一個或多個索引,所以它們一般須要從多個分片中讀取,每一個分片表示數據的不一樣子集。
  • 從分片複製組中選擇每一個相關分片的活動副本。這能夠是主分片,也能夠是副本分片。默認狀況下,Elasticsearch只是在副本分片之間進行循環。
  • 將分片級別的讀請求發送到所選副本。
  • 整合結果並作出響應。注意,在get by ID查找的狀況下,只有一個分片是相關的,能夠跳過這一步。

思考

  1. 上線前要至少兩人覈實全部配置無誤
  2. 必定要徹底瞭解所使用中間件的基本架構與細節,Elasticsearch 寫 document 時樂觀鎖形式實在是後知後覺

參考資料

  1. www.elastic.co/guide/en/el…
  2. 《Elasticsearch in action》
  3. wangnan.tech/post/elksta…
  4. www.elastic.co/guide/en/el…

本文由 發給官兵 創做,採用 CC BY 3.0 CN協議 進行許可。 可自由轉載、引用,但需署名做者且註明文章出 處。如轉載至微信公衆號,請在文末添加做者公衆號二維碼。

相關文章
相關標籤/搜索