Kafka needs no Keeper(關於KIP-500的討論)

寫在前面的

最近看了Kafka Summit上的這個分享,以爲名字很霸氣,標題直接沿用了。這個分享源於社區的KIP-500,大致的意思從此Apache Kafka再也不須要ZooKeeper。整個分享大約40幾分鐘。完整看下來感受乾貨不少,這裏特地總結出來。若是你把這個分享看作是《三國志》的話,那麼姑且就把個人這篇看作是裴松之注吧:)node

客戶端演進

首先,社區committer給出了Kafka Java客戶端移除ZooKeeper依賴的演進過程。下面兩張圖總結了0.8.x版本和0.11.x版本(是否真的是從0.11版本開始的變化並不重要)及之後的功能變遷:在Kafka 0.8時代,Kafka有3個客戶端,分別是Producer、Consumer和Admin Tool。其中Producer負責向Kafka寫消息,Consumer負責從Kafka讀消息,而Admin Tool執行各類運維任務,好比建立或刪除主題等。其中Consumer的位移數據保存在ZooKeeper上,所以Consumer端的位移提交和位移獲取操做都須要訪問ZooKeeper。另外Admin Tool執行運維操做也要訪問ZooKeeper,好比在對應的ZooKeeper znode上建立一個臨時節點,而後由預約義的Watch觸發相應的處理邏輯。算法

後面隨着Kafka的演進,社區引入了__consumer_offsets位移主題,同時定義了OffsetFetch和OffsetCommit等新的RPC協議,這樣Consumer的位移提交和位移獲取操做所有轉移到與位移主題進行交互,避免了對ZooKeeper的訪問。同時社區引入了新的運維工具AdminClient以及相應的CreateTopics、DeleteTopics、AlterConfigs等RPC協議,替換了原先的Admin Tool,這樣建立和刪除主題這樣的運維操做也徹底移動Kafka這一端來作,就像下面右邊這張圖展現的:apache

至此, Kafka的3個客戶端基本上都不須要和ZooKeeper交互了。應該說移除ZooKeeper的工做完成了大部分,但依然還有一部分工做要在ZooKeeper的幫助下完成,即Consumer的Rebalance操做。在0.8時代,Consumer Group的管理是交由ZooKeeper完成的,包括組成員的管理和訂閱分區的分配。這個設計在新版Consumer中也獲得了修正。所有的Group管理操做交由Kafka Broker端新引入的Coordinator組件來完成。要完成這些工做,Broker端新增了不少RPC協議,好比JoinGroup、SyncGroup、Heartbeat、LeaveGroup等。緩存

  

此時,Kafka的Java客戶端除了AdminClient還有一點要依賴ZooKeeper以外,全部其餘的組件所有擺脫了對ZooKeeper的依賴。安全

以後,社區引入了Kafka安全層,實現了對用戶的認證和受權。這個額外的安全層也是不須要訪問ZooKeeper的,所以以前依賴ZooKeeper的客戶端是沒法「享用」這個安全層。一旦啓用,新版Clients都須要首先接入這一層並經過審覈以後才能訪問到Broker,以下圖所示:網絡

這麼作的好處在於統一了Clients訪問Broker的模式,即定義RPC協議,好比咱們熟知的PRODUCE協議、FETCH協議、METADATA協議、CreateTopics協議等。若是後面須要實現更多的功能,社區只須要定義新的RPC協議便可。同時新引入的安全層負責對這套RPC協議進行安全校驗,統一了訪問模式。另外這些協議都是版本化的(versioned),所以可以獨立地進行演進,同時也兼顧了兼容性方面的考量。數據結構

Broker間交互

說完了Clients端,咱們說下Broker端的現狀。目前,應該說Kafka Broker端對ZooKeeper是重度依賴的,主要表如今如下幾個方面:架構

  • Broker註冊管理
  • ACL安全層配置管理
  • 動態參數管理
  • 副本ISR管理
  • Controller選舉

咱們拿一張圖來講明,圖中有4個Broker節點和一個ZooKeeper,左上角的Broker充當Controller的角色。當前,全部的Broker啓動後都必須維持與ZooKeeper的會話。Kafka依賴於這個會話實現Broker端的註冊,並且Kafka集羣中的全部配置信息、副本信息、主題信息也都保存在ZooKeeper上。最後Controller與集羣中每一個Broker都維持了一個TCP長鏈接用於向這些Broker發送RPC請求。當前的Controller RPC類型主要有3大類:運維

  • LeaderAndIsr:主要用於向集羣廣播主題分區Leader和ISR的變動狀況,好比對應的Broker應該是特定分區的Leader仍是Follower
  • StopReplica:向集羣廣播執行中止副本的命令
  • UpdateMetadata:向集羣廣播執行變動元數據信息的命令

圖中還新增了一個AlterISR RPC,這是KIP-497要實現的新RPC協議。現階段Kafka各個主題的ISR信息所有保存在ZooKeeper中。若是後續要捨棄ZooKeeper,必需要將這些信息從ZooKeeper中移出來,放在了Controller一端來作。同時還要在程序層面支持對ISR的管理。所以社區計劃在KIP-497上增長AlterISR協議。對了,還要提一句,當前Controller的選舉也是依靠ZooKeeper完成的。異步

因此後面Broker端的演進可能和Clients端的路線差很少:首先是把Broker與ZooKeeper的交互所有幹掉,只讓Controller與ZooKeeper進行交互,而其餘全部Broker都只與Controller交互,以下圖所示:

 

看上去這種演進路線社區已經走得輕車熟路了,但實際上還有遺留了一些問題須要解決。

Broker Liveness

首先就是Broker的liveness問題,即Kafka如何判斷一個Broker究竟是否存活?在目前的設計中,Broker的生存性監測徹底依賴於與ZooKeeper之間的會話。一旦會話超時或斷開Controller自動觸發ZooKeeper端的Watch來移除該Broker,並對其上的分區作善後處理。若是移除了ZooKeeper,Kafka應該採用什麼機制來判斷Broker的生存性是一個問題。

Network Partition

如何防範網絡分區也是一個須要討論的話題。當前可能出現的Network Partition有4種:一、單個Broker徹底與集羣隔離;二、Broker間沒法通信;三、Broker與ZooKeeper沒法通信;四、Broker與Controller沒法通信。下面4張圖分別展現了這4種狀況:

 

咱們分別討論下。首先是第一種狀況,單Broker與集羣其餘Broker隔離,這其實並不算太嚴重的問題。當前的設計已然可以保證很好地應對此種狀況。一旦Broker被隔離,Controller會將其從集羣中摘除,雖然可用性下降了,可是整個集羣的一致性依然可以獲得保證。第二種狀況是Broker間沒法通信,可能的後果是消息的備份機制沒法執行,Kafka要收縮ISR,依然是可用性上的下降,可是一致性狀態並無被破壞。狀況三是Broker沒法與ZooKeeper通信。Broker能正常運轉,它只是沒法與ZooKeeper進行通信。此時咱們說該Broker處於殭屍狀態,即所謂的Zoobie狀態。因Zoobie狀態引入的一致性bug社區jira中一直沒有斷過,社區這幾年也一直在修正這方面的問題,主要對抗的機制就是fencing。好比leader epoch等。最後一類狀況是Broker沒法與Controller通信,那麼全部的元數據更新通道被堵死,即便這個Broker依然是healthy的,可是它保存的元數據信息多是很是過時的。這樣鏈接該Broker的客戶端可能會看到各類很是古怪的問題。以前在知乎上回答過相似的問題:https://www.zhihu.com/question/313683699/answer/609887054。目前,社區對這種狀況並無太好的解決辦法,主要的緣由是Broker的liveness徹底交由ZooKeeper來作的。一旦Broker與ZooKeeper之間的交互沒有問題,其餘緣由致使的liveness問題就沒法完全規避。

第四類Network Partition引入了一個經典的場景:元數據不一致。目前每一個Broker都緩存了一份集羣的元數據信息,這份數據是異步更新的。當第四類Partition發生時,Broker端緩存的元數據信息必然與Controller的不一樣步,從而形成各類各樣的問題。

下面簡要介紹一下元數據更新的過程。主要的流程就是Controller啓動時會同步地從ZooKeeper上拉取集羣全量的元數據信息,以後再以異步的方式同步給其餘Broker。其餘Broker與Controller之間的同步每每有一個時間差,也就是說可能Clients訪問的元數據並非最新的。我我的認爲如今社區不少flaky test failure都是由於這個緣由致使的。 事實上,實際使用過程當中有不少場景是Broker端的元數據與Controller端永遠不一樣步。一般狀況下若是咱們不重啓Broker的話,那麼這個Broker上的元數據將永遠「錯誤」下去。好在社區還給出了一個最後的「大招」: 登陸到ZooKeeper SHELL,手動執行rmr /controller,強迫Controller重選舉,而後從新加載元數據,並給全部Broker重刷一份。不過在實際生產環境,我懷疑是否有人真的要這麼幹,畢竟代價不小,並且最關鍵的是這麼作依然可能存在兩個問題:1. 咱們如何確保Controller和Broker的數據是一致的?2. 加載元數據的過程一般很慢。

這裏詳細說說第二點,即加載元數據的性能問題。整體來講,加載元數據是一個O(N)時間複雜度的過程,這裏的N就是你集羣中總的分區數。考慮到Controller從ZooKeeper加載以後還要推給其餘的Broker,那麼作這件事的總的時間複雜度就是O(N * M),其中M是集羣中Broker的數量。能夠想見,當M和N都很大時,在集羣中廣播元數據不是一個很快的過程。

Metadata as an Event Log

Okay,鑑於以上所提到的全部問題,當Kafka拋棄了ZooKeeper以後,社區應該如何解決它們呢?整體的思路就是Metadata as an Event Log + Controller quorum。咱們先說metadata as an event log。若是你讀過Jay Kreps的《I ❤️Logs》,你應該有感觸,整個Kafka的架構其實都是構建在Log上的。每一個topic的分區本質上就是一個Commit Log,但元數據信息的保存卻不是Log形式。在現有的架構設計中你基本上能夠認爲元數據的數據結構是KV形式的。這一次,社區採用了與消息相同的數據保存方式,即將元數據做爲Log的方式保存起來,以下表所示:

 

這樣作的好處在於每次元數據的變動都被當作是一條消息保存在Log中,而這個Log能夠被視做是一個普通的Kafka主題被備份到多臺Broker上。Log的一個好處在於它有清晰的先後順序關係,即每一個事件發生的時間是能夠排序的,配合以恰當的處理邏輯,咱們就能保證對元數據變動的處理是按照變動發生時間順序處理,不出現亂序的情形。另外Log機制還有一個好處是,在Broker間同步元數據時,咱們能夠選擇同步增量數據(delta),而非全量狀態。如今Kafka Broker間同步元數據都是全量狀態同步的。前面說過了,當集羣分區數很大時,這個開銷是很可觀的。若是咱們可以只同步增量狀態,勢必能極大地下降同步成本。最後一個好處是,咱們能夠很容易地量化元數據同步的進度,由於對Log的消費有位移數據,所以經過監控Log Lag就能算出當前同步的進度或是落後的進度。

採用Log機制後,其餘Broker像是一個普通的Consumer,從Controller拉取元數據變動消息或事件。因爲每一個Broker都是一個Consumer,因此它們會維護本身的消費位移,就像下面這張圖同樣:

 這種設計下,Controller所在的Broker必需要承擔起全部元數據topic的管理工做,包括建立topic、管理topic分區的leader以及爲每一個元數據變動建立相應的事件等。既然社區選擇和__consumer_offsets相似的處理方式,一個很天然的問題在於這個元數據topic的管理是否可以複用Kafka現有的副本機制?答案是:不可行。理由是現有的副本機制依賴於Controller,所以Kafka無法依靠現有的副本機制來實現Controller——按照咱們的俗語來講,這有點雞生蛋、蛋生雞的問題,屬於典型的循環依賴。爲了實現這個,Kafka須要一套leader選舉協議,而這套協議或算法是不依賴於Controller的,即它是一個自管理的集羣quorum(抱歉,在分佈式領域內,特別是分佈式共識算法領域中,針對quorum的恰當翻譯我目前還未找到,所以直接使用quorum原詞了)。最終社區決定採用Raft來實現這組quorum。這就是上面咱們提到的第二個解決思路:Controller quorum。

Controller Quorum

與藉助Controller幫忙選擇Leader不一樣,Raft是讓本身的節點自行選擇Leader並最終令全部節點達成共識——對選擇Controller而言,這是一個很好的特性。其實Kafka現有的備份機制與Raft已經很接近了,下表羅列了一下它們的異同:

 一眼掃過去,其實Kafka的備份機制和Raft很相似,好比Kafka中的offset其實就是Raft中的index,epoch對應於term。固然Raft中採用的半數機制來確保消息被提交以及Leader選舉,而Kafka設計了ISR機制來實現這兩點。整體來講,社區認爲只須要對備份機制作一些小改動就應該能夠很容易地切換到Raft-based算法。

下面這張圖展現Controller quorum可能更加直觀:

整個controller quorum相似於一個小的集羣。和ZooKeeper相似,這個quorum一般是3臺或5臺機器,不須要讓Kafka中的每一個Broker都自動稱爲這個quorum中的一個節點。該quorum裏面有一個Leader負責處理客戶端發來的讀寫請求,這個Leader就是Kafka中的active controller。根據ZooKeeper的Zab協議,leader處理全部的寫請求,而follower是能夠處理讀請求的。當寫請求發送給follower後,follower會將該請求轉發給leader處理。不過我猜Kafka應該不會這樣實現,它應該只會讓leader(即active controller)處理全部的讀寫請求,而客戶端(也就是其餘Broker)壓根就不會發送讀寫請求給follower。在這一點上,這種設計和現有的Kafka請求處理機制是一致的。

如今還須要解決一個問題,即Leader是怎麼被選出來的?既然是Raft-based,那麼採用的也是Raft算法中的Leader選舉策略。讓Raft選出的Leader稱爲active controller。網上有不少關於Raft選主的文章,這裏就不在贅述了,有興趣的能夠讀一讀Raft的論文:《In Search of an Understandable Consensus Algorithm(Extended Version)》。

這套Raft quorum的一個好處在於它自然提供了低延時的failover,所以leader的切換會很是的迅速和及時,由於理論上再也不有元數據加載的過程了,全部的元數據如今都同步保存follower節點的內存中,它已經有其餘Broker須要拉取的全部元數據信息了!更酷的是,它避免瞭如今機制中一旦Controller切換要全量拉取元數據的低效行爲,Broker無需從新拉取以前已經「消費」的元數據變動消息,它只須要重新Leader繼續「消費」便可。

另外一個好處在於:採用了這套機制後,Kafka能夠作元數據的緩存了(metadata caching):即Broker可以把元數據保存在磁盤上,同時就像剛纔說的,Broker只需讀取它關心的那部分數據便可。還有,和如今snapshot機制相似,若是一個Broker保存的元數據落後Controller太多或者是一個全新的Broker,Kafka甚至能夠像Raft那樣直接發送一個snapshot文件,快速令其追上進度。固然大多數狀況下,Broker只須要拉取delta增量數據便可。

Post KIP-500 Broker註冊

當前Broker啓動以後會向ZooKeeper註冊本身的信息,好比本身的主機名、端口、監聽協議等數據。移除ZooKeeper以後,Broker的註冊機制也要發生變化:Broker須要向active controller發送心跳來進行註冊。Controller收集心跳中包含的Broker數據構建整個Kafka集羣信息,以下圖所示:

 同時Controller也會對心跳進行響應,顯式地告知Broker它們是否被容許加入集羣——若是不容許,則可能須要被隔離(fenced)。固然controller本身也能夠對本身進行隔離。咱們針對前面提到的隔離場景討論下KIP-500是怎麼應對的。

Fencing

首先是普通Broker與集羣徹底隔離的場景,好比該Broker沒法與controller和其餘Broker進行通訊,但它依然能夠和客戶端程序交互。此時,fencing機制就很簡單了,直接讓controller令其下線便可。這和如今依靠ZooKeeper會話機制維持Broker判活的機制是如出一轍的,沒有太大改進。

第二種狀況是Broker間的通信中斷。此時消息沒法在leader、follower間進行備份。可是對於元數據而言,咱們不會看到數據不一致的情形,由於Broker依然能夠和controller通信,所以也不會有什麼問題。

第三種狀況是Broker與Controller的隔離。現有機制下這是個問題,但KIP-500以後,Controller僅僅將該Broker「踢出場」便可,不會形成元數據的不一致。

最後一種狀況是Broker與ZooKeeper的隔離, 既然ZooKeeper要被移除了,天然這也不是問題了。

部署

終於聊到KIP-500以後的Kafka運維了。下表總結了KIP-500先後的部署狀況對比:

很簡單,如今任什麼時候候部署和運維Kafka都要考慮對ZooKeeper的運維管理。在KIP-500以後咱們只須要關心Kafka便可。

Controller quorum共享模式

如前所述,controller改爲Raft quorum機制後,可能使用3或5臺機器構成一個小的quorum。那麼一個很天然的問題是,這些Broker機器還可否用做他用,是惟一用做controller quorum仍是和其餘Broker同樣正常處理。社區對此也作了解釋:兩種都支持!

若是你的Kafka集羣資源很緊張,你可使用共享controller模式(Shared Controller Mode),即充當controller quorum的Broker機器也能處理普通的客戶端請求;相反地,若是你的Kafka資源很充足,專屬controller模式(Separate Controller Mode)多是更適合的,即在controller quorum中的Broker機器排它地用做Controller的選舉之用,再也不對客戶端提供讀寫服務。這樣能夠實現更好的資源隔離,適用於大集羣。

Roadmap

最後說一下KIP-500的計劃。社區計劃分三步走:

第一步是移除客戶端對ZooKeeper的依賴——這一步基本上已經完成了,除了目前AdminClient還有少許的API依賴ZooKeeper以外,其餘客戶端應該說都不須要訪問ZooKeeper了;第二步是移除Broker端的ZooKeeper依賴:這主要包括移除Broker端須要訪問ZooKeeper的代碼,以及增長新的Broker端API,如前面所說的AlterISR等,最後是將對ZooKeeper的訪問所有集中在controller端;最後一步就是實現controller quorum,實現Raft-based的quorum負責controller的選舉。

至於Kafka升級,若是從現有的Kafka直接升級到KIP-500以後的Kafka會比較困難,所以社區打算引入一個名爲Bridge Release的中間過渡版本,以下圖所示:

這個Bridge版本的特色在於全部對ZooKeeper的訪問都集中到了controller端,Broker訪問ZooKeeper的其餘代碼都被移除了。 

總結

KIP-500應該說是最近幾年社區提出的最重磅的KIP改進了。它幾乎是顛覆了Kafka已有的使用模式,摒棄了以前重度依賴的Apache ZooKeeper。就我我的而言,我是很期待這個KIP,後續有最新消息我也會在一併同步出來。讓咱們靜觀其變吧~~~

相關文章
相關標籤/搜索