副本和分區狀態機

副本和分區狀態機

1 前言

Controller 是從Kafka集羣中選取一個的broker,負責管理topic分區和副本的狀態的變化,經過上篇咱們知道了controller的啓動流程,這篇咱們學習一下分區和副本狀態機。緩存

分區狀態機記錄着當前集羣全部 Partition 的狀態信息以及如何對 Partition 狀態轉移進行相應的處理;副本狀態機則是記錄着當前集羣全部 Replica 的狀態信息以及如何對 Replica 狀態轉變進行相應的處理。函數

2 PartitionStateMachine

PartitionStateMachine 記錄着集羣全部 Partition 的狀態信息,它決定着一個 Partition 處在什麼狀態以及它在什麼狀態下能夠轉變爲何狀態,Kafka 中 Partition 的狀態總共有如下四種類型:學習

  • l  NonExistentPartition:這個表明着這個 Partition 以前沒有被建立過或者以前建立了如今又被刪除了,它有效的前置狀態是 OfflinePartition;
  • l  NewPartition:Partition 建立後,它將處於這個狀態,這個狀態的 Partition 尚未 leader 和 isr,它有效的前置狀態是 NonExistentPartition;
  • l  OnlinePartition:一旦這個 Partition 的 leader 被選舉出來了,它將處於這個狀態,它有效的前置狀態是 NewPartition、OnlinePartition、OfflinePartition;
  • l  OfflinePartition:若是這個 Partition 的 leader 掉線,這個 Partition 將被轉移到這個狀態,它有效的前置狀態是 NewPartition、OnlinePartition、OfflinePartition。

分區狀態機轉移圖以下所示:ui

 

 

先看下 KafkaController 在啓動時,調用 PartitionStateMachine 的 startup() 方法初始化的處理過程。spa

PartitionStateMachine 的初始化方法以下所示:.net

 

 

在這個方法中,PartitionStateMachine 先調用 initializePartitionState() 方法初始化集羣中全部 Partition 的狀態信息:3d

  • l  若是該 Partition 有 LeaderAndIsr 信息,那麼若是 Partition leader 所在的機器是 alive 的,那麼將其狀態設置爲 OnlinePartition,不然設置爲 OfflinePartition 狀態;
  • l  若是該 Partition 沒有 LeaderAndIsr 信息,那麼將其狀態設置爲 NewPartition。

這裏只是將 Partition 的狀態信息更新分區狀態機的緩存 partitionState 中,並無真正進行狀態的轉移。code

 

在初始化的第二步,將會調用 triggerOnlinePartitionStateChange() 方法,爲全部的狀態爲 NewPartition/OnlinePartition 的 Partition 進行 leader 選舉,選舉成功後的話,其狀態將會設置爲 OnlinePartition。blog

 

 

上面方法的目的是爲嘗試將全部的狀態爲 NewPartition/OnlinePartition 的 Partition 狀態轉移到 OnlinePartition,這個方法主要是作了兩件事:ip

  • l  狀態轉移(這個在下面詳細講述);
  • l  發送相應的請求。

2.1分區的狀態轉移

這裏以要轉移的 TargetState 區分作詳細詳細講解,當 TargetState 分別是 NewPartition、OfflinePartition、NonExistentPartition 或者 OnlinePartition 時,副本狀態機所作的事情。

TargetState: NewPartition

NewPartition 是 Partition 剛建立時的一個狀態,其處理邏輯以下:

 

 

實現邏輯:

l  校驗其前置狀態,它有效的前置狀態爲 NonExistentPartition;

l  將該 Partition 的狀態轉移爲 NewPartition 狀態,而且更新到緩存中。

TargetState: OnlinePartition

OnlinePartition 是一個 Partition 正常工做時的狀態,這個狀態下的 Partition 已經成功選舉出了 leader 和 isr 信息,其實現邏輯以下:

 

 

實現邏輯:

  • l  校驗這個 Partition 的前置狀態,有效的前置狀態是:NewPartition、OnlinePartition 或者 OfflinePartition;
  • l  若是前置狀態是 NewPartition,那麼爲該 Partition 選舉 leader 和 isr,更新到 zk 和 controller 的緩存中,若是副本沒有處於 alive 狀態的話,就拋出異常;
  • l  若是前置狀態是 OnlinePartition,那麼只是觸發 leader 選舉,在 OnlinePartition –> OnlinePartition 這種狀態轉移時,須要傳入 leader 選舉的方法,觸發該 Partition 的 leader 選舉;
  • l  若是前置狀態是 OfflinePartition,同上,也是觸發 leader 選舉。
  • l  更新 Partition 的狀態爲 OnlinePartition。

對於以上這幾種狀況,不管前置狀態是什麼,最後都會觸發這個 Partition 的 leader 選舉,leader 成功後,都會觸發向這個 Partition 的全部 replica 發送 LeaderAndIsr 請求。

TargetState: OfflinePartition

OfflinePartition 是這個 Partition 的 leader 掛掉時轉移的一個狀態,若是 Partition 轉移到這個狀態,那麼就意味着這個 Partition 沒有了可用 leader。

 

 

實現邏輯:

  • l  校驗其前置狀態,它有效的前置狀態爲 NewPartition、OnlinePartition 或者 OfflinePartition;
  • l  將該 Partition 的狀態轉移爲 OfflinePartition 狀態,而且更新到緩存中。

TargetState: NonExistentPartition

NonExistentPartition 表明了已經處於 OfflinePartition 狀態的 Partition 已經從 metadata 和 zk 中刪除後進入的狀態。

 

 

 

實現邏輯:

  • l  校驗其前置狀態,它有效的前置狀態爲 OfflinePartition;
  • l  將該 Partition 的狀態轉移爲 NonExistentPartition 狀態,而且更新到緩存中。

2.2狀態轉移觸發的條件

這裏主要是看一下上面 Partition 各類轉移的觸發的條件,整理的結果以下表所示,部份內容會在後續文章講解。

TargetState

觸發方法

做用

OnlinePartition

Controller 的 shutdownBroker()

優雅關閉 Broker 時調用,由於要下線的節點是 leader,因此須要觸發 leader 選舉

OnlinePartition

Controller 的 onNewPartitionCreation()

Partition 新建時,這個是在 Replica 已經變爲 NewPartition 狀態後進行的,爲新建的 Partition 初始化 leader 和 isr

OnlinePartition

controller 的 onPreferredReplicaElection()

對 Partition 進行最優 leader 選舉,目的是觸發 leader 選舉

OnlinePartition

controller 的 moveReassignedPartitionLeaderIfRequired()

分區副本遷移完成後,1. 當前的 leader 不在 RAR 中,須要觸發 leader 選舉;2. 當前 leader 在 RAR 可是掉線了,也須要觸發 leader 選舉

OnlinePartition

PartitionStateMachine 的 triggerOnlinePartitionStateChange()

當 Controller 從新選舉出來或 broker 有變化時,目的爲了那些狀態爲 NewPartition/OfflinePartition 的 Partition 從新選舉 leader,選舉成功後狀態變爲 OnlinePartition

OnlinePartition

PartitionStateMachine 的 initializePartitionState()

Controller 初始化時,遍歷 zk 的全部的分區,若是有 LeaderAndIsr 信息而且 leader 在 alive broker 上,那麼就將狀態轉爲 OnlinePartition。

OfflinePartition

controller 的 onBrokerFailure()

當有 broker 掉線時,將 leader 在這個機器上的 Partition 設置爲 OfflinePartition

OfflinePartition

TopicDeletionManager 的 completeDeleteTopic()

Topic 刪除成功後,中間會將該 Partition 的狀態先轉變爲 OfflinePartition

NonExistentPartition

TopicDeletionManager 的 completeDeleteTopic()

Topic 刪除成功後,最後會將該 Partition 的狀態轉移爲 NonExistentPartition

NewPartition

Controller 的 onNewPartitionCreation()

Partition 剛建立時的一箇中間狀態 ,此時還沒選舉 leader 和設置 isr 信息

 

3 ReplicaStateMachine

ReplicaStateMachine 記錄着集羣全部 Replica 的狀態信息,它決定着一個 replica 處在什麼狀態以及它在什麼狀態下能夠轉變爲何狀態,Kafka 中副本的狀態總共有如下七種類型:

  • l  NewReplica:這種狀態下 Controller 能夠建立這個 Replica,這種狀態下該 Replica 只能做爲 follower,它能夠是當建立topic或分區從新分配期間副本被建立,也能夠是 Replica 刪除後的一個臨時狀態,它有效的前置狀態是 NonExistentReplica;
  • l  OnlineReplica:一旦這個 Replica 被分配到指定的 Partition 上,而且 Replica 建立完成,那麼它將會被置爲這個狀態,在這個狀態下,這個 Replica 既能夠做爲 leader 也能夠做爲 follower,它有效的前置狀態是 NewReplica、OnlineReplica 或 OfflineReplica;
  • l  OfflineReplica:若是一個 Replica 掛掉(所在的節點宕機或者其餘狀況),該 Replica 將會被轉換到這個狀態,它有的效前置狀態是 NewReplica、OfflineReplica 或者 OnlineReplica;
  • l  ReplicaDeletionStarted:Replica 開始刪除時被置爲的狀態,它有效的前置狀態是 OfflineReplica;
  • l  ReplicaDeletionSuccessful:若是 Replica 在刪除時沒有遇到任何錯誤信息,它將被置爲這個狀態,這個狀態表明該 Replica 的數據已經從節點上清除了,它有效的前置狀態是 ReplicaDeletionStarted;
  • l  ReplicaDeletionIneligible:若是 Replica 刪除失敗,它將會轉移到這個狀態,這個狀態意思是非法刪除,也就是刪除是沒法成功的,它有效的前置狀態是 ReplicaDeletionStarted;
  • l  NonExistentReplica:若是 Replica 刪除成功,它將被轉移到這個狀態,它有效的前置狀態是:ReplicaDeletionSuccessful。

上面的狀態中其中後面4是專門爲 Replica 刪除而服務的,副本狀態機轉移圖以下所示:

 

 

在以前介紹KafkaController 在啓動時,會調用 ReplicaStateMachine 的 startup() 方法初始化的處理過程。

 

 

在這個方法中,ReplicaStateMachine 先調用 initializeReplicaState() 方法初始化集羣中全部 Replica 的狀態信息,若是 Replica 所在機器是 alive 的,那麼將其狀態設置爲 OnlineReplica,不然設置爲 ReplicaDeletionIneligible 狀態,這裏只是將 Replica 的狀態信息更新副本狀態機的緩存 replicaState中,並無真正進行狀態轉移的操做。

 

 

 

接着第二步調用 handleStateChanges() 將全部存活的副本狀態轉移爲 OnlineReplica 狀態,這裏纔是真正進行狀態轉移的地方,其具體實現以下:

 

 

這裏是副本狀態機 startup() 方法的最後一步,它的目的是將全部 alive 的 Replica 狀態轉移到 OnlineReplica 狀態,因爲前面已經這些 alive replica 的狀態設置成了 OnlineReplica,因此這裏 Replica 的狀態轉移狀況是:OnlineReplica –> OnlineReplica,這個方法主要是作了兩件事:

  • l  狀態轉移(這個在下面詳細講述);
  • l  發送相應的請求。

3.1 副本的狀態轉移

TargetState: NewReplica

NewReplica 這個狀態是 Replica 準備開始建立是的一個狀態,其實現邏輯以下:

 

 

當想要把 Replica 的狀態轉移爲 NewReplica 時,副本狀態機的處理邏輯以下:

  • l  校驗 Replica 的前置狀態,只有處於 NonExistentReplica 狀態的副本才能轉移到 NewReplica 狀態;
  • l  從 zk 中獲取該 Topic-Partition 的 LeaderIsrAndControllerEpoch 信息;
  • l  若是獲取不到上述信息,直接將該 Replica 的狀態轉移成 NewReplica,而後結束流程(對與新建的 Partition,處於這個狀態時,該 Partition 是沒有相應的 LeaderAndIsr 信息的);
  • l  獲取到 Partition 的 LeaderIsrAndControllerEpoch 信息,若是發現該 Partition 的 leader 是當前副本,那麼就拋出 StateChangeFailedException 異常,由於處在這個狀態的 Replica 是不能被選舉爲 leader 的;
  • l  獲取到了 Partition 的 LeaderIsrAndControllerEpoch 信息,而且該 Partition 的 leader 不是當前 replica,那麼向該 Partition 的全部 Replica 添加一個 LeaderAndIsr 請求(添加 LeaderAndIsr 請求時,實際上也會向全部的 Broker 都添加一個 Update-Metadata 請求);
  • l  最後將該 Replica 的狀態轉移成 NewReplica,而後結束流程。

TargetState: ReplicaDeletionStarted

這是 Replica 開始刪除時的狀態,Replica 轉移到這種狀態的處理實現以下:

 

 

這部分的實現邏輯:

  • l  校驗其前置狀態,Replica 只能是在 OfflineReplica 的狀況下才能轉移到這種狀態;
  • l  更新向該 Replica 的狀態爲 ReplicaDeletionStarted;
  • l  向該 replica 發送 StopReplica 請求(deletePartition = true),收到這請求後,broker 會從物理存儲上刪除這個 Replica 的數據內容;
  • l  若是請求返回的話會觸發其回調函數(這部分會在 topic 刪除部分講解)。

TargetState: ReplicaDeletionIneligible

ReplicaDeletionIneligible 是副本刪除失敗時的狀態,Replica 轉移到這種狀態的處理實現以下:

 

 

實現邏輯:

  • l  校驗其前置狀態,Replica 只能是在 ReplicaDeletionStarted 下才能轉移這種狀態;
  • l  更新該 Replica 的狀態爲 ReplicaDeletionIneligible。

TargetState: ReplicaDeletionSuccessful

ReplicaDeletionSuccessful 是副本刪除成功時的狀態,Replica 轉移到這種狀態的處理實現以下:

 

 

實現邏輯:

  • l  檢驗其前置狀態,Replica 只能是在 ReplicaDeletionStarted 下才能轉移這種狀態;
  • l  更新該 Replica 的狀態爲 ReplicaDeletionSuccessful。

TargetState: NonExistentReplica

NonExistentReplica 是副本徹底刪除、不存在這個副本的狀態,Replica 轉移到這種狀態的處理實現以下:

 

 

實現邏輯:

  • l  檢驗其前置狀態,Replica 只能是在 ReplicaDeletionSuccessful 下才能轉移這種狀態;
  • l  在 controller 的 partitionReplicaAssignment 刪除這個 Partition 對應的 replica 信息;
  • l  從 Controller 和副本狀態機中將這個 Topic 從緩存中刪除。

TargetState: OnlineReplica

OnlineReplica 是副本正常工做時的狀態,此時的 Replica 既能夠做爲 leader 也能夠做爲 follower,Replica 轉移到這種狀態的處理實現以下:

 

 

從前面的狀態轉移圖中能夠看出,當 Replica 處在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 狀態時,Replica 是能夠轉移到 OnlineReplica 狀態的,下面分兩種狀況講述:

NewReplica –> OnlineReplica 的處理邏輯以下:

  • l  從 Controller 的 partitionReplicaAssignment 中獲取這個 Partition 的 AR;
  • l  若是 Replica 不在 AR 中的話,那麼就將其添加到 Partition 的 AR 中;
  • l  最後將 Replica 的狀態設置爲 OnlineReplica 狀態。

OnlineReplica/OfflineReplica/ReplicaDeletionIneligible –> OnlineReplica 的處理邏輯以下:

  • l  從 Controller 的 partitionLeadershipInfo 中獲取 Partition 的 LeaderAndIsr 信息;
  • l  若是該信息存在,那麼就向這個 Replica 所在 broker 添加這個 Partition 的 LeaderAndIsr 請求,並將 Replica 的狀態設置爲 OnlineReplica 狀態;
  • l  不然不作任務處理;
  • l  最後更新AR Replica 的狀態爲 OnlineReplica。

TargetState: OfflineReplica

OfflineReplica 是 Replica 所在 Broker 掉線時 Replica 的狀態,轉移到這種狀態的處理邏輯以下:

 

 

處理邏輯以下:

  • l  校驗其前置狀態,只有 Replica 在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 狀態時,才能轉移到這種狀態;
  • l  向該 Replica 所在節點發送 StopReplica 請求(deletePartition = false);
  • l  調用 Controller 的 removeReplicaFromIsr() 方法將該 replica 從 Partition 的 isr 移除這個 replica(前提 isr 中還有其餘有效副本),而後向該 Partition 的其餘副本發送 LeaderAndIsr 請求;
  • l  更新這個 Replica 的狀態爲 OfflineReplica。

3.2狀態轉移觸發的條件

這裏主要是看一下上面 Replica 各類轉移的觸發的條件,整理的結果以下表所示

TargetState

觸發方法

做用

OnlineReplica

KafkaController 的 onBrokerStartup()

Broker 啓動時,目的是將在該節點的 Replica 狀態設置爲 OnlineReplica

OnlineReplica

KafkaController 的 onNewPartitionCreation()

新建 Partition 時,Replica 初始化及 Partition 狀態變成 OnlinePartition 後,新建立的 Replica 狀態也變爲 OnlineReplica;

OnlineReplica

KafkaController 的 onPartitionReassignment()

副本遷移完成後,RAR 中的副本設置爲 OnlineReplica 狀態

OnlineReplica

ReplicaStateMachine 的 startup()

副本狀態機剛初始化啓動時,將存活的副本狀態設置爲 OnlineReplica

OfflineReplica

TopicDeletionManager 的 markTopicForDeletionRetry()

將刪除失敗的 Replica 設置爲 OfflineReplica,從新進行刪除

OfflineReplica

TopicDeletionManager 的 startReplicaDeletion()

開始副本刪除時,先將副本設置爲 OfflineReplica

OfflineReplica

KafkaController 的 shutdownBroker() 方法

優雅關閉 broker 時,目的是把下線節點上的副本狀態設置爲 OfflineReplica

OfflineReplica

KafkaController 的 onBrokerFailure()

broker 掉線時,目的是把下線節點上的副本狀態設置爲 OfflineReplica

NewReplica

KafkaController 的 onNewPartitionCreation()

Partition 新建時,當 Partition 狀態變爲 NewPartition 後,副本的狀態變爲 NewReplica

NewReplica

KafkaController 的 startNewReplicasForReassignedPartition()

Partition 副本遷移時,將新分配的副本狀態設置爲 NewReplica;

ReplicaDeletionStarted

TopicDeletionManager 的 startReplicaDeletion()

下線副本時,將成功設置爲 OfflineReplica 的 Replica 設置爲 ReplicaDeletionStarted 狀態,開始物理上刪除副本數據(也是發送 StopReplica)

ReplicaDeletionStarted

KafkaController 的 stopOldReplicasOfReassignedPartition()

Partition 的副本遷移時,目的是下線那些 old replica,新的 replica 已經遷移到新分配的副本上了

ReplicaDeletionSuccessful

TopicDeletionManager 的 completeReplicaDeletion()

物理將數據成功刪除的 Replica 狀態會變爲這個

ReplicaDeletionSuccessful

KafkaController 的 stopOldReplicasOfReassignedPartition()

Partition 的副本遷移時,在下線那些舊 Replica 時的一個狀態,刪除成功

ReplicaDeletionIneligible

TopicDeletionManager 的 startReplicaDeletion()

開始副本刪除時,刪除失敗的副本會設置成這個狀態

ReplicaDeletionIneligible

KafkaController 的 stopOldReplicasOfReassignedPartition()

Partition 副本遷移時,在下線那些舊的 Replica 時的一個狀態,刪除失敗

NonExistentReplica

TopicDeletionManager 的 completeReplicaDeletion()

副本刪除成功後(狀態爲 ReplicaDeletionSuccessful),從狀態機和 Controller 的緩存中清除該副本的記錄;

NonExistentReplica

KafkaController 的 stopOldReplicasOfReassignedPartition()

Partition 的副本成功遷移、舊副本成功刪除後,從狀態機和 Controller 的緩存中清除舊副本的記錄

 

參考資料:

https://blog.csdn.net/lizhitao/article/details/28108919

https://www.maiyewang.com/?p=7855

http://matt33.com/2018/06/16/controller-state-machine/

相關文章
相關標籤/搜索