【原創】kafka controller源代碼分析(一)

Kafka集羣中的一個broker會被做爲controller負責管理分區和副本的狀態以及執行相似於重分配分區之類的管理任務。若是當前的controller失敗了,會從剩下的broker中選出新的controller。node

1、PartitionLeaderSelector.scala
顧名思義就是爲分區選舉出leader broker,該trait只定義了一個方法selectLeader,接收一個TopicAndPartition對象和一個LeaderAndIsr對象。TopicAndPartition表示要選leader的分區,而第二個參數表示zookeeper中保存的該分區的當前leader和ISR記錄。該方法會返回一個元組包括了推舉出來的leader和ISR以及須要接收LeaderAndISr請求的一組副本。
    該文件中還定義了五種leader選舉器,在詳細介紹每種選舉器以前,咱們先說下分區都有哪些狀態。Kafka定義了4中分區:
  • NonExistentPartition —— 這個狀態表示該分區要麼沒有被建立過或曾經被建立過但後面被刪除了
  • NewPartition —— 分區建立以後就處於NewPartition狀態。在這個狀態中,分區應該已經分配了副本,可是尚未選舉出leader和ISR
  • OnlinePartition —— 一旦分區的leader被推選出來,它就處於OnlinePartition狀態
  • OfflinePartition —— 若是leader選舉出來後,leader broker宕機了,那麼該分區就處於OfflinePartition狀態。
既然有四種狀態就須要定義合法的狀態轉換:
NonExistentPartition -> NewPartition
1. 首先將第一個可用的副本broker做爲leader broker並把全部可用的副本對象都裝入ISR,而後寫leader和ISR信息到zookeeper中保存
2. 對於這個分區而言,發送LeaderAndIsr請求到每一個可用的副本broker,以及UpdateMetadata請求到每一個可用的broker上
OnlinePartition, OfflinePartition -> OnlinePartition
1. 爲該分區選取新的leader和ISR以及接收LeaderAndIsr請求的一組副本,而後寫入leader和ISR信息到zookeeper中保存。
  • 若是是OfflinePartitionLeaderSelector的話,新的leader就是一個可用的副本(該部分最好不在ISR中),而新的ISR就是之前的ISR或者是剛剛選出來的leader,而接收LeaderAndIsr請求的副本集合就是當前可用的副本集合;
  • 若是是ReassignedPartitionLeaderSelector的話,新的leader就是一個可用的已分配的副本,新的ISR就是當前的ISR,而接收請求的副本集合就是重分配的副本集合;
  • 若是是PreferredReplicaPartitionLeaderSelector的話,新的leader就是第一個分配的副本,新的ISR就是當前的ISR,接收請求的副本就是已分配的副本集合;
  • 若是是ControlledShutdownLeaderSelector的話,新的leader就是ISR中沒有被關閉的副本,新的ISR就是去除關閉狀態副本的ISR,而接收副本就是當前可用的已分配副本
2. 對於這個分區而言,發送LeaderAndIsr請求給每一個接收請求的副本而且發送UpdateMetadata請求給每一個可用的broker
NewPartition, OnlinePartition -> OfflinePartition
標記分區狀態爲離線(offline),僅此而已
OfflinePartition -> NonExistentPartition
僅僅是標記分區狀態爲NonExistentPartition便可
 
下面逐一分析各個leader選舉器類,先從NoOpLeaderSelector開始:
NoOpLeaderSelector類 —— 本質上什麼都不作,只是返回當前的leader和ISR以及給定分區當前的AR(assigned replicas)
OfflinePartitionLeaderSelector類 —— 若是ISR中至少有一個可用的broker,則從ISR中選取一個broker做爲新的leader,而可用的ISR就是新的ISR。若是沒有可用的broker且沒有啓用unclean leader選取,那麼就拋出異常NoReplicaOnlineException。不然就從AR中選出一個可用的broker做爲新的leader和ISR。但若是連AR中都沒有可用的broker,拋出異常。最後將可用的AR做爲接收LeaderAndIsr請求的副本集合。一旦成功選舉出leader以後保存到zookeeper中並更新緩存信息。
ReassignedPartitionLeaderSelector類 —— 從可用的ISR中選取第一個做爲leader,把當前的ISR做爲新的ISR,將重分配的副本集合做爲接收LeaderAndIsr請求的副本集合。
PreferredReplicaPartitionLeaderSelector類 —— 若是AR中的第一個副本就是當前leader的話,拋出異常,不然就選舉該副本爲leader,把當前ISR當作新的ISR,令AR做爲接收LeaderAndIsr請求的副本集合。
ControlledShutdownLeaderSelector類 —— 將ISR中那些處於關閉狀態的副本去除掉做爲新的ISR,而後選取第一個副本做爲leader,而後令當前AR做爲接收LeaderAndIsr請求的副本。
2、PartitionStateMachine.scala
這個就是分區的狀態機。首先定義了一個trait封裝了定義好的四種分區狀態:NewPartition、OnlinePartition、OfflinePartition和NonExistentPartition。而PartitionStateMachine類就是分區的狀態機類。在具體展開該類的字段方法以前,先說一下它內部定義的三個嵌套類,用於動態監聽不一樣的分區操做:
1. TopicChangeListener監聽類
這個類繼承了Zookeeper的IZKChildListener接口,後者是znode子節點事件監聽接口,當ZKClient接收到某個path節點變動或子節點變動事件時就會觸發該listener。而TopicChangeListener類負責監聽分區的全部可能的狀態轉換。具體實現也很簡單就是實現接口的handleChildChange方法,具體邏輯以下:
  • 獲取給定zk路徑下全部子節點的集合,並與當前controller中保存的topic集合比較,找出新增topic集合和被刪除的topic集合
  • 更新controller中保存的當前topic集合
  • 從zk的/brokers/topics/[topic]路徑下找出那些新增topic對應的分區副本分配記錄信息
  • 更新controller中保存的topic分區副本分配記錄,去掉那些被刪除topic的記錄,並加入上一步中獲取到的那些新增topic的分配記錄
  • 若是新增了topic,那麼調用onNewTopicCreation方法爲新增topic註冊分區變動監聽器並設置分區狀態,最後發送元數據更新請求通知各個broker
2. DeleteTopicsListener監聽類
這個監聽器負責監聽topic的刪除,主要包括1. 將要刪除的topic加入到待刪除topic緩存中——前提是這個topic必須存在;2. 若是存在要刪除的topic,那麼通知刪除topic線程。它也實現了handleChildChange方法,這個方法會在topic被刪除的時候被調用。具體邏輯以下:
  • 獲取待刪除topic集合(方法參數傳過來的),與controller中保存的topic集合比較,找到那些不存在的topic
  • 若是確實存在不存在的topic集合,那麼刪除zk中/admin/delete_topics下對應的子節點
  • 從待刪除的topic集合中去掉那些不存在的topic
  • 遍歷更新後的待刪除topic集合,判斷每一個topic當前是否正處於其餘狀態變動過程當中,好比PreferredLeaderSelector或ReassignedLeaderSelector。若是是的話調用DeleteTopicManager的markTopicIneligibleForDelete方法標記該topic爲暫時不能被刪除狀態
  • 將全部帶刪除的topic都加入到controller的topic刪除列表中等待專有線程對其執行刪除操做
3. AddPartitionsListener監聽類
用於監聽增長分區事件,也實現了handleDataChange方法,具體邏輯以下:
  • 獲取zookeeper中新增分區topic對應的分區副本分配記錄,與controller中保存的分區副本記錄相比較,找出新增的分區記錄
  • 若是這些新增分區所屬的topic當前正在執行刪除操做,那麼直接記錄一個日誌錯誤返回,即跳過增長分區的操做,不然調用controller的onNewPartitionCreation方法來建立這些分區
okay,說完這些監聽類以後咱們能夠梳理一下PartitionStateMachine類的字段和方法。先說字段:
1. controllerContext —— 就是KafkaController類的一個實例,封裝了不少controller的方法
2. controllerId —— controller ID,由配置文件中的broker.id屬性指定
3. zkClient —— 一個ZooKeeper的客戶端,用於與zookeeper服務器交互
4. partitionState —— 保存的分區的狀態信息
5. brokerRequestBatch —— 主要用於批量發送請求給Broker
6. hasStarted —— 主要用於標識該狀態機是否開啓
7. noOpPartitionLeaderSelector —— 一個默認的leader選舉類,主要被當作默認值使用,本質上其實什麼都不作
8. topicChangeListener/deleteTopicListener/addPartitionListener —— 三個監聽類實例,用於監聽狀態變動、刪除topic和增長新分區事件
9. stateChangeLogger —— 一個日誌類
下面說說具體的方法:
1. initializePartitionState —— 在分區狀態機啓動時候會被調用,用於設置Zookeeper中全部現存分區的初始狀態。具體邏輯是:首先獲取zk中已有分區記錄,遍歷每一條分區記錄,若是該分區沒有leader和ISR的話則置於NewPartition狀態,不然檢查一下leader broker是否可用(即在可用broker列表中),若是可用的話將分區狀態設置爲OnlinePartition不然設置爲OfflinePartition。
2. assertValidPreviousStates —— 在分區狀態轉換開始前驗證一下轉換前的狀態是否支持進行這種轉換。被容許的轉換以前已經說過了,再也不贅述。
3. assignReplicasToPartitions —— 在執行NonExistentPartition到NewPartition的轉換時會調用該方法用來更新controller的保存的分區副本分配緩存信息。具體方法就是先從zookeeper中讀出該分區的AR記錄而後加入到controller保存的緩存中。
4. initializeLeaderAndIsrForPartition —— 在執行NewPartition到OnlinePartition轉換時會調用該方法。若是一個分區處於NewPartition狀態時,它並無選舉出leader和ISR。在轉換到OnlinePartition狀態後,會在Zookeeper上建立對應的leader和ISR記錄。一旦處於OnlinePartition狀態後分區永遠不能退回到NewPartition狀態,而只能是OfflinePartition狀態。具體邏輯以下:
  • 獲取目標分區對應的副本分配記錄,並從中找出可用的已分配副本。若是當前沒有可用的副本那麼報錯表示該狀態轉換失敗;不然選舉出第一個副本broker做爲leader,把當前可用的副本集合做爲ISR
  • 一旦肯定了leader和ISR以後,在zookeeper上的/brokers/topics/[topic]/partitions/[partitionId]/state節點下建立出對應的leader和ISR路徑
  • 更新controller的分區leader緩存信息,而且把LeaderAndIsr請求加入到brokerRequestBatch中
5. handleStateChange —— 這個方法是分區狀態機的核心方法。該方法主要確保全部的狀態轉換都是合法的。具體邏輯以下:
  • 若是分區狀態機自己沒有啓動,直接拋出異常退出
  • 查看目標分區在狀態中的當前狀態,若是沒有任何記錄設置當前狀態爲NonExistentPartition
  • 若是要轉換到NewPartition狀態,當前必須處於NonExistentPartition狀態,而後更新controller中的分區副本分配緩存信息,並將分區狀態設置爲NewPartition
  • 若是要轉換到OnlinePartition狀態,當前必須處於NewPartition、OnlinePartition或OfflinePartition中的一種。若是當前狀態是NewPartition,首先爲該分區選舉出leader和ISR並寫入zookeeper中保存;若是是OnlinePartition或OfflinePartition,則使用electLeaderForPartition方法(後面會說到)從新爲分區選舉leader,最後設置分區狀態爲OnlinePartition
  • 若是要轉換到OfflinePartition狀態,當前必須處於NewPartition、OnlinePartition或OfflinePartition中的一種。當分區的leader不存在或不可用時會執行這樣的轉換。該轉換也僅僅是變動分區狀態爲OfflinePartition
  • 若是要轉換到NonExistentPartition狀態,當前必須處於OfflinePartition,一樣地,轉換操做也僅僅是設置目標分區狀態爲NonExistentPartition
6. getLeaderIsrAndEpochOrThrowException —— 從Zookeeper中獲取某個分區的leader和ISR信息,若是不存在則拋出異常。
7. electLeaderForPartition —— 爲那些leader已不可用的目標離線分區選舉新的leader,在執行Offlinepartition, OnlinePartition到OnlinePartition的轉換時會被調用。具體邏輯是:
  • 從zookeeper中獲取目標分區的leader和ISR,以及controller_epoch信息
  • 若是zk中的controller_epoch比當前controller保存的值大,說明當前controller曾經可能失敗過並選舉過別的controller且那個controller也干預過目標分區的leader選舉。若是是這樣的話直接終止正在進行中的leader選舉
  • 不然,使用指定的leader選舉器進行leader選舉,獲取leader、ISR和AR的信息並寫入zookeeper中保存
  • 若是zookeeper更新失敗(好比沒法鏈接zookeeper等),從步驟1開始重試,直至成功爲止
  • 更新controller緩存的leader、ISR信息,並將leader、ISR、AR信息加入到元數據請求隊列中等待後面發送元數據更新請求
8. triggerOnlinePartitionStateChange —— 當Kafka集羣成功選出controller或發生broker變動時就會調用該方法嘗試將全部處於NewPartition或OfflinePartition狀態的分區轉換狀態到OnlinePartition狀態。具體作法就是遍歷分區狀態緩存中的全部分區,只要其所屬的topic不在要刪除topic隊列中且分區處於NewPartition或OfflinePartition狀態就調用handleStateChange方法進行到OnlinePartition的狀態轉換。而後發送更新LeaderAndIsr請求以及元數據請求給broker
9. partitionInState —— 從controller緩存中獲取當前處於給定狀態的全部分區,貌似這個方法沒有被使用過
10. startup —— 啓動狀態機,先初始化各個分區的狀態,而後設置啓動標識位爲true代表已啓動,最後調用triggerOnlinePartitionStateChange方法將符合條件的分區都置爲OnlinePartition狀態
11. registerTopicChangeListener/deregisterTopicChangeListener —— 分別註冊和取消Zookeeper的/brokers/topics上分區狀態轉換監聽器
12. registerDeleteTopicListener/deregisterDeleteTopicListener —— 分別註冊和取消zookeeper的/admin/delete_topics下的topic刪除的監聽器
13. registerPartitionChangeListener/deregisterPartitionChangeListener —— 分別註冊和取消zookeeper的/brokers/topics/[topic]下的分區變動監聽器
14. registerListeners —— 註冊狀態轉換監聽器,另外若是開啓了topic刪除的功能(設置delete.topic.enable屬性爲true),那麼還要註冊刪除topic的監聽器
15. deregisterListeners —— 取消狀態轉換監聽器以及每一個topic的分區變動監聽器,清空topic-》分區變動監聽器的緩存。相似地,若是開啓了topic刪除的功能(設置delete.topic.enable屬性爲true),還要取消刪除topic的監聽器
16. shutdown —— controller關閉時會調用該方法來關閉狀態機:首先設置啓動標識位爲false,清空分區狀態緩存並取消全部Zookeeper監聽器
17. handleStateChanges —— 分區變動監聽器會調用該方法爲一組分區進行狀態轉換,具體作法就是遍歷分區集合,爲每個分區調用handleStateChange方法設置分區狀態,最後發送LeaderAndIsr請求以及更新元數據請求給broker
3、ReplicaStateMachine.scala
副本狀態機類,定義了一個分區副本的全部狀態集合:
NewReplica —— 建立topic或重分配分區時controller會建立新的副本,新建立的副本就是處於這個狀態。在此狀態中的副本只能接收"成爲follower"的狀態變動請求,可由NonExistentReplica狀態轉換而來
OnlineRelica —— 一旦啓動了一個副本以及該分區AR副本集合中的一部分,那麼就將設置該副本狀態爲OnlineReplica。在此狀態中的副本能夠接收"成爲leader"或"成爲follower"的狀態變動請求。可由NewRelica、OnlineReplica或OfflineReplica狀態轉換而來
OfflineReplica —— 若是一個副本掛掉(保存該副本的broker宕機)將被置於OfflineReplica狀態,可由NewReplica或OnlineReplica狀態轉換而來
ReplicaDeletionStarted —— 開啓副本刪除操做時會將副本狀態置於ReplicaDeletionStarted狀態,可由OfflineReplica狀態轉換而來
ReplicaDeletionSuccessful —— 若是副本刪除請求成功,返回的響應沒有錯誤的話,該副本會被置於ReplicaDeletionSuccessful狀態,可由ReplicaDeletionStarted狀態轉換而來
ReplicaDeletionIneligible —— 若是副本刪除失敗,將被置於ReplicaDeletionIneligible狀態,可由ReplicaDeletionStarted狀態轉換而來
NonExistentReplica —— 若是副本被成功刪除將被置於NonExistentReplica狀態,可由ReplicaDeletionSuccessful狀態轉換而來
在具體展開該類的字段方法以前,先說一下它內部定義的嵌套類,用於動態監聽副本全部的狀態變動:
BrokerChangeListener類
和不少監聽器類同樣,也實現了handleChildChange方法,具體邏輯以下:
  • 若是啓動了狀態機,首先開啓leader選舉計時器
  • 獲取要進行狀態變動的副本所在的全部broker Id,與controller中緩存的副本列表比較,找出新增長的broker
  • 在Zookeeper中獲取這些新增長broker的元數據信息並封裝到一個Broker列表中返回
  • 找出那些已被刪除的broker,而後使用給定的broker列表更新controller中可用broker緩存
  • 分別調用addBroker和removeBroker方法更新對應的broker線程
  • 最後調用onBrokerStartup和onBrokerFailure回調函數分別處理新增長的broker和刪除的broker
 
ReplicaStateMachine類與PartitionStateMachine類的代碼結構很像,咱們仍是一個個地分析其定義的字段和方法:
類字段
1. controllerContext —— 就是KafkaController類的一個實例,封裝了不少controller的方法
2. controllerId —— controller ID,由配置文件中的broker.id屬性指定
3. zkClient —— 一個ZooKeeper的客戶端,用於與zookeeper服務器交互
4. replicaState —— 保存的分區副本的狀態信息緩存
5. brokerChangeListener —— zk監聽器,用於監聽副本的狀態變動
6. hasStarted —— 主要用於標識該狀態機是否開啓
7. brokerRequestBatch —— 主要用於批量發送請求給Broker
8. stateChangeLogger —— 一個日誌類,用於一些日誌輸出
類方法
1. initializeReplicaState —— 設置zookeeper中全部分區的副本的初始狀態,在啓動副本狀態機時會調用該方法。具體邏輯以下:
  • 獲取controller中保存的分區副本分配緩存記錄
  • 找出每一個分區的AR信息,遍歷AR中的每一個broker,若是這個broker可用(是否在controller的可用broker列表緩存中),則將該副本狀態置於OnlineReplica,不然將副本狀態置於ReplicaDeletionIneligible狀態——controller failover時候broker會宕機,處於該broker的全部副本都要置於這個狀態
2. partitionAssignedToBroker —— 找出目標broker上某個topic的全部分區信息,不過這個方法貌似沒有被用到
3. assertValidPrevisousStates —— 給定目標轉換狀態,驗證當前狀態是否合法
4. handleStateChange —— 副本狀態機的核心方法。它定義了合法的狀態轉換,包括:
  • NonExistentReplica -> NewReplica:將當前leader和ISR封裝到一個LeaderAndIsr請求中發送給新的replica broker,併發送UpdateMetadata請求給每一個當前可用的broker
  • NewReplica -> OnlineReplica:將新增副本加入到AR中
  • OnlineReplica, OfflineReplica -> OnlineReplica:將當前leader和ISR封裝到一個LeaderAndIsr請求中發送給新的replica broker,併發送UpdateMetadata請求給每一個當前可用的broker
  • NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible -> OfflineReplica:首先發送StopReplicaRequest請求給副本(但不刪除副本),而後將該副本從ISR中移除,以後發送LeaderAndIsr請求給leader副本更新ISR併發送UpdateMetadata請求給全部可用的broker
  • OfflineReplica -> ReplicaDeletionStarted: 發送StopReplicaRequest請求給副本(同時刪除該副本)
  • ReplicaDeletionStarted -> ReplicaDeletionSuccessful:在狀態機中標記副本狀態
  • ReplicaDeletionStarted -> ReplicaDeletionIneligble:在狀態機中標記副本狀態
  • ReplicaDeletionSuccessful -> NonExistentReplica:從controller的分區副本分配緩存中移除某個副本
具體邏輯以下:
  • 若是狀態機沒有啓動,直接拋出異常退出
  • 獲取controller中保存的給定副本的狀態,若是無緩存記錄,初始化狀態爲NonExistentReplica,而後獲取controller中緩存的該分區的副本分配記錄
  • 根據給定的目標狀態進入不一樣的分支:
    • 若是要轉換爲NewReplica:必須驗證當前狀態是NonExistentReplica,而後從zookeeper中找出該副本分區的leader、ISR等信息。若是不存在這些信息,就等待leader選舉以後發請求給該副本;若是存在這些信息,還要判斷一些leader是否就是本身,若是是的話拋錯由於被選舉爲leader的副本是不能進行狀態轉換到NewReplica的。若是以上步驟都沒有拋錯,就把發送LeaderAndIsr請求給該副本併發送UpdateMetadata請求給全部可用的broker。最後設置副本狀態爲NewReplica
    • 若是要轉換爲ReplicaDeletionStarted,必須驗證當前狀態是OfflineReplica,而後更新副本狀態併發送中止副本的請求(經過回調函數來完成)
    • 若是要轉換爲ReplicaDeletionIneligible,說明該副本目前不能被刪除,首先要驗證當前狀態必須是ReplicaDeletionStarted,而後更新狀態爲ReplicaDeletionIneligible便可
    • 若是要轉換爲NonExistentReplica,必須驗證當前狀態是ReplicaDeletionSuccessful,以後在controller緩存中把副本從全部分區的AR中移除並從狀態緩存中移除該副本的記錄
    • 若是要轉換爲OnlineReplica,必須驗證當前狀態是NewReplica、OnlineReplica、OfflineReplica或ReplicaDeletionIneligible中的一種。若是是NewReplica狀態,找出當前分區的AR集合,若是該副本不在AR中則把它加入到AR;若是是其餘狀態的話則須要先檢查是否存在leader,若是不存在的話意味着該分區從未處於OnlinePartition狀態,也就是說broker從未開啓過該分區的消息日誌寫入,所以在本方法中也就什麼都不作。但若是存在leader信息,將發送LeaderAndIsr請求給該副本,併發送UpdateMetadata請求給全部可用的broker,最後設置狀態便可
    • 若是要轉換爲OfflineReplica,必須驗證當前狀態是NewReplica、OnlineReplica、OfflineReplica或ReplicaDeletionIneligible中的一種。首先發送中止副本的請求使得副本不在從leader出獲取消息,而後找出分區的leader和ISR。若是不存在則拋出異常,不然將分區副本從ISR中移除併發送LeaderAndIsr請求更新移除後的ISR,最後設置副本狀態便可
5. handleStateChanges —— 狀態機啓動時以及broker發生變動時會調用這個方法,主要負責處理一組副本的狀態變動。具體作法就是爲每個要處理的副本調用handleStateChange方法來進行狀態轉換,而後統一發送LeaderAndIsr請求列表中的全部請求
6. startup —— 啓動狀態機,先初始化副本狀態,而後設置啓動標識位,最後調用handleStateChanges方法將controller中保存全部當前副本設置爲OnlineReplica
7. registerBrokerChangeListener/deregisterBrokerChangeListener —— 在zk上註冊/取消用於監聽副本狀態變動的監聽器
8. registerListeners/deregisterListeners —— 同上
9. shutdown —— 關閉controller時候調用該方法關閉狀態機——設置啓動標識位爲false,清空副本狀態緩存並取消全部zk上的監聽器註冊
10. areAllReplicasForTopicDeleted —— 查看某topic下的全部副本是否已經被刪除
11. isAtLeastOneReplicaInDeletionStartedState —— 判斷是否存在至少一個副本正處於被刪除的過程當中
12. replicasInState —— 返回某個topic下指定副本狀態的全部副本
13. isAnyReplicaInState —— 判斷某topic下是否存在指定狀態的副本
14. replicasInDeletionStates —— 返回某topic下正處於刪除操做過程當中的全部副本
15. partitionAssignedToBroker —— 在一組topic中找出在給定broker有副本的topic及分區
相關文章
相關標籤/搜索