Controller 是從Kafka集羣中選取一個的broker,負責管理topic分區和副本的狀態的變化,以及執行重分配分區之類的管理任務。node
第一個啓動的broker會成爲一個controller,它會在Zookeeper上建立一個臨時節點(ephemeral):/controller。其餘後啓動的broker也嘗試去建立這樣一個臨時節點,但會報錯,此時這些broker會在該zookeeper的/controller節點上建立一個監控(Watch),這樣當該節點狀態發生變化(好比:被刪除)時,這些broker就會獲得通知。此時,這些broker就能夠在獲得通知時,繼續建立該節點。保證該集羣一直都有一個controller節點。git
當controller所在的broker節點宕機或斷開和Zookeeper的鏈接,它在Zookeeper上建立的臨時節點就會被自動刪除。其餘在該節點上都安裝了監控的broker節點都會獲得通知,此時,這些broker都會嘗試去建立這樣一個臨時的/controller節點,但它們當中只有一個broker(最早建立的那個)可以建立成功,其餘的broker會報錯:node already exists,接收到該錯誤的broker節點會再次在該臨時節點上安裝一個watch來監控該節點狀態的變化。每次當一個broker被選舉時,將會賦予一個更大的數字(經過zookeeper的條件遞增實現),這樣其餘節點就知道controller目前的數字。github
當一個broker宕機而不在當前Kafka集羣中時,controller將會獲得通知(經過監控zookeeper的路徑實現),如有些topic的主分區剛好在該broker上,此時controller將從新選擇這些主分區。controller將會檢查全部沒有leader的分區,並決定新的leader是誰(簡單的方法是:選擇該分區的下一個副本分區),並給全部的broker發送請求。算法
每一個分區的新leader指導,它將接收來自客戶端的生產者和消費者的請求。同時follower也指導,應該從這個新的leader開始複製消息。緩存
當一個新的broker節點加入集羣時,controller將會檢查,在該broker上是否存在分區副本。若存在,controller通知新的和存在的broker這個變化,該broker開始從leader處複製消息。網絡
下面將從如下幾個介紹controller的相關原理:函數
在KafkaServer.startup()中,KafkaController對象被構建,在啓動KafkaApis、replicaManager後,KafkaController.startup()
被調用。spa
startup()
函數很是簡單,這裏直接粘代碼:.net
除去日誌以及標識狀態的isRunning賦值,值得看的代碼就兩句。其中registerSessionExpirationListener()用於在zookeeper會話失效後重連時取消註冊在zookeeper上的各類Listener,而controllerElector.startup則啓動了選舉,這些都將發生在ZookeeperLeaderElector類中。線程
Kafka集羣中每一個Broker都會調用startup()函數,可是一個集羣只有一個Broker可以成爲Controller。那麼,誰將成爲這個controller呢?
KafkaController選舉是直接經過zookeeper實現的,就是在zookeeper建立臨時目錄/controller/並在目錄下存放當前brokerId。若是在zookeeper下建立路徑沒有拋出ZkNodeExistsException異常,則當前broker成功晉級爲Controller。除了調用elect外,controllerElector.startup還會在/controller/路徑上註冊Listener,監聽dataChange事件和dataDelete事件,當/controller/下數據發生變化時,表示Controller發生了變化;而由於/controller/下的數據爲臨時數據,當Controller發生failover時,數據會被刪除,觸發dataDelete事件,這時就須要從新選舉新一任Controller。
成爲KafkaController以後很重要的一件事,就是在zookeeper各個關鍵路徑上添加Listener,因此這裏頗有必要先總結一下跟controller相關的路徑([ ]表示其中的值是隨實際狀況變化的):
名詞解釋
成爲KafkaController之後,會執行什麼操做呢?
完成對各個zookeeper路徑的監聽後,zookeeper內容的變化驅動Controller進行各類操做,處理如新建topic,刪除topic,broker失效,broker恢復等事件。
前面startup()中registerSessionExpirationListener()會註冊會話監聽器,在zookeeper會話過時後又重連成功時調用onControllerResignation(),並從新執行選舉操做。 此外,當Controller會話失效時,會刪除/controller/路徑下建立的臨時數據。與此同時,其餘broker上的ZookeeperLeaderElector類中的LeaderChangeListener感知到數據刪除後會從新執行選舉。
onControllerResignation()是Controller轉變爲普通broker時執行的操做,就是將前面註冊的各個Listener取消註冊,再也不關注zookeeper變化
在 KafkaController 中
l 有兩個狀態機:分區狀態機和副本狀態機;
l 一個管理器:Channel 管理器,負責管理全部的 Broker 通訊;
l 相關緩存:Partition 信息、Topic 信息、broker id 信息等;
l 四種 leader 選舉機制:分別是用 leader offline、broker 掉線、partition reassign、最優 leader 選舉時觸發;
在 initializeControllerContext() 初始化 KafkaController 上下文信息的方法中,主要作了如下事情:
最優 leader 選舉:就是默認選擇 Replica 分配中第一個 replica 做爲 leader,爲何叫作最優 leader 選舉呢?由於 Kafka 在給每一個 Partition 分配副本時,它會保證分區的主副本會均勻分佈在全部的 broker 上,這樣的話只要保證第一個 replica 被選舉爲 leader,讀寫流量就會均勻分佈在全部的 Broker 上,固然這是有一個前提的,那就是每一個 Partition 的讀寫流量相差很少,可是在實際的生產環境,這是不太可能的,因此通常狀況下,大集羣是不建議開自動 leader 均衡的,能夠經過額外的算法計算、手動去觸發最優 leader 選舉。
initializeControllerContext()
方法會經過 startChannelManager()
方法初始化 ControllerChannelManager 對象,以下所示:
ControllerChannelManager在初始化時,會爲集羣中的每一個節點初始化一個 ControllerBrokerStateInfo 對象,該對象包含四個部分:
其具體實現以下所示:
清楚了上面的邏輯,再來看 KafkaController 部分是如何向 Broker 發送請求的
KafkaController 其實是調用的 ControllerChannelManager 的 sendRequest()
方法向 Broker 發送請求信息,其實現以下所示:
它實際上只是把對應的請求添加到該 Broker 對應的 MessageQueue 中,並無真正的去發送請求,請求的的發送是在 每臺 Broker 對應的 RequestSendThread 中處理的。
四種 leader 選舉實現類及對應觸發條件以下所示
實現 |
觸發條件 |
OfflinePartitionLeaderSelector |
leader 掉線時觸發 |
ReassignedPartitionLeaderSelector |
分區的副本從新分配數據同步完成後觸發的 |
PreferredReplicaPartitionLeaderSelector |
最優 leader 選舉,手動觸發或自動 leader 均衡調度時觸發 |
ControlledShutdownLeaderSelector |
broker 發送 ShutDown 請求主動關閉服務時觸發 |
OfflinePartitionLeaderSelector
選舉的邏輯是:
一旦 leader 被成功註冊到 zk 中,它將會更新到 KafkaController 緩存中的 allLeaders 中。
ReassignedPartitionLeaderSelector
ReassignedPartitionLeaderSelector 是在 Partition 副本遷移後,副本同步完成(RAR 都處在 isr 中,RAR 指的是該 Partition 新分配的副本)後觸發的,其 leader 選舉邏輯以下:
PreferredReplicaPartitionLeaderSelector
PreferredReplicaPartitionLeaderSelector 是最優 leader 選舉,選擇 AR(assign replica)中的第一個副本做爲 leader,前提是該 replica 在是存活的、而且在 isr 中,不然會拋出 StateChangeFailedException 的異常。
ControlledShutdownLeaderSelector
ControlledShutdownLeaderSelector 是在處理 broker 下線時調用的 leader 選舉方法,它會選舉 isr 中第一個沒有正在關閉的 replica 做爲 leader,不然拋出 StateChangeFailedException 異常。
http://www.javashuo.com/article/p-tzmaihsg-gk.html