Kafka的流行歸功於它設計和操做簡單、存儲系統高效、充分利用磁盤順序讀寫等特性、很是適合在線日誌收集等高吞吐場景。html
Kafka特性之一是它的複製協議。複製協議是保障kafka高可靠性的關鍵。對於單個集羣中每一個Broker不一樣工做負載狀況下,如何自動調優Kafka副本的工做方式是比較有挑戰的。它的挑戰之一是要知道如何避免follower進入和退出同步副本列表(即ISR)。從用戶的角度來看,若是生產者發送一大批海量消息,可能會引發Kafka Broker不少警告。這些警報代表一些topics處於「under replicated」狀態,這些副本處於同步失敗或失效狀態,更意味着數據沒有被複制到足夠數量Broker從而增長數據丟失的機率。所以Kafka集羣中處於「under replicated」中Partition數要密切監控。這個警告應該來自於Broker失效,減慢或暫停等狀態而不是生產者寫不一樣大小消息引發的。算法
Kafka中主題的每一個Partition有一個預寫式日誌文件,每一個Partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到Partition中,Partition中的每一個消息都有一個連續的序列號叫作offset, 肯定它在分區日誌中惟一的位置。網絡
Kafka每一個topic的partition有N個副本,其中N是topic的複製因子。Kafka經過多副本機制實現故障自動轉移,當Kafka集羣中一個Broker失效狀況下仍然保證服務可用。在Kafka中發生複製時確保partition的預寫式日誌有序地寫到其餘節點上。N個replicas中。其中一個replica爲leader,其餘都爲follower,leader處理partition的全部讀寫請求,與此同時,follower會被動按期地去複製leader上的數據。異步
以下圖所示,Kafka集羣中有4個broker, 某topic有3個partition,且複製因子即副本個數也爲3:post
Kafka提供了數據複製算法保證,若是leader發生故障或掛掉,一個新leader被選舉並被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本爲leader,或者說follower追趕leader數據。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節)中全部follower滯後的狀態。當producer發送一條消息到broker後,leader寫入消息並複製到全部follower。消息提交以後才被成功複製到全部的同步副本。消息複製延遲受最慢的follower限制,重要的是快速檢測慢副本,若是follower「落後」太多或者失效,leader將會把它從ISR中刪除。fetch
副本同步隊列(ISR)
所謂同步,必須知足以下兩個條件:ui
默認狀況下Kafka對應的topic的replica數量爲1,即每一個partition都有一個惟一的leader,爲了確保消息的可靠性,一般應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置爲大於1,好比3。 全部的副本(replicas)統稱爲Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲。任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。線程
上一節中的HW俗稱高水位,是HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO做爲HW,consumer最多隻能消費到HW所在的位置。另外每一個replica都有HW,leader和follower各自負責更新本身的HW的狀態。對於leader新寫入的消息,consumer不能馬上消費,leader會等待該消息被全部ISR中的replicas同步後更新HW,此時消息才能被consumer消費。這樣就保證了若是leader所在的broker失效,該消息仍然能夠重新選舉的leader中獲取。對於來自內部broKer的讀取請求,沒有HW的限制。
下圖詳細的說明了當producer生產消息至broker後,ISR以及HW和LEO的流轉過程:設計
因而可知,Kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。事實上,同步複製要求全部能工做的follower都複製完,這條消息纔會被commit,這種複製方式極大的影響了吞吐率。而異步複製方式下,follower異步的從leader複製數據,數據只要被leader寫入log就被認爲已經commit,這種狀況下若是follower都尚未複製完,落後於leader時,忽然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。3d
副本不一樣步的異常狀況
broker 分配的任何一個 partition 都是以 Replica 對象實例的形式存在,而 Replica 在 Kafka 上是有兩個角色: leader 和 follower,只要這個 Replica 是 follower,它便會向 leader 進行數據同步。
反映在 ReplicaManager 上就是若是 Broker 的本地副本被選舉爲 follower,那麼它將會啓動副本同步線程,其具體實現以下所示:
簡單來講,makeFollowers() 的處理過程以下:
關於第6步,並不必定會爲每個 partition 都啓動一個 fetcher 線程,對於一個目的 broker,只會啓動 num.replica.fetchers
個線程,具體這個 topic-partition 會分配到哪一個 fetcher 線程上,是根據 topic 名和 partition id 進行計算獲得,實現所示:
如上所示,在 ReplicaManager 調用 makeFollowers() 啓動 replica fetcher 線程後,它其實是經過 ReplicaFetcherManager 實例進行相關 topic-partition 同步線程的啓動和關閉,其啓動過程分爲下面兩步:
addFetcherForPartitions()
的具體實現以下所示:
這個方法實際上是作了下面這幾件事:
ReplicaFetcherManager 建立 replica Fetcher 線程的實現以下:
replica fetcher 線程在啓動以後就開始進行正常數據同步流程了,這個過程都是在 ReplicaFetcherThread 線程中實現的。
ReplicaFetcherThread 的 doWork()
方法是一直在這個線程中的 run()
中調用的,實現方法以下:
在 doWork() 方法中主要作了兩件事:
processFetchRequest()
這個方法的做用是發送 Fetch 請求,並對返回的結果進行處理,最終寫入到本地副本的 Log 實例中,其具體實現:
其處理過程簡單總結一下:
fetch()
方法做用是發送 Fetch 請求,並返回相應的結果,其具體的實現,以下:
processPartitionData
這個方法的做用是,處理 Fetch 請求的具體數據內容,簡單來講就是:檢查一下數據大小是否超過限制、將數據追加到本地副本的日誌文件中、更新本地副本的 hw 值。
在副本同步的過程當中,會遇到哪些異常狀況呢?
你們必定會想到關於 offset 的問題,在 Kafka 中,關於 offset 的處理,不管是 producer 端、consumer 端仍是其餘地方,offset 彷佛都是一個如影隨行的問題。在副本同步時,關於 offset,會遇到什麼問題呢?下面舉兩個異常的場景:
以上兩種狀況都是 offset OutOfRange 的狀況,只不過:一是 Fetch Offset 超過了 leader 的 LEO,二是 Fetch Offset 小於 leader 最小的 offset
在介紹 Kafka 解決方案以前,咱們先來本身思考一下這兩種狀況應該怎麼處理?
上面是咱們比較容易想出的解決方案,而在 Kafka 中,其解決方案也很相似,不過遇到狀況比上面咱們列出的兩種狀況多了一些複雜,其解決方案以下:
針對第一種狀況,在 Kafka 中,實際上還會發生這樣一種狀況,1 在收到 OutOfRange 錯誤時,這時去 leader 上獲取的 LEO 值與最小的 offset 值,這時候卻發現 leader 的 LEO 已經從 800 變成了 1100(這個 topic-partition 的數據量增加得比較快),再按照上面的解決方案就不太合理,Kafka 這邊的解決方案是:遇到這種狀況,進行重試就能夠了,下次同步時就會正常了,可是依然會有上面說的那個問題。
replica fetcher 線程關閉的條件,在三種狀況下會關閉對這個 topic-partition 的拉取操做:
這裏直接說線程關閉,其實不是很準確,由於每一個 replica fetcher 線程操做的是多個 topic-partition,而在關閉的粒度是 partition 級別,只有這個線程分配的 partition 所有關閉後,這個線程纔會真正被關閉。
stopReplica
StopReplica 的請求其實是 Controller 發送過來的,這個在 controller 部分會講述,它觸發的條件有多種,好比:broker 下線、partition replica 遷移等等。
makeLeaders
makeLeaders()
方法的調用是在 broker 上這個 partition 的副本被設置爲 leader 時觸發的,其實現以下:
調用 ReplicaFetcherManager 的 removeFetcherForPartitions()
刪除對這些 topic-partition 的副本同步設置,這裏在實現時,會遍歷全部的 replica fetcher 線程,都執行 removePartitions()
方法來移除對應的 topic-partition 集合。
removePartitions
這個方法的做用是:ReplicaFetcherThread 將這些 topic-partition 從本身要拉取的 partition 列表中移除。
ReplicaFetcherThread 的關閉
前面介紹那麼多,彷佛仍是沒有真正去關閉,那麼 ReplicaFetcherThread 真正關閉是哪裏操做的呢?
實際上 ReplicaManager 每次處理完 LeaderAndIsr 請求後,都會調用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads()
方法,若是 fetcher 線程要拉取的 topic-partition 集合爲空,那麼就會關閉掉對應的 fetcher 線程。
http://www.javashuo.com/article/p-szajsrgw-kn.html
http://www.javashuo.com/article/p-urlfflqd-hk.html
https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability