本文主要參考社區0.11版本Controller的重設計方案,試圖給你們梳理一下Kafka controller這個組件在設計上的一些重要思考。衆所周知,Kafka中有個關鍵組件叫controller,負責管理和協調Kafka集羣。網上關於controller的源碼分析也有不少,本文就再也不大段地列出代碼重複作這件事情了。實際上,對於controller的代碼我一直以爲寫的很是混亂,各類調用關係十分複雜,想要完整地理解它的工做原理確實不易。好在咱們就是普通的使用者,大體瞭解controller的工做原理便可。下面我就帶各位簡要了解一下當前Kafka controller的原理架構以及社區爲何要在大改controller的設計。node
「負責管理和協調Kafka集羣」的說法實在沒有什麼養分,上點乾貨吧——具體來講Controller目前主要提供多達10種的Kafka服務功能的實現,它們分別是:緩存
當前controller啓動時會爲集羣中全部broker建立一個各自的鏈接。這麼說吧,假設你的集羣中有100臺broker,那麼controller啓動時會建立100個Socket鏈接(也包括與它本身的鏈接!)。當前新版本的Kafka統一使用了NetworkClient類來建模底層的網絡鏈接(有興趣研究源碼的能夠去看下這個類,它主要依賴於Java NIO的Selector)。Controller會爲每一個鏈接都建立一個對應的請求發送線程,專門負責給對應的broker發送請求。也就是說,若是仍是那100臺broker,那麼controller啓動時還會建立100個RequestSendThread線程。當前的設計中Controller只能給broker發送三類請求,它們是:網絡
Controller一般都是發送請求給broker的,只有上面談到的controller 10大功能中的ControlledShutdownRequest請求是例外:這個請求是待關閉的broker經過RPC發送給controller的,即它的方向是反的。另外這個請求還有一個特別之處就是其餘全部功能或是請求都是經過Zookeeper間接與controller交互的,只有它是直接與controller進行交互的。多線程
構成controller的組件太多了,多到我已經不想用文字表達了,直接上圖吧:架構
其中比較重要的組件包括:異步
緩存內容十分豐富,這也是controller能夠協調管理整個cluster的基礎。async
不謙虛地說,我混跡社區也有些日子了。在裏面碰到過不少關於controller的bug。社區對於這些bug有個很共性的特色,那就是沒有什麼人願意(敢去)改這部分代碼,由於它實在是太複雜了。具體的問題包括:工具
編寫正確的多線程程序一直是Java開發者的痛點。在Controller的實現類KafkaController中建立了不少線程,好比以前提到的RequestSendThread線程,另外ZkClient也會建立單獨的線程來處理zookeeper回調,這還不算TopicDeletionManager建立的線程和其餘IO線程等。幾乎全部這些線程都須要訪問ControllerContext(RequestSendThread只操做它們專屬的請求隊列,不會訪問ControllerContext),所以必要的多線程同步機制是必定須要的。當前是使用controllerLock鎖來實現的,所以能夠說沒有並行度可言。源碼分析
看過源代碼的人相信對這一點深有體會。KafkaController、PartitionStateMachine和ReplicaStateMachine每一個都是500+行的大類且彼此混調的現象明顯,好比KafkaController的stopOldReplicasOfReassignedPartition方法調用ReplicaStateMachine的handleStateChanges方法,然後者又會調用KafkaController的remoteReplicaFromIsr方法。相似的狀況還發生在KafkaController和ControllerChannelManager之間。性能
當前broker對入站請求類型不作任何優先級處理,不管是PRODUCE請求、FETCH請求仍是Controller類的請求。這就可能形成一個問題:即clients發送的數據類請求積壓致使controller推遲了管理類請求的處理。設想這樣的場景,假設controller向broker廣播了leader發生變動。因而新leader開始接收clients端請求,而同時老leader所在的broker因爲出現了數據類請求的積壓使得它一直忙於處理這些請求而沒法處理controller發來的LeaderAndIsrRequest請求,所以這是就會出現「雙主」的狀況——也就是所謂的腦裂。此時假若client發送的一個PRODUCE請求未指定acks=-1,那麼由於日誌水位截斷的緣故這個請求包含的消息就可能「丟失」了。如今社區中關於controller丟失數據的bug大可能是由於這個緣由形成的。
當前controller操做Zookeeper是經過ZkClient來完成的。ZkClient目前是同步寫入Zookeeper,而同步一般意味着性能不高。更爲嚴重的是,controller是一個分區一個分區進行寫入的,對於分區數不少的集羣來講,這無疑是個巨大的性能瓶頸。若是用戶仔細查看源代碼,能夠發現PartitionStateMachine的electLeaderForPartition就是一個分區一個分區地選舉的。
Controller當前發送請求都是按照分區級別發送的,即一個分區一個分區地發送。沒有任何batch或並行可言,效率很低。
這裏的版本號相似於new consumer的generation,總之是要有一種機制告訴controller broker的版本信息。由於有些狀況下broker會處理本已過時或失效的請求致使broker狀態不一致。舉個例子,若是一個broker正常關閉過程當中「宕機」了,那麼重啓以後這個broker就有可能處理以前controller發送過來的StopReplicaRequest,致使某些副本被置成offline從而沒法使用。而這確定不是咱們但願看到的結果,對吧?
Contoller目前是使用了ZkClient這個開源工具,它能夠自動重建會話並使用特有的線程順序處理全部的Zookeeper監聽消息。由於是順序處理,它就有可能沒法及時響應最新的狀態變動致使Kafka集羣狀態的不一致。
和new consumer相似,controller摒棄多線程的模型,採用單線程的事件隊列模型。這樣簡化了設計同時也避免了複雜的同步機制。各位在最新的trunk分支上已然能夠看到這種變化:增長了ControllerEventManager類以及對應的ControllerEventThread線程類專門負責處理ControllerEvent。目前總共有9種controller event,它們分別是:
咱們基本上能夠從名字就能判斷出它們分別表明了什麼事件。
將全部同步操做Zookeeper的地方都改爲異步調用+回調的方式。實際上Apache Zookeeper客戶端執行請求的方式有三種:同步、異步和batch。一般以batch性能最好,但Kafka社區目前仍是傾向於用async替換sync。畢竟實現起來相對簡單同時性能上也能獲得很多提高。
可能摒棄以前狀態機的方式,採用和GroupCoordinator相似的方式,讓controller保存全部的狀態而且負責狀態的流轉以及狀態流轉過程當中的邏輯。固然,具體的實現還要再結合0.11最終代碼才能肯定。
對管理類請求和數據類請求區分優先級。好比使用優先級隊列替換現有的BlockingQueue——社區應該已經實現了這個功能,開發了一個叫PrioritizationAwareBlockingQueue的類來作這件事情,後續你們能夠看下這個類的源代碼
爲broker設定版本號(generation id)。若是controller發送過來的請求中包含的generation與broker本身的generation不匹配, 那麼broker會拒絕該請求。
ZkClient是同步順序處理ZK事件的,而原生Zookeeper client支持async方式。另外使用原生API還可以在接收到狀態變動通知時便立刻開始處理,而ZkClient的特定線程則必需要在隊列中順序處理到這條變動消息時才能處理。
以上就是關於Kafka controller的一些討論,包括了它當前的組件構成、設計問題以及對應的改進方案。有不少地方可能理解的還不是透徹,期待着在Kafka 0.11正式版本中能夠看到全新的controller組件。