在《圖解Kafka中的基本概念》中已經對副本進行了介紹。咱們先回顧下,Kafka中一個分區能夠擁有多個副本,副本可分佈於多臺機器上。而在多個副本中,只會有一個Leader副本與客戶端交互,也就是讀寫數據。其餘則做爲Follower副本,負責同步Leader的數據,當Leader宕機時,從Follower選舉出新的Leader,從而解決分區單點問題。本文將繼續深刻了解Kafka中副本機制的設計和原理。併發
副本機制的使用在計算機的世界裏是很常見的,好比MySQL、ZooKeeper、CDN等都有使用副本機制。使用副本機制所能帶來的好處有如下幾種:ide
但並非每一個好處都能得到,這仍是和具體的設計有關,好比本文的主角Kafka,只具備第一個好處,即提升可用性。這是由於副本中只有Leader能夠和客戶端交互,進行讀寫,其餘副本是隻能同步,不能分擔讀寫壓力。爲何這麼設計?這和Kafka做爲消息系統有關。好比當咱們使用生產者成功寫入消息後,但願消費者能立馬讀取到剛生產的消息,這也被稱做「Read-Your-Writes」一致性,可理解爲寫後當即讀,要實現這種一致性,若是是隻在Leader上讀寫是很方便實現的。並且也同時保證了「Monotomic Reads」一致性,即單調讀一致性,不會出現消息一會能讀到,一會讀不到的狀況。你可能會問,爲何不讓多個副本均可以讀,來提升讀操做吞吐量,同時加入其它機制來保證這兩個一致性。筆者的理解是在Kafka中已經引入了分區和消費組機制,來提供擴展性,提升讀吞吐量,因此這裏不必再爲了提升讀吞吐量,而讓系統更復雜。翻譯
咱們已經瞭解到當Leader宕機時,咱們要從Follower中選舉出新的Leader,但並非全部的Follower都有資格參與選舉。由於有的Follower的同步狀況滯後,若是讓他成爲Leader將會致使消息丟失。而爲了不這個狀況,Kafka引入了ISR(In-Sync Replica)副本的概念,這是一個集合,裏面存放的是和Leader保持同步的副本並含有Leader。這是一個動態調整的集合,當副本由同步變爲滯後時會從集合中剔除,而當副本由滯後變爲同步時又會加入到集合中。設計
那麼如何判斷一個副本是同步仍是滯後呢?Kafka在0.9版本以前,是根據replica.lag.max.messages參數來判斷,其含義是同步副本所能落後的最大消息數,當Follower上的最大偏移量落後Leader大於replica.lag.max.messages時,就認爲該副本是不一樣步的了,會從ISR中移除。若是ISR的值設置得太小,會致使Follower常常被踢出ISR,而若是設置過大,則當Leader宕機時,會形成較多消息的丟失。在實際使用時,很難給出一個合理值,這是由於當生產者爲了提升吞吐量而調大batch.size時,會發送更多的消息到Leader上,這時候若是不增大replica.lag.max.messages,則會有Follower頻繁被踢出ISR的現象,而當Follower發生Fetch請求同步後,又被加入到ISR中,ISR將頻繁變更。鑑於該參數難以設定,Kafka在0.9版本引入了一個新的參數replica.lag.time.max.ms,默認10s,含義是當Follower超過10s沒發送Fetch請求同步Leader時,就會認爲不一樣步而被踢出ISR。從時間維度來考量,可以很好地避免生產者發送大量消息到Leader副本致使分區ISR頻繁收縮和擴張的問題。3d
當ISR集合爲空時,即沒有同步副本(Leader也掛了),沒法選出下一個Leader,Kafka集羣將會失效。而爲了提升可用性,Kafka提供了unclean.leader.election.enable參數,當設置爲true且ISR集合爲空時,會進行Unclear Leader選舉,容許在非同步副本中選出新的Leader,從而提升Kafka集羣的可用性,但這樣會形成消息丟失。在容許消息丟失的場景中,是能夠開啓此參數來提升可用性的。而其餘狀況,則不建議開啓,而是經過其餘手段來提升可用性。日誌
下面咱們一塊兒瞭解副本同步的原理。副本的本質實際上是一個消息日誌,爲了讓副本正常同步,須要經過一些變量記錄副本的狀態,以下圖所示:code
其中LEO(Last End Offset)記錄了日誌的下一條消息偏移量,即當前最新消息的偏移量加一。而HW(High Watermark)界定了消費者可見的消息,消費者能夠消費小於HW的消息,而大於等於HW的消息將沒法消費。HW和LEO的關係是HW必定小於LEO。下面介紹下HW的概念,其可翻譯爲高水位或高水印,這一律念一般用於在流式處理領域(如Flink、Spark等),流式系統將保證在HW爲t時刻時,建立時間小於等於t時刻的全部事件都已經到達或可被觀測到。而在Kafka中,HW的概念和時間無關,而是和偏移量有關,主要目的是爲了保證一致性。試想若是一個消息到達了Leader,而Follower副本還將來得及同步,但該消息能已被消費者消費了,這時候Leader宕機,Follower副本中選出新的Leader,消息將丟失,出現不一致的現象。因此Kafka引入HW的概念,當消息被同步副本同步完成時,才讓消息可被消費。blog
上述便是LEO和HW的基本概念,下面咱們看下具體是如何工做的。事件
在每一個副本中都存有LEO和HW,而Leader副本中除了存有自身的LEO和HW,還存儲了其餘Follower副本的LEO和HW值,爲了區分咱們把Leader上存儲的Follower副本的LEO和HW值叫作遠程副本的LEO和HW值,以下圖所示:rem
之因此這麼設計,是爲了HW的更新,Leader需保證HW是ISR副本集合中LEO的最小值。關於具體的更新,咱們分爲Follower副本和Leader副原本看。
Follower副本更新LEO和HW的時機只有向Leader拉取了消息以後,會用當前的偏移量加1來更新LEO,而且用Leader的HW值和當前LEO的最小值來更新HW:
CurrentOffset + 1 -> LEO min(LEO, LeaderHW) -> HW
LEO的更新,很好理解。那爲何HW要取LEO和LeaderHW的最小值,爲何不直接取LeaderHW,LeaderHW不是必定大於LEO嗎?咱們在前文簡單的提到了LeaderHW是根據同步副原本決定,因此LeaderHW必定小於全部同步副本的LEO,而並不必定小於非同步副本的LEO,因此若是一個非同步副本在拉取消息,那LEO是會小於LeaderHW的,則應用當前LEO值來更新HW。
說完了Follower副本上LEO和HW的更新,下面看Leader副本。
正常狀況下Leader副本的更新時機有兩個:1、收到生產者的消息;2、被Follower拉取消息。
當收到生產者消息時,會用當前偏移量加1來更新LEO,而後取LEO和遠程ISR副本中LEO的最小值更新HW。
CurrentOffset + 1 -> LEO min(LEO, RemoteIsrLEO) -> HW
而當Follower拉取消息時,會更新Leader上存儲的Follower副本LEO,而後判斷是否須要更新HW,更新的方式和上述相同。
FollowerLEO -> RemoteLEO min(LEO, RemoteIsrLEO) -> HW
除了這兩種正常狀況,而當發生故障時,例如Leader宕機,Follower被選爲新的Leader,會嘗試更新HW。還有副本被踢出ISR時,也會嘗試更新HW。
下面咱們看下更新LEO和HW的示例,假設分區中有兩個副本,min.insync.replica=1。
從上述過程當中,咱們能夠看到remoteLEO、LeaderHW和FollowerHW的更新發生於Follower更新LEO後的第二輪Fetch請求,而這也意味着,更新須要額外一次Fetch請求。而這也將致使在Leader切換時,會存在數據丟失和數據不一致的問題。下面是數據丟失的示例:
當B做爲Follower已經Fetch了最新的消息,可是在發送第二輪Fetch時,將來得及處理響應,宕機了。當重啓時,會根據HW更新LEO,將發生日誌截斷,消息m1被丟棄。這時再發送Fetch請求給A,A宕機了,則B未能同步到消息m1,同時B被選爲Leader,而當A重啓時,做爲Follower同步B的消息時,會根據A的HW值更新HW和LEO,所以由2變成了1,也將發生日誌截斷,而已發送成功的消息m1將永久丟失。
數據不一致的狀況以下:
A做爲Leader,A已寫入m0、m1兩條消息,且HW爲2,而B做爲Follower,只有m0消息,且HW爲1。若A、B同時宕機,且B重啓時,A還未恢復,則B被選爲Leader。
集羣處於上述這種狀態有兩種狀況可能致使,1、宕機前,B不在ISR中,所以A未待B同步,即更新了HW,且unclear leader爲true,容許B成爲Leader;2、宕機前,B同步了消息m1,且發送了第二輪Fetch請求,Leader更新HW,但B未將消息m1落地到磁盤,宕機了,當再重啓時,消息m1丟失,只剩m0。
在B重啓做爲Leader以後,收到消息m2。A宕機重啓後向成爲Leader的B發送Fetch請求,發現本身的HW和B的HW一致,都是2,所以不會進行消息截斷,而這也形成了數據不一致。
爲了解決HW可能形成的數據丟失和數據不一致問題,Kafka引入了Leader Epoch機制,在每一個副本日誌目錄下都有一個leader-epoch-checkpoint文件,用於保存Leader Epoch信息,其內容示例以下:
0 0 1 300 2 500
上面每一行爲一個Leader Epoch,分爲兩部分,前者Epoch,表示Leader版本號,是一個單調遞增的正整數,每當Leader變動時,都會加1,後者StartOffset,爲每一代Leader寫入的第一條消息的位移。例如第0代Leader寫的第一條消息位移爲0,而第1代Leader寫的第一條消息位移爲300,也意味着第0代Leader在寫了0-299號消息後掛了,從新選出了新的Leader。下面咱們看下Leader Epoch如何工做:
當收到生產者發來的第一條消息時,會將新的epoch和當前LEO添加到leader-epoch-checkpoint文件中。
a.向Leader發送LeaderEpochRequest請求,請求內容中含有Follower當前本地的最新Epoch;
b.Leader將返回給Follower的響應中含有一個LastOffset,其取值規則爲:
i.若FollowerLastEpoch = LeaderLastEpoch,則取Leader LEO;
ii.不然,取大於FollowerLastEpoch的第一個Leader Epoch中的StartOffset。
c.Follower在拿到LastOffset後,若LastOffset < LEO,將截斷日誌;
d.Follower開始正常工做,發送Fetch請求;
咱們再回顧看下數據丟失和數據不一致的場景,在應用了LeaderEpoch後發生什麼改變:
當B做爲Follower已經Fetch了最新的消息,可是發送第二輪Fetch時,將來得及處理響應,宕機了。當重啓時,會向A發送LeaderEpochRequest請求。若是A沒宕機,因爲 FollowerLastEpoch = LeaderLastEpoch,因此將LeaderLEO,即2做爲LastOffset給A,又由於LastOffset=LEO,因此不會截斷日誌。這種狀況比較簡單,而圖中所畫的狀況是A宕機的狀況,沒返回LeaderEpochRequest的響應的狀況。這時候B會被選做Leader,將當前LEO和新的Epoch寫進leader-epoch-checkpoint文件中。當A做爲Follower重啓後,發送LeaderEpochRequest請求,包含最新的epoch值0,當B收到請求後,因爲FollowerLastEpoch < LeaderLastEpoch,因此會取大於FollowerLastEpoch的第一個Leader Epoch中的StartOffset,即2。當A收到響應時,因爲LEO = LastOffset,因此不會發生日誌截斷,也就不會丟失數據。
下面是數據不一致狀況:
A做爲Leader,A已寫入m0、m1兩條消息,且HW爲2,而B做爲Follower,只有消息m0,且HW爲1,A、B同時宕機。B重啓,被選爲Leader,將寫入新的LeaderEpoch(1, 1)。B開始工做,收到消息m2時。這是A重啓,將做爲Follower將發送LeaderEpochRequert(FollowerLastEpoch=0),B返回大於FollowerLastEpoch的第一個LeaderEpoch的StartOffset,即1,小於當前LEO值,因此將發生日誌截斷,併發送Fetch請求,同步消息m2,避免了消息不一致問題。
你可能會問,m2消息那豈不是丟失了?是的,m2消息丟失了,但這種狀況的發送的根本緣由在於min.insync.replicas的值設置爲1,即沒有任何其餘副本同步的狀況下,就認爲m2消息爲已提交狀態。LeaderEpoch不能解決min.insync.replicas爲1帶來的數據丟失問題,可是能夠解決其所帶來的數據不一致問題。而咱們以前所說能解決的數據丟失問題,是指消息已經成功同步到Follower上,但因HW未及時更新引發的數據丟失問題。