Controller機制

Controller機制

1 前言

Controller 是從Kafka集羣中選取一個的broker,負責管理topic分區和副本的狀態的變化,以及執行重分配分區之類的管理任務。node

 

 

第一個啓動的broker會成爲一個controller,它會在Zookeeper上建立一個臨時節點(ephemeral):/controller。其餘後啓動的broker也嘗試去建立這樣一個臨時節點,但會報錯,此時這些broker會在該zookeeper的/controller節點上建立一個監控(Watch),這樣當該節點狀態發生變化(好比:被刪除)時,這些broker就會獲得通知。此時,這些broker就能夠在獲得通知時,繼續建立該節點。保證該集羣一直都有一個controller節點。git

 

當controller所在的broker節點宕機或斷開和Zookeeper的鏈接,它在Zookeeper上建立的臨時節點就會被自動刪除。其餘在該節點上都安裝了監控的broker節點都會獲得通知,此時,這些broker都會嘗試去建立這樣一個臨時的/controller節點,但它們當中只有一個broker(最早建立的那個)可以建立成功,其餘的broker會報錯:node already exists,接收到該錯誤的broker節點會再次在該臨時節點上安裝一個watch來監控該節點狀態的變化。每次當一個broker被選舉時,將會賦予一個更大的數字(經過zookeeper的條件遞增實現),這樣其餘節點就知道controller目前的數字。github

 

當一個broker宕機而不在當前Kafka集羣中時,controller將會獲得通知(經過監控zookeeper的路徑實現),如有些topic的主分區剛好在該broker上,此時controller將從新選擇這些主分區。controller將會檢查全部沒有leader的分區,並決定新的leader是誰(簡單的方法是:選擇該分區的下一個副本分區),並給全部的broker發送請求。算法

 

每一個分區的新leader指導,它將接收來自客戶端的生產者和消費者的請求。同時follower也指導,應該從這個新的leader開始複製消息。緩存

當一個新的broker節點加入集羣時,controller將會檢查,在該broker上是否存在分區副本。若存在,controller通知新的和存在的broker這個變化,該broker開始從leader處複製消息。網絡

下面將從如下幾個介紹controller的相關原理:函數

2 controller啓動

2.1選舉controller

在KafkaServer.startup()中,KafkaController對象被構建,在啓動KafkaApis、replicaManager後,KafkaController.startup()被調用。spa

 

 

startup()函數很是簡單,這裏直接粘代碼:.net

 

 

除去日誌以及標識狀態的isRunning賦值,值得看的代碼就兩句。其中registerSessionExpirationListener()用於在zookeeper會話失效後重連時取消註冊在zookeeper上的各類Listener,而controllerElector.startup則啓動了選舉,這些都將發生在ZookeeperLeaderElector類中。線程

 

 

Kafka集羣中每一個Broker都會調用startup()函數,可是一個集羣只有一個Broker可以成爲Controller。那麼,誰將成爲這個controller呢? 

KafkaController選舉是直接經過zookeeper實現的,就是在zookeeper建立臨時目錄/controller/並在目錄下存放當前brokerId。若是在zookeeper下建立路徑沒有拋出ZkNodeExistsException異常,則當前broker成功晉級爲Controller。除了調用elect外,controllerElector.startup還會在/controller/路徑上註冊Listener,監聽dataChange事件和dataDelete事件,當/controller/下數據發生變化時,表示Controller發生了變化;而由於/controller/下的數據爲臨時數據,當Controller發生failover時,數據會被刪除,觸發dataDelete事件,這時就須要從新選舉新一任Controller。

 

 

2.1 註冊listener

 

 

成爲KafkaController以後很重要的一件事,就是在zookeeper各個關鍵路徑上添加Listener,因此這裏頗有必要先總結一下跟controller相關的路徑([ ]表示其中的值是隨實際狀況變化的):

  • l  /controller/{「brokerid」:」1」}: 決定誰纔是這一屆Controller,路徑下存放當前Controller的brokerId,這些信息以臨時數據的形式存放,在會話失效時會被刪除。類LeaderChangeListener監聽該路徑下的dataChange和dataDelete事件。
  • l  /brokers/topics:子目錄爲全部topic列表。類TopicChangeListener監聽子目錄列表變化,若是有新增topic,則調用onNewTopicCreation建立新的topic。
  • l  /brokers/topics/[topic]/:存放的是topic下各個分區的AR,目錄下存放的格式爲{「partitions」:{「partitionId1」:[broker1,broker2], …}}。類AddPartitionsListener監聽路徑下的數據變化,在有新增partition時調用Controller.onNewPartitionCreate,即建立新的partition。
  • l  /brokers/topics/[topic]/partitions/[partitionId]/state/:存放的是各個分區的leaderAndIsr信息,即各個分區當前的leaderId,以及ISR。類ReassignedPartitionsIsrChangeListener監聽該路徑下的數據變化,在從新分配replica到partition時,須要等待新的replica追遇上leader後才能執行後續操做。
  • l  /brokers/ids/[brokerId]/brokerInfoString:存放broker信息,brokerInfoString包括broker的IP,端口等信息;BrokerChangeListener監聽/brokers/ids/下子目錄變化,從而通知Controller broker的上下線消息。brokerInfoString是Controller判斷的broker是否活着的條件之一,controllerContext中的liveBrokers須要相應路徑下可以獲取到brokerInfo。
  • l  /admin/reassign_partitions:指導從新分配AR的路徑,經過命令修改AR時會寫入到這個路徑下。類PartitionsReassignedListener監聽該路徑下的內容變化,調用initiateReassignReplicasForTopicPartition,執行從新分配AR操做。
  • l  /admin/preferred_replica_election:分區須要從新選舉第一個replica做爲leader,即所謂的preferred replica。類PreferredReplicaElectionListener監聽該路徑,並對路徑下的partitions執行從新選舉preferred replica做爲leader。

名詞解釋

  • l  AR 當前已分配的副本列表
  • l  RAR 重分配過的副本列表
  • l  ORA 重分配以前的副本列表
  • l  分區Leader 給定分區負責客戶端讀寫的結點
  • l  ISR 「in-sync」 replicas,可以與Leader保持同步的副本集合(Leader也在ISR中)

 

成爲KafkaController之後,會執行什麼操做呢?

  1. 升級Controller Epoch,並將新的epoch寫入到zookeeper中;新的epoch標識着下一個時代的leader,向其餘broker發送命令時會校驗epoch;
  2. 監聽zookeeper路徑/admin/reassign_partitions;
  3. 監聽zookeeper路徑/admin/preferred_replica_election;
  4. 註冊partition狀態機中的監聽器,監聽路徑/brokers/topics的子目錄變化,隨時準備建立topic;
  5. 註冊replica狀態機中的監聽器,監聽路徑/brokers/ids/的子目錄,以便在新的broker加入時可以感知到;
  6. 初始化ControllerContext,主要是從zookeeper中讀取數據初始化context中的變量,諸如 liveBrokers,allTopics,AR,LeadershipInfo等;
  7. 初始化ReplicaStateMachine,將全部在活躍broker上的replica的狀態變爲OnlineReplica;
  8. 初始化PartitionStateMachine,將全部leader在活躍broker上的partition的狀態設置爲Onlinepartition;其餘的partition狀態爲OfflinePartition。Partition是否爲Online的標識就是leader是否活着;以後還會觸發OfflinePartition 和 NewPartition向OnlinePartition轉變,由於OfflinePartition和NewPartition多是選舉leader不成功,因此沒有成爲OnlinePartition,在環境變化後須要從新觸發;
  9. 在全部的topic的zookeeper路徑/brokers/topics/[topic]/上添加AddPartitionsListener,監聽partition變化;
  10. KafkaController 啓動後,觸發一次最優 leader 選舉操做,若是須要的狀況下;
  11. KafkaController 啓動後,若是開啓了自動 leader 均衡,啓動自動 leader 均衡線程,它會根據配置的信息按期運行。

 

完成對各個zookeeper路徑的監聽後,zookeeper內容的變化驅動Controller進行各類操做,處理如新建topic,刪除topic,broker失效,broker恢復等事件。

2.3 Controller Failover

前面startup()中registerSessionExpirationListener()會註冊會話監聽器,在zookeeper會話過時後又重連成功時調用onControllerResignation(),並從新執行選舉操做。 此外,當Controller會話失效時,會刪除/controller/路徑下建立的臨時數據。與此同時,其餘broker上的ZookeeperLeaderElector類中的LeaderChangeListener感知到數據刪除後會從新執行選舉。

onControllerResignation()是Controller轉變爲普通broker時執行的操做,就是將前面註冊的各個Listener取消註冊,再也不關注zookeeper變化

2.4 initializeControllerContext 初始化 Controller 上下文信息

在 KafkaController 中

l  有兩個狀態機:分區狀態機和副本狀態機;

l  一個管理器:Channel 管理器,負責管理全部的 Broker 通訊;

l  相關緩存:Partition 信息、Topic 信息、broker id 信息等;

l  四種 leader 選舉機制:分別是用 leader offline、broker 掉線、partition reassign、最優 leader 選舉時觸發;

 

 

 

在 initializeControllerContext() 初始化 KafkaController 上下文信息的方法中,主要作了如下事情:

  • l  從 zk 獲取全部 alive broker 列表,記錄到 liveBrokers;
  • l  從 zk 獲取全部的 topic 列表,記錄到 allTopic 中;
  • l  從 zk 獲取全部 Partition 的 replica 信息,更新到 partitionReplicaAssignment 中;
  • l  從 zk 獲取全部 Partition 的 LeaderAndIsr 信息,更新到 partitionLeadershipInfo 中;
  • l  調用 startChannelManager() 啓動 Controller 的 Channel Manager;
  • l  經過 initializePreferredReplicaElection() 初始化須要最優 leader 選舉的 Partition 列表,記錄到 partitionsUndergoingPreferredReplicaElection 中;
  • l  經過 initializePartitionReassignment() 方法初始化須要進行副本遷移的 Partition 列表,記錄到 partitionsBeingReassigned 中;
  • l  經過 initializeTopicDeletion() 方法初始化須要刪除的 topic 列表及 TopicDeletionManager 對象;

最優 leader 選舉:就是默認選擇 Replica 分配中第一個 replica 做爲 leader,爲何叫作最優 leader 選舉呢?由於 Kafka 在給每一個 Partition 分配副本時,它會保證分區的主副本會均勻分佈在全部的 broker 上,這樣的話只要保證第一個 replica 被選舉爲 leader,讀寫流量就會均勻分佈在全部的 Broker 上,固然這是有一個前提的,那就是每一個 Partition 的讀寫流量相差很少,可是在實際的生產環境,這是不太可能的,因此通常狀況下,大集羣是不建議開自動 leader 均衡的,能夠經過額外的算法計算、手動去觸發最優 leader 選舉。

2.5 Controller Channel Manager

initializeControllerContext() 方法會經過 startChannelManager() 方法初始化 ControllerChannelManager 對象,以下所示:

 

 

ControllerChannelManager在初始化時,會爲集羣中的每一個節點初始化一個 ControllerBrokerStateInfo 對象,該對象包含四個部分:

  • l  NetworkClient:網絡鏈接對象;
  • l  Node:節點信息;
  • l  BlockingQueue:請求隊列;
  • l  RequestSendThread:請求的發送線程。

其具體實現以下所示:

 

 

清楚了上面的邏輯,再來看 KafkaController 部分是如何向 Broker 發送請求的

 

 

KafkaController 其實是調用的 ControllerChannelManager 的 sendRequest() 方法向 Broker 發送請求信息,其實現以下所示:

 

 

它實際上只是把對應的請求添加到該 Broker 對應的 MessageQueue 中,並無真正的去發送請求,請求的的發送是在 每臺 Broker 對應的 RequestSendThread 中處理的。

2.6 Controller 原生的四種 leader 選舉機制

四種 leader 選舉實現類及對應觸發條件以下所示

實現

觸發條件

OfflinePartitionLeaderSelector

leader 掉線時觸發

ReassignedPartitionLeaderSelector

分區的副本從新分配數據同步完成後觸發的

PreferredReplicaPartitionLeaderSelector

最優 leader 選舉,手動觸發或自動 leader 均衡調度時觸發

ControlledShutdownLeaderSelector

broker 發送 ShutDown 請求主動關閉服務時觸發

OfflinePartitionLeaderSelector

選舉的邏輯是:

  • l  若是 isr 中至少有一個副本是存活的,那麼從該 Partition 存活的 isr 中選舉第一個副本做爲新的 leader,存活的 isr 做爲新的 isr;
  • l  不然,若是髒選舉(unclear elect)是禁止的,那麼就拋出 NoReplicaOnlineException 異常;
  • l  不然,即容許髒選舉的狀況下,從存活的、所分配的副本(不在 isr 中的副本)中選出一個副本做爲新的 leader 和新的 isr 集合;
  • l  不然,便是 Partition 分配的副本沒有存活的,拋出 NoReplicaOnlineException 異常;

一旦 leader 被成功註冊到 zk 中,它將會更新到 KafkaController 緩存中的 allLeaders 中。

 

 

ReassignedPartitionLeaderSelector

ReassignedPartitionLeaderSelector 是在 Partition 副本遷移後,副本同步完成(RAR 都處在 isr 中,RAR 指的是該 Partition 新分配的副本)後觸發的,其 leader 選舉邏輯以下:

  • l  leader 選擇存活的 RAR 中的第一個副本,此時 RAR 都在 isr 中了;
  • l  new isr 是全部存活的 RAR 副本列表;

 

 

PreferredReplicaPartitionLeaderSelector

PreferredReplicaPartitionLeaderSelector 是最優 leader 選舉,選擇 AR(assign replica)中的第一個副本做爲 leader,前提是該 replica 在是存活的、而且在 isr 中,不然會拋出 StateChangeFailedException 的異常。

 

 

 

ControlledShutdownLeaderSelector

ControlledShutdownLeaderSelector 是在處理 broker 下線時調用的 leader 選舉方法,它會選舉 isr 中第一個沒有正在關閉的 replica 做爲 leader,不然拋出 StateChangeFailedException 異常。

 

 

參考資料:

http://www.javashuo.com/article/p-tzmaihsg-gk.html

https://blog.csdn.net/c395318621/article/details/52463854

https://github.com/wangzzu/awesome/issues/7

相關文章
相關標籤/搜索