Controller 是從Kafka集羣中選取一個的broker,負責管理topic分區和副本的狀態的變化,經過上篇咱們知道了controller的啓動流程,這篇咱們學習一下分區和副本狀態機。緩存
分區狀態機記錄着當前集羣全部 Partition 的狀態信息以及如何對 Partition 狀態轉移進行相應的處理;副本狀態機則是記錄着當前集羣全部 Replica 的狀態信息以及如何對 Replica 狀態轉變進行相應的處理。函數
PartitionStateMachine 記錄着集羣全部 Partition 的狀態信息,它決定着一個 Partition 處在什麼狀態以及它在什麼狀態下能夠轉變爲何狀態,Kafka 中 Partition 的狀態總共有如下四種類型:學習
分區狀態機轉移圖以下所示:ui
先看下 KafkaController 在啓動時,調用 PartitionStateMachine 的 startup()
方法初始化的處理過程。spa
PartitionStateMachine 的初始化方法以下所示:.net
在這個方法中,PartitionStateMachine 先調用 initializePartitionState() 方法初始化集羣中全部 Partition 的狀態信息:3d
這裏只是將 Partition 的狀態信息更新分區狀態機的緩存 partitionState 中,並無真正進行狀態的轉移。code
在初始化的第二步,將會調用 triggerOnlinePartitionStateChange()
方法,爲全部的狀態爲 NewPartition/OnlinePartition 的 Partition 進行 leader 選舉,選舉成功後的話,其狀態將會設置爲 OnlinePartition。blog
上面方法的目的是爲嘗試將全部的狀態爲 NewPartition/OnlinePartition 的 Partition 狀態轉移到 OnlinePartition,這個方法主要是作了兩件事:ip
這裏以要轉移的 TargetState 區分作詳細詳細講解,當 TargetState 分別是 NewPartition、OfflinePartition、NonExistentPartition 或者 OnlinePartition 時,副本狀態機所作的事情。
NewPartition 是 Partition 剛建立時的一個狀態,其處理邏輯以下:
實現邏輯:
l 校驗其前置狀態,它有效的前置狀態爲 NonExistentPartition;
l 將該 Partition 的狀態轉移爲 NewPartition 狀態,而且更新到緩存中。
OnlinePartition 是一個 Partition 正常工做時的狀態,這個狀態下的 Partition 已經成功選舉出了 leader 和 isr 信息,其實現邏輯以下:
實現邏輯:
對於以上這幾種狀況,不管前置狀態是什麼,最後都會觸發這個 Partition 的 leader 選舉,leader 成功後,都會觸發向這個 Partition 的全部 replica 發送 LeaderAndIsr 請求。
OfflinePartition 是這個 Partition 的 leader 掛掉時轉移的一個狀態,若是 Partition 轉移到這個狀態,那麼就意味着這個 Partition 沒有了可用 leader。
實現邏輯:
NonExistentPartition 表明了已經處於 OfflinePartition 狀態的 Partition 已經從 metadata 和 zk 中刪除後進入的狀態。
實現邏輯:
這裏主要是看一下上面 Partition 各類轉移的觸發的條件,整理的結果以下表所示,部份內容會在後續文章講解。
TargetState |
觸發方法 |
做用 |
OnlinePartition |
Controller 的 shutdownBroker() |
優雅關閉 Broker 時調用,由於要下線的節點是 leader,因此須要觸發 leader 選舉 |
OnlinePartition |
Controller 的 onNewPartitionCreation() |
Partition 新建時,這個是在 Replica 已經變爲 NewPartition 狀態後進行的,爲新建的 Partition 初始化 leader 和 isr |
OnlinePartition |
controller 的 onPreferredReplicaElection() |
對 Partition 進行最優 leader 選舉,目的是觸發 leader 選舉 |
OnlinePartition |
controller 的 moveReassignedPartitionLeaderIfRequired() |
分區副本遷移完成後,1. 當前的 leader 不在 RAR 中,須要觸發 leader 選舉;2. 當前 leader 在 RAR 可是掉線了,也須要觸發 leader 選舉 |
OnlinePartition |
PartitionStateMachine 的 triggerOnlinePartitionStateChange() |
當 Controller 從新選舉出來或 broker 有變化時,目的爲了那些狀態爲 NewPartition/OfflinePartition 的 Partition 從新選舉 leader,選舉成功後狀態變爲 OnlinePartition |
OnlinePartition |
PartitionStateMachine 的 initializePartitionState() |
Controller 初始化時,遍歷 zk 的全部的分區,若是有 LeaderAndIsr 信息而且 leader 在 alive broker 上,那麼就將狀態轉爲 OnlinePartition。 |
OfflinePartition |
controller 的 onBrokerFailure() |
當有 broker 掉線時,將 leader 在這個機器上的 Partition 設置爲 OfflinePartition |
OfflinePartition |
TopicDeletionManager 的 completeDeleteTopic() |
Topic 刪除成功後,中間會將該 Partition 的狀態先轉變爲 OfflinePartition |
NonExistentPartition |
TopicDeletionManager 的 completeDeleteTopic() |
Topic 刪除成功後,最後會將該 Partition 的狀態轉移爲 NonExistentPartition |
NewPartition |
Controller 的 onNewPartitionCreation() |
Partition 剛建立時的一箇中間狀態 ,此時還沒選舉 leader 和設置 isr 信息 |
ReplicaStateMachine 記錄着集羣全部 Replica 的狀態信息,它決定着一個 replica 處在什麼狀態以及它在什麼狀態下能夠轉變爲何狀態,Kafka 中副本的狀態總共有如下七種類型:
上面的狀態中其中後面4是專門爲 Replica 刪除而服務的,副本狀態機轉移圖以下所示:
在以前介紹KafkaController 在啓動時,會調用 ReplicaStateMachine 的 startup()
方法初始化的處理過程。
在這個方法中,ReplicaStateMachine 先調用 initializeReplicaState()
方法初始化集羣中全部 Replica 的狀態信息,若是 Replica 所在機器是 alive 的,那麼將其狀態設置爲 OnlineReplica,不然設置爲 ReplicaDeletionIneligible 狀態,這裏只是將 Replica 的狀態信息更新副本狀態機的緩存 replicaState
中,並無真正進行狀態轉移的操做。
接着第二步調用 handleStateChanges()
將全部存活的副本狀態轉移爲 OnlineReplica 狀態,這裏纔是真正進行狀態轉移的地方,其具體實現以下:
這裏是副本狀態機 startup() 方法的最後一步,它的目的是將全部 alive 的 Replica 狀態轉移到 OnlineReplica 狀態,因爲前面已經這些 alive replica 的狀態設置成了 OnlineReplica,因此這裏 Replica 的狀態轉移狀況是:OnlineReplica –> OnlineReplica,這個方法主要是作了兩件事:
NewReplica 這個狀態是 Replica 準備開始建立是的一個狀態,其實現邏輯以下:
當想要把 Replica 的狀態轉移爲 NewReplica 時,副本狀態機的處理邏輯以下:
這是 Replica 開始刪除時的狀態,Replica 轉移到這種狀態的處理實現以下:
這部分的實現邏輯:
ReplicaDeletionIneligible 是副本刪除失敗時的狀態,Replica 轉移到這種狀態的處理實現以下:
實現邏輯:
ReplicaDeletionSuccessful 是副本刪除成功時的狀態,Replica 轉移到這種狀態的處理實現以下:
實現邏輯:
NonExistentReplica 是副本徹底刪除、不存在這個副本的狀態,Replica 轉移到這種狀態的處理實現以下:
實現邏輯:
OnlineReplica 是副本正常工做時的狀態,此時的 Replica 既能夠做爲 leader 也能夠做爲 follower,Replica 轉移到這種狀態的處理實現以下:
從前面的狀態轉移圖中能夠看出,當 Replica 處在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 狀態時,Replica 是能夠轉移到 OnlineReplica 狀態的,下面分兩種狀況講述:
NewReplica –> OnlineReplica 的處理邏輯以下:
OnlineReplica/OfflineReplica/ReplicaDeletionIneligible –> OnlineReplica 的處理邏輯以下:
OfflineReplica 是 Replica 所在 Broker 掉線時 Replica 的狀態,轉移到這種狀態的處理邏輯以下:
處理邏輯以下:
這裏主要是看一下上面 Replica 各類轉移的觸發的條件,整理的結果以下表所示
TargetState |
觸發方法 |
做用 |
OnlineReplica |
KafkaController 的 onBrokerStartup() |
Broker 啓動時,目的是將在該節點的 Replica 狀態設置爲 OnlineReplica |
OnlineReplica |
KafkaController 的 onNewPartitionCreation() |
新建 Partition 時,Replica 初始化及 Partition 狀態變成 OnlinePartition 後,新建立的 Replica 狀態也變爲 OnlineReplica; |
OnlineReplica |
KafkaController 的 onPartitionReassignment() |
副本遷移完成後,RAR 中的副本設置爲 OnlineReplica 狀態 |
OnlineReplica |
ReplicaStateMachine 的 startup() |
副本狀態機剛初始化啓動時,將存活的副本狀態設置爲 OnlineReplica |
OfflineReplica |
TopicDeletionManager 的 markTopicForDeletionRetry() |
將刪除失敗的 Replica 設置爲 OfflineReplica,從新進行刪除 |
OfflineReplica |
TopicDeletionManager 的 startReplicaDeletion() |
開始副本刪除時,先將副本設置爲 OfflineReplica |
OfflineReplica |
KafkaController 的 shutdownBroker() 方法 |
優雅關閉 broker 時,目的是把下線節點上的副本狀態設置爲 OfflineReplica |
OfflineReplica |
KafkaController 的 onBrokerFailure() |
broker 掉線時,目的是把下線節點上的副本狀態設置爲 OfflineReplica |
NewReplica |
KafkaController 的 onNewPartitionCreation() |
Partition 新建時,當 Partition 狀態變爲 NewPartition 後,副本的狀態變爲 NewReplica |
NewReplica |
KafkaController 的 startNewReplicasForReassignedPartition() |
Partition 副本遷移時,將新分配的副本狀態設置爲 NewReplica; |
ReplicaDeletionStarted |
TopicDeletionManager 的 startReplicaDeletion() |
下線副本時,將成功設置爲 OfflineReplica 的 Replica 設置爲 ReplicaDeletionStarted 狀態,開始物理上刪除副本數據(也是發送 StopReplica) |
ReplicaDeletionStarted |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 的副本遷移時,目的是下線那些 old replica,新的 replica 已經遷移到新分配的副本上了 |
ReplicaDeletionSuccessful |
TopicDeletionManager 的 completeReplicaDeletion() |
物理將數據成功刪除的 Replica 狀態會變爲這個 |
ReplicaDeletionSuccessful |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 的副本遷移時,在下線那些舊 Replica 時的一個狀態,刪除成功 |
ReplicaDeletionIneligible |
TopicDeletionManager 的 startReplicaDeletion() |
開始副本刪除時,刪除失敗的副本會設置成這個狀態 |
ReplicaDeletionIneligible |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 副本遷移時,在下線那些舊的 Replica 時的一個狀態,刪除失敗 |
NonExistentReplica |
TopicDeletionManager 的 completeReplicaDeletion() |
副本刪除成功後(狀態爲 ReplicaDeletionSuccessful),從狀態機和 Controller 的緩存中清除該副本的記錄; |
NonExistentReplica |
KafkaController 的 stopOldReplicasOfReassignedPartition() |
Partition 的副本成功遷移、舊副本成功刪除後,從狀態機和 Controller 的緩存中清除舊副本的記錄 |
參考資料:
https://blog.csdn.net/lizhitao/article/details/28108919