~~~這是一篇有點長的文章,但願不會令你昏昏欲睡~~~算法
本文主要討論0.11版本以前Kafka的副本備份機制的設計問題以及0.11是如何解決的。簡單來講,0.11以前副本備份機制主要依賴水位(或水印)的概念,而0.11採用了leader epoch來標識備份進度。後面咱們會詳細討論兩種機制的差別。不過首先先作一些基本的名詞含義解析。緩存
水位或水印(watermark)一詞,也可稱爲高水位(high watermark),一般被用在流式處理領域(好比Apache Flink、Apache Spark等),以表徵元素或事件在基於時間層面上的進度。一個比較經典的表述爲:流式系統保證在水位t時刻,建立時間(event time) = t'且t' ≤ t的全部事件都已經到達或被觀測到。在Kafka中,水位的概念反而與時間無關,而是與位置信息相關。嚴格來講,它表示的就是位置信息,即位移(offset)。網上有一些關於Kafka watermark的介紹,本不該再贅述,但鑑於本文想要重點強調的leader epoch與watermark息息相關,故這裏再費些篇幅闡述一下watermark。注意:因爲Kafka源碼中使用的名字是高水位,故本文將始終使用high watermaker或乾脆簡稱爲HW。數據結構
Kafka分區下有可能有不少個副本(replica)用於實現冗餘,從而進一步實現高可用。副本根據角色的不一樣可分爲3類:異步
每一個Kafka副本對象都有兩個重要的屬性:LEO和HW。注意是全部的副本,而不僅是leader副本。fetch
咱們使用下圖來形象化地說明二者的關係:spa
上圖中,HW值是7,表示位移是0~7的全部消息都已經處於「已備份狀態」(committed),而LEO值是15,那麼8~14的消息就是還沒有徹底備份(fully replicated)——爲何沒有15?由於剛纔說過了,LEO指向的是下一條消息到來時的位移,故上圖使用虛線框表示。咱們總說consumer沒法消費未提交消息。這句話若是用以上名詞來解讀的話,應該表述爲:consumer沒法消費分區下leader副本中位移值大於分區HW的任何消息。這裏須要特別注意分區HW就是leader副本的HW值。scala
既然副本分爲leader副本和follower副本,而每一個副本又都有HW和LEO,那麼它們是怎麼被更新的呢?它們更新的機制又有什麼區別呢?咱們一一來分析下:設計
1、follower副本什麼時候更新LEO?3d
如前所述,follower副本只是被動地向leader副本請求數據,具體表現爲follower副本不停地向leader副本所在的broker發送FETCH請求,一旦獲取消息後寫入本身的日誌中進行備份。那麼follower副本的LEO是什麼時候更新的呢?首先我必須言明,Kafka有兩套follower副本LEO(明白這個是搞懂後面內容的關鍵,所以請多花一點時間來思考):1. 一套LEO保存在follower副本所在broker的副本管理機中;2. 另外一套LEO保存在leader副本所在broker的副本管理機中——換句話說,leader副本機器上保存了全部的follower副本的LEO。日誌
爲何要保存兩套?這是由於Kafka使用前者幫助follower副本更新其HW值;而利用後者幫助leader副本更新其HW使用。下面咱們分別看下它們被更新的時機。
1 follower副本端的follower副本LEO什麼時候更新?(原諒我有點拗口~~~~~)
follower副本端的LEO值就是其底層日誌的LEO值,也就是說每當新寫入一條消息,其LEO值就會被更新(相似於LEO += 1)。當follower發送FETCH請求後,leader將數據返回給follower,此時follower開始向底層log寫數據,從而自動地更新LEO值
2 leader副本端的follower副本LEO什麼時候更新?
leader副本端的follower副本LEO的更新發生在leader在處理follower FETCH請求時。一旦leader接收到follower發送的FETCH請求,它首先會從本身的log中讀取相應的數據,可是在給follower返回數據以前它先去更新follower的LEO(即上面所說的第二套LEO)
2、follower副本什麼時候更新HW?
follower更新HW發生在其更新LEO以後,一旦follower向log寫完數據,它會嘗試更新它本身的HW值。具體算法就是比較當前LEO值與FETCH響應中leader的HW值,取二者的小者做爲新的HW值。這告訴咱們一個事實:若是follower的LEO值超過了leader的HW值,那麼follower HW值是不會越過leader HW值的。
3、leader副本什麼時候更新LEO?
和follower更新LEO道理相同,leader寫log時就會自動地更新它本身的LEO值。
4、leader副本什麼時候更新HW值?
前面說過了,leader的HW值就是分區HW值,所以什麼時候更新這個值是咱們最關心的,由於它直接影響了分區數據對於consumer的可見性 。如下4種狀況下leader會嘗試去更新分區HW——切記是嘗試,有可能由於不知足條件而不作任何更新:
特別注意上面4個條件中的最後兩個。它揭示了一個事實——當Kafka broker都正常工做時,分區HW值的更新時機有兩個:leader處理PRODUCE請求時和leader處理FETCH請求時。另外,leader是如何更新它的HW值的呢?前面說過了,leader broker上保存了一套follower副本的LEO以及它本身的LEO。當嘗試肯定分區HW時,它會選出全部知足條件的副本,比較它們的LEO(固然也包括leader本身的LEO),並選擇最小的LEO值做爲HW值。這裏的知足條件主要是指副本要知足如下兩個條件之一:
乍看上去好像這兩個條件說得是一回事,畢竟ISR的定義就是第二個條件描述的那樣。但某些狀況下Kafka的確可能出現副本已經「追上」了leader的進度,但卻不在ISR中——好比某個從failure中恢復的副本。若是Kafka只判斷第一個條件的話,肯定分區HW值時就不會考慮這些未在ISR中的副本,但這些副本已經具有了「馬上進入ISR」的資格,所以就可能出現分區HW值越過ISR中副本LEO的狀況——這確定是不容許的,由於分區HW實際上就是ISR中全部副本LEO的最小值。
好了,理論部分我以爲說的差很少了,下面舉個實際的例子。咱們假設有一個topic,單分區,副本因子是2,即一個leader副本和一個follower副本。咱們看下當producer發送一條消息時,broker端的副本到底會發生什麼事情以及分區HW是如何被更新的。
下圖是初始狀態,咱們稍微解釋一下:初始時leader和follower的HW和LEO都是0(嚴格來講源代碼會初始化LEO爲-1,不過這不影響以後的討論)。leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此時,producer沒有發送任何消息給leader,而follower已經開始不斷地給leader發送FETCH請求了,但由於沒有數據所以什麼都不會發生。值得一提的是,follower發送過來的FETCH請求由於無數據而暫時會被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms參數)超時後會強制完成。假若在寄存期間producer端發送過來數據,那麼會Kafka會自動喚醒該FETCH請求,讓leader繼續處理之。
雖然purgatory不是本文的重點,但FETCH請求發送和PRODUCE請求處理的時機會影響咱們的討論。所以後續咱們也將分兩種狀況來討論分區HW的更新。
第一種狀況:follower發送FETCH請求在leader處理完PRODUCE請求以後
producer給該topic分區發送了一條消息。此時的狀態以下圖所示:
如圖所示,leader接收到PRODUCE請求主要作兩件事情:
因此,PRODUCE請求處理完成後leader端的HW值依然是0,而LEO是1,remote LEO是1。假設此時follower發送了FETCH請求(或者說follower早已發送了FETCH請求,只不過在broker的請求隊列中排隊),那麼狀態變動以下圖所示:
本例中當follower發送FETCH請求時,leader端的處理依次是:
而follower副本接收到FETCH response後依次執行下列操做:
此時,第一輪FETCH RPC結束,咱們會發現雖然leader和follower都已經在log中保存了這條消息,但分區HW值還沒有被更新。實際上,它是在第二輪FETCH RPC中被更新的,以下圖所示:
上圖中,follower發來了第二輪FETCH請求,leader端接收到後仍然會依次執行下列操做:
一樣地,follower副本接收到FETCH response後依次執行下列操做:
Okay,producer端發送消息後broker端完整的處理流程就講完了。此時消息已經成功地被複制到leader和follower的log中且分區HW是1,代表consumer可以消費offset = 0的這條消息。下面咱們來分析下PRODUCE和FETCH請求交互的第二種狀況。
第二種狀況:FETCH請求保存在purgatory中PRODUCE請求到來
這種狀況實際上和第一種狀況差很少。前面說過了,當leader沒法當即知足FECTH返回要求的時候(好比沒有數據),那麼該FETCH請求會被暫存到leader端的purgatory中,待時機成熟時會嘗試再次處理它。不過Kafka不會無限期地將其緩存着,默認有個超時時間(500ms),一旦超時時間已過,則這個請求會被強制完成。不過咱們要討論的場景是在寄存期間,producer發送PRODUCE請求從而使之知足了條件從而被喚醒。此時,leader端處理流程以下:
至於喚醒後的FETCH請求的處理與第一種狀況徹底一致,故這裏不作詳細展開了。
以上全部的東西其實就想說明一件事情:Kafka使用HW值來決定副本備份的進度,而HW值的更新一般須要額外一輪FETCH RPC才能完成,故而這種設計是有問題的。它們可能引發的問題包括:
咱們一一分析下:
1、數據丟失
如前所述,使用HW值來肯定備份進度時其值的更新是在下一輪RPC中完成的。如今翻到上面使用兩種不一樣顏色標記的步驟處思考下, 若是follower副本在藍色標記的第一步與紫色標記的第二步之間發生崩潰,那麼就有可能形成數據的丟失。咱們舉個例子來看下。
上圖中有兩個副本:A和B。開始狀態是A是leader。咱們假設producer端min.insync.replicas設置爲1,那麼當producer發送兩條消息給A後,A寫入到底層log,此時Kafka會通知producer說這兩條消息寫入成功。
可是在broker端,leader和follower底層的log雖都寫入了2條消息且分區HW已經被更新到2,但follower HW還沒有被更新(也就是上面紫色顏色標記的第二步還沒有執行)。假若此時副本B所在的broker宕機,那麼重啓回來後B會自動把LEO調整到以前的HW值,故副本B會作日誌截斷(log truncation),將offset = 1的那條消息從log中刪除,並調整LEO = 1,此時follower副本底層log中就只有一條消息,即offset = 0的消息。
B重啓以後須要給A發FETCH請求,但若A所在broker機器在此時宕機,那麼Kafka會令B成爲新的leader,而當A重啓回來後也會執行日誌截斷,將HW調整回1。這樣,位移=1的消息就從兩個副本的log中被刪除,即永遠地丟失了。
這個場景丟失數據的前提是在min.insync.replicas=1時,一旦消息被寫入leader端log即被認爲是「已提交」,而延遲一輪FETCH RPC更新HW值的設計使得follower HW值是異步延遲更新的,假若在這個過程當中leader發生變動,那麼成爲新leader的follower的HW值就有多是過時的,使得clients端認爲是成功提交的消息被刪除。
2、leader/follower數據離散
除了可能形成的數據丟失之外,這種設計還有一個潛在的問題,即形成leader端log和follower端log的數據不一致。好比leader端保存的記錄序列是r1,r2,r3,r4,r5,....;而follower端保存的序列多是r1,r3,r4,r5,r6...。這也是非法的場景,由於顧名思義,follower必須追隨leader,完整地備份leader端的數據。
咱們依然使用一張圖來講明這種場景是如何發生的:
這種狀況的初始狀態與狀況1有一些不一樣的:A依然是leader,A的log寫入了2條消息,但B的log只寫入了1條消息。分區HW更新到2,但B的HW仍是1,同時producer端的min.insync.replicas = 1。
此次咱們讓A和B所在機器同時掛掉,而後假設B先重啓回來,所以成爲leader,分區HW = 1。假設此時producer發送了第3條消息(綠色框表示)給B,因而B的log中offset = 1的消息變成了綠色框表示的消息,同時分區HW更新到2(A尚未回來,就B一個副本,故能夠直接更新HW而不用理會A)以後A重啓回來,須要執行日誌截斷,但發現此時分區HW=2而A以前的HW值也是2,故不作任何調整。此後A和B將以這種狀態繼續正常工做。
顯然,這種場景下,A和B底層log中保存在offset = 1的消息是不一樣的記錄,從而引起不一致的情形出現。
Kafka 0.11.0.0.版本解決方案
形成上述兩個問題的根本緣由在於HW值被用於衡量副本備份的成功與否以及在出現failture時做爲日誌截斷的依據,但HW值的更新是異步延遲的,特別是須要額外的FETCH請求處理流程才能更新,故這中間發生的任何崩潰均可能致使HW值的過時。鑑於這些緣由,Kafka 0.11引入了leader epoch來取代HW值。Leader端多開闢一段內存區域專門保存leader的epoch信息,這樣即便出現上面的兩個場景也能很好地規避這些問題。
所謂leader epoch其實是一對值:(epoch,offset)。epoch表示leader的版本號,從0開始,當leader變動過1次時epoch就會+1,而offset則對應於該epoch版本的leader寫入第一條消息的位移。所以假設有兩對值:
(0, 0)
(1, 120)
則表示第一個leader從位移0開始寫入消息;共寫了120條[0, 119];而第二個leader版本號是1,從位移120處開始寫入消息。
leader broker中會保存這樣的一個緩存,並按期地寫入到一個checkpoint文件中。
當leader寫底層log時它會嘗試更新整個緩存——若是這個leader首次寫消息,則會在緩存中增長一個條目;不然就不作更新。而每次副本從新成爲leader時會查詢這部分緩存,獲取出對應leader版本的位移,這就不會發生數據不一致和丟失的狀況。
下面咱們依然使用圖的方式來講明下利用leader epoch如何規避上述兩種狀況
1、規避數據丟失
上圖左半邊已經給出了簡要的流程描述,這裏不詳細展開具體的leader epoch實現細節(好比OffsetsForLeaderEpochRequest的實現),咱們只須要知道每一個副本都引入了新的狀態來保存本身當leader時開始寫入的第一條消息的offset以及leader版本。這樣在恢復的時候徹底使用這些信息而非水位來判斷是否須要截斷日誌。
2、規避數據不一致
一樣的道理,依靠leader epoch的信息能夠有效地規避數據不一致的問題。
總結
0.11.0.0版本的Kafka經過引入leader epoch解決了原先依賴水位表示副本進度可能形成的數據丟失/數據不一致問題。有興趣的讀者能夠閱讀源代碼進一步地瞭解其中的工做原理。
源代碼位置:kafka.server.epoch.LeaderEpochCache.scala (leader epoch數據結構)、kafka.server.checkpoints.LeaderEpochCheckpointFile(checkpoint檢查點文件操做類)還有分佈在Log中的CRUD操做。