replica副本同步機制

replica副本同步機制

1 前言

 Kafka的流行歸功於它設計和操做簡單、存儲系統高效、充分利用磁盤順序讀寫等特性、很是適合在線日誌收集等高吞吐場景。html

 Kafka特性之一是它的複製協議。複製協議是保障kafka高可靠性的關鍵。對於單個集羣中每一個Broker不一樣工做負載狀況下,如何自動調優Kafka副本的工做方式是比較有挑戰的。它的挑戰之一是要知道如何避免follower進入和退出同步副本列表(即ISR)。從用戶的角度來看,若是生產者發送一大批海量消息,可能會引發Kafka Broker不少警告。這些警報代表一些topics處於「under replicated」狀態,這些副本處於同步失敗或失效狀態,更意味着數據沒有被複制到足夠數量Broker從而增長數據丟失的機率。所以Kafka集羣中處於「under replicated」中Partition數要密切監控。這個警告應該來自於Broker失效,減慢或暫停等狀態而不是生產者寫不一樣大小消息引發的。算法

2 Kafka的副本機制

Kafka中主題的每一個Partition有一個預寫式日誌文件,每一個Partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到Partition中,Partition中的每一個消息都有一個連續的序列號叫作offset, 肯定它在分區日誌中惟一的位置。網絡

 

 

Kafka每一個topic的partition有N個副本,其中N是topic的複製因子。Kafka經過多副本機制實現故障自動轉移,當Kafka集羣中一個Broker失效狀況下仍然保證服務可用。在Kafka中發生複製時確保partition的預寫式日誌有序地寫到其餘節點上。N個replicas中。其中一個replica爲leader,其餘都爲follower,leader處理partition的全部讀寫請求,與此同時,follower會被動按期地去複製leader上的數據。異步

以下圖所示,Kafka集羣中有4個broker, 某topic有3個partition,且複製因子即副本個數也爲3:post

 

 

Kafka提供了數據複製算法保證,若是leader發生故障或掛掉,一個新leader被選舉並被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本爲leader,或者說follower追趕leader數據。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節)中全部follower滯後的狀態。當producer發送一條消息到broker後,leader寫入消息並複製到全部follower。消息提交以後才被成功複製到全部的同步副本。消息複製延遲受最慢的follower限制,重要的是快速檢測慢副本,若是follower「落後」太多或者失效,leader將會把它從ISR中刪除。fetch

副本同步隊列(ISR)
所謂同步,必須知足以下兩個條件:ui

  • 副本節點必須能與zookeeper保持會話(心跳機制)
  • 副本能複製leader上的全部寫操做,而且不能落後太多。(卡住或滯後的副本控制是由 replica.lag.time.max.ms 配置)

默認狀況下Kafka對應的topic的replica數量爲1,即每一個partition都有一個惟一的leader,爲了確保消息的可靠性,一般應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置爲大於1,好比3。 全部的副本(replicas)統稱爲Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲。任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。線程

上一節中的HW俗稱高水位,是HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO做爲HW,consumer最多隻能消費到HW所在的位置。另外每一個replica都有HW,leader和follower各自負責更新本身的HW的狀態。對於leader新寫入的消息,consumer不能馬上消費,leader會等待該消息被全部ISR中的replicas同步後更新HW,此時消息才能被consumer消費。這樣就保證了若是leader所在的broker失效,該消息仍然能夠重新選舉的leader中獲取。對於來自內部broKer的讀取請求,沒有HW的限制。
下圖詳細的說明了當producer生產消息至broker後,ISR以及HW和LEO的流轉過程:設計

 

 

因而可知,Kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。事實上,同步複製要求全部能工做的follower都複製完,這條消息纔會被commit,這種複製方式極大的影響了吞吐率。而異步複製方式下,follower異步的從leader複製數據,數據只要被leader寫入log就被認爲已經commit,這種狀況下若是follower都尚未複製完,落後於leader時,忽然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。3d

  • Controller來維護:Kafka集羣中的其中一個Broker會被選舉爲Controller,主要負責Partition管理和副本狀態管理,也會執行相似於重分配partition之類的管理任務。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節點中。同時發起LeaderAndIsrRequest通知全部的replicas。
  • leader來維護:leader有單獨的線程按期檢測ISR中follower是否脫離ISR, 若是發現ISR變化,則會將新的ISR的信息返回到Zookeeper的相關節點中。

副本不一樣步的異常狀況

  • 慢副本:在必定週期時間內follower不能追遇上leader。最多見的緣由之一是I / O瓶頸致使follower追加複製消息速度慢於從leader拉取速度。
  • 卡住副本:在必定週期時間內follower中止從leader拉取請求。follower replica卡住了是因爲GC暫停或follower失效或死亡。
  • 新啓動副本:當用戶給主題增長副本因子時,新的follower不在同步副本列表中,直到他們徹底遇上了leader日誌。

 

3 Follower向leader拉取數據的過程

3.1 replica fetcher 線程什麼時候啓動

broker 分配的任何一個 partition 都是以 Replica 對象實例的形式存在,而 Replica 在 Kafka 上是有兩個角色: leader 和 follower,只要這個 Replica 是 follower,它便會向 leader 進行數據同步。

反映在 ReplicaManager 上就是若是 Broker 的本地副本被選舉爲 follower,那麼它將會啓動副本同步線程,其具體實現以下所示:

 

 

簡單來講,makeFollowers() 的處理過程以下:

  1. 先從本地記錄 leader partition 的集合中將這些 partition 移除,由於這些 partition 已經被選舉爲了 follower;
  2. 將這些 partition 的本地副本設置爲 follower,後面就不會接收關於這個 partition 的 Produce 請求了,若是依然有 client 在向這臺 broker 發送數據,那麼它將會返回相應的錯誤;
  3. 先中止關於這些 partition 的副本同步線程(若是本地副本以前是 follower 如今仍是 follower,先關閉的緣由是:這個 partition 的 leader 發生了變化,若是 leader 沒有發生變化,那麼 makeFollower方法返回的是 False,這個 Partition 就不會被添加到 partitionsToMakeFollower 集合中),這樣的話能夠保證這些 partition 的本地副本將不會再有新的數據追加;
  4. 對這些 partition 本地副本日誌文件進行截斷操做並進行 checkpoint 操做;
  5. 完成那些延遲處理的 Produce 和 Fetch 請求;
  6. 若是本地的 broker 沒有掉線,那麼向這些 partition 新選舉出來的 leader 啓動副本同步線程。

關於第6步,並不必定會爲每個 partition 都啓動一個 fetcher 線程,對於一個目的 broker,只會啓動 num.replica.fetchers 個線程,具體這個 topic-partition 會分配到哪一個 fetcher 線程上,是根據 topic 名和 partition id 進行計算獲得,實現所示:

 

 

3.2 replica fetcher 線程啓動

如上所示,在 ReplicaManager 調用 makeFollowers() 啓動 replica fetcher 線程後,它其實是經過 ReplicaFetcherManager 實例進行相關 topic-partition 同步線程的啓動和關閉,其啓動過程分爲下面兩步:

  1. ReplicaFetcherManager 調用 addFetcherForPartitions() 添加對這些 topic-partition 的數據同步流程;
  2. ReplicaFetcherManager 調用 createFetcherThread() 初始化相應的 ReplicaFetcherThread 線程。

 

addFetcherForPartitions() 的具體實現以下所示:

 

 

這個方法實際上是作了下面這幾件事:

  1. 先計算這個 topic-partition 對應的 fetcher id;
  2. 根據 leader 和 fetcher id 獲取對應的 replica fetcher 線程,若是沒有找到,就調用 createFetcherThread() 建立一個新的 fetcher 線程;
  3. 若是是新啓動的 replica fetcher 線程,那麼就啓動這個線程;
  4. 將 topic-partition 記錄到 fetcherThreadMap 中,這個變量記錄每一個 replica fetcher 線程要同步的 topic-partition 列表。

ReplicaFetcherManager 建立 replica Fetcher 線程的實現以下:

 

3.3 replica fetcher 線程處理過程

replica fetcher 線程在啓動以後就開始進行正常數據同步流程了,這個過程都是在 ReplicaFetcherThread 線程中實現的。

ReplicaFetcherThread 的 doWork() 方法是一直在這個線程中的 run() 中調用的,實現方法以下:

 

 

在 doWork() 方法中主要作了兩件事:

  1. 構造相應的 Fetch 請求(buildFetchRequest());
  2. 經過 processFetchRequest() 方法發送 Fetch 請求,並對其結果進行相應的處理。

 

processFetchRequest() 這個方法的做用是發送 Fetch 請求,並對返回的結果進行處理,最終寫入到本地副本的 Log 實例中,其具體實現:

 

 

其處理過程簡單總結一下:

  1. 經過 fetch() 方法,發送 Fetch 請求,獲取相應的 response(若是遇到異常,那麼在下次發送 Fetch 請求以前,會 sleep 一段時間再發);
  2. 若是返回的結果 不爲空,而且 Fetch 請求的 offset 信息與返回結果的 offset 信息對得上,那麼就會調用 processPartitionData() 方法將拉取到的數據追加本地副本的日誌文件中,若是返回結果有錯誤信息,那麼就對相應錯誤進行相應的處理;
  3. 對在 Fetch 過程當中遇到異常或返回錯誤的 topic-partition,會進行 delay 操做,下次 Fetch 請求的發生至少要間隔 replica.fetch.backoff.ms 時間。

 

fetch() 方法做用是發送 Fetch 請求,並返回相應的結果,其具體的實現,以下:

 

 

processPartitionData

這個方法的做用是,處理 Fetch 請求的具體數據內容,簡單來講就是:檢查一下數據大小是否超過限制、將數據追加到本地副本的日誌文件中、更新本地副本的 hw 值。

 

 

3.3 副本同步異常狀況的處理

在副本同步的過程當中,會遇到哪些異常狀況呢?

你們必定會想到關於 offset 的問題,在 Kafka 中,關於 offset 的處理,不管是 producer 端、consumer 端仍是其餘地方,offset 彷佛都是一個如影隨行的問題。在副本同步時,關於 offset,會遇到什麼問題呢?下面舉兩個異常的場景:

  1. 假如當前本地(id:1)的副本如今是 leader,其 LEO 假設爲1000,而另外一個在 isr 中的副本(id:2)其 LEO 爲800,此時出現網絡抖動,id 爲1 的機器掉線後又上線了,可是此時副本的 leader 實際上已經變成了 2,而2的 LEO 爲800,這時候1啓動副本同步線程去2上拉取數據,但願從 offset=1000 的地方開始拉取,可是2上最大的 offset 纔是800,這種狀況該如何處理呢?
  2. 假設一個 replica (id:1)其 LEO 是10,它已經掉線好幾天,這個 partition leader 的 offset 範圍是 [100, 800],那麼 1 重啓啓動時,它但願從 offset=10 的地方開始拉取數據時,這時候發生了 OutOfRange,不過跟上面不一樣的是這裏是小於了 leader offset 的範圍,這種狀況又該怎麼處理?

以上兩種狀況都是 offset OutOfRange 的狀況,只不過:一是 Fetch Offset 超過了 leader 的 LEO,二是 Fetch Offset 小於 leader 最小的 offset

在介紹 Kafka 解決方案以前,咱們先來本身思考一下這兩種狀況應該怎麼處理?

  1. 若是 fetch offset 超過 leader 的 offset,這時候副本應該是回溯到 leader 的 LEO 位置(超過這個值的數據刪除),而後再去進行副本同步,固然這種解決方案實際上是沒法保證 leader 與 follower 數據的徹底一致,再次發生 leader 切換時,可能會致使數據的可見性不一致,但既然用戶容許了髒選舉的發生,其實咱們是能夠認爲用戶是能夠接收這種狀況發生的;
  2. 這種就比較容易處理,首先清空本地的數據,由於本地的數據都已通過期了,而後從 leader 的最小 offset 位置開始拉取數據。

上面是咱們比較容易想出的解決方案,而在 Kafka 中,其解決方案也很相似,不過遇到狀況比上面咱們列出的兩種狀況多了一些複雜,其解決方案以下:

 

 

針對第一種狀況,在 Kafka 中,實際上還會發生這樣一種狀況,1 在收到 OutOfRange 錯誤時,這時去 leader 上獲取的 LEO 值與最小的 offset 值,這時候卻發現 leader 的 LEO 已經從 800 變成了 1100(這個 topic-partition 的數據量增加得比較快),再按照上面的解決方案就不太合理,Kafka 這邊的解決方案是:遇到這種狀況,進行重試就能夠了,下次同步時就會正常了,可是依然會有上面說的那個問題。

3.4 replica fetcher 線程的關閉

 replica fetcher 線程關閉的條件,在三種狀況下會關閉對這個 topic-partition 的拉取操做:

  1. stopReplica():broker 收到了 controller 發來的 StopReplica 請求,這時會開始關閉對指定 topic-partition 的同步線程;
  2. makeLeaders:這些 partition 的本地副本被選舉成了 leader,這時候就會先中止對這些 topic-partition 副本同步線程;
  3. makeFollowers():前面已經介紹過,這裏實際上中止副本同步,而後再開啓副本同步線程,由於這些 topic-partition 的 leader 可能發生了切換。

這裏直接說線程關閉,其實不是很準確,由於每一個 replica fetcher 線程操做的是多個 topic-partition,而在關閉的粒度是 partition 級別,只有這個線程分配的 partition 所有關閉後,這個線程纔會真正被關閉。

stopReplica

StopReplica 的請求其實是 Controller 發送過來的,這個在 controller 部分會講述,它觸發的條件有多種,好比:broker 下線、partition replica 遷移等等。

makeLeaders

makeLeaders() 方法的調用是在 broker 上這個 partition 的副本被設置爲 leader 時觸發的,其實現以下:

 

 

調用 ReplicaFetcherManager 的 removeFetcherForPartitions() 刪除對這些 topic-partition 的副本同步設置,這裏在實現時,會遍歷全部的 replica fetcher 線程,都執行 removePartitions() 方法來移除對應的 topic-partition 集合。

 

 

removePartitions

這個方法的做用是:ReplicaFetcherThread 將這些 topic-partition 從本身要拉取的 partition 列表中移除。

 

 

 

ReplicaFetcherThread 的關閉

前面介紹那麼多,彷佛仍是沒有真正去關閉,那麼 ReplicaFetcherThread 真正關閉是哪裏操做的呢?

實際上 ReplicaManager 每次處理完 LeaderAndIsr 請求後,都會調用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 方法,若是 fetcher 線程要拉取的 topic-partition 集合爲空,那麼就會關閉掉對應的 fetcher 線程。

 

 

參考資料:

http://www.javashuo.com/article/p-szajsrgw-kn.html

http://www.javashuo.com/article/p-urlfflqd-hk.html

https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability

相關文章
相關標籤/搜索