Kafka partition 副本遷移與broker上下線

Kafka partition 副本遷移與broker上下線

1 前言

Controller 在初始化時,會利用 ZK 的 watch 機制註冊不少不一樣類型的監聽器,當監聽的事件被觸發時,Controller 就會觸發相應的操做。緩存

Controller 在初始化時,會註冊多種類型的監聽器,主要有如下幾種:session

  • l  監聽 /admin/reassign_partitions 節點,用於分區副本遷移的監聽;
  • l  監聽 /isr_change_notification 節點,用於 Partition Isr 變更的監聽,;
  • l  監聽 /admin/preferred_replica_election 節點,用於須要進行 Partition 最優 leader 選舉的監聽;
  • l  監聽 /brokers/topics 節點,用於 Topic 新建的監聽;
  • l  監聽 /brokers/topics/TOPIC_NAME 節點,用於 Topic Partition 擴容的監聽;
  • l  監聽 /admin/delete_topics 節點,用於 Topic 刪除的監聽;
  • l  監聽 /brokers/ids 節點,用於 Broker 上下線的監聽。

2 Partition 副本遷移總體流程

Partition 的副本遷移實際上就是將分區的副本從新分配到不一樣的代理節點上,若是 zk 中新副本的集合與 Partition 原來的副本集合相同,那麼這個副本就不須要從新分配了。併發

Partition 的副本遷移是經過監聽 zk 的 /admin/reassign_partitions 節點觸發的,Kafka 也向用戶提供相應的腳本工具進行副本遷移,副本遷移的腳本使用方法以下所示:工具

 

在調用腳本向 zk 提交 Partition 的遷移計劃時,遷移計劃更新到 zk 前須要進行一步判斷,若是該節點(寫入遷移計劃的節點)已經存在,即副本遷移還在進行,那麼本次副本遷移計劃是沒法提交的,實現的邏輯以下所示:線程

 

 

2.1 ZK PartitionsReassignedListener 副本遷移處理

在 zk 的 /admin/reassign_partitions 節點數據有變化時,就會觸發 PartitionsReassignedListener 的 doHandleDataChange() 方法,實現以下:3d

 

 

若是 Partition 出現下面的狀況,將不會進行副本遷移,直接將 Partition 的遷移計劃從 ZK 移除:代理

  • l  這個 Partition 的 reassignment 以前已經存在, 即正在遷移中;
  • l  這個 Partition 新分配的 replica 與以前的 replicas 相同;
  • l  這個 Partition 全部新分配 replica 都已經 dead;
  • l  這個 Partition 已經被設置了刪除標誌。

對於能夠進行副本遷移的 Partition 集合,這裏將會調用 Kafka Controller 的 initiateReassignReplicasForTopicPartition() 方法對每一個 Partition 進行處理。code

2.2 副本遷移初始化

進行了前面的判斷後,這個 Partition 知足了能夠遷移的條件,Controller 會首先初始化副本遷移的流程,實現以下所示blog

 

 

對於副本遷移流程初始化以下:隊列

  • l  經過 watchIsrChangesForReassignedPartition() 方法監控這個 Partition 的 LeaderAndIsr 變化,若是有新的副本數據同步完成,那麼 leader 會將其加到 isr 中更新到 zk 中,這時候 Controller 是能夠接收到相關的信息通知的;
  • l  將正在遷移的 Partition 添加到 partitionsBeingReassigned 中,它會記錄當前正在遷移的 Partition 列表;
  • l  將要遷移的 Topic 設置爲非法刪除刪除狀態,在這個狀態的 Topic 是沒法進行刪除的;
  • l  調用 onPartitionReassignment(),進行副本遷移。

在第一步中,會向這個 Partition 註冊一個額外的監聽器,監聽其 LeaderAndIsr 信息變化,以下所示:

 

 

若是該 Partition 的 LeaderAndIsr 信息有變更,那麼就會觸發這個 listener 的 doHandleDataChange() 方法:

  • l  首先檢查這個 Partition 是否在還在遷移中,不在的話直接結束流程,由於這個監聽器原本就是爲了 Partition 副本遷移而服務的;
  • l  從 zk 獲取最新的 leader 和 isr 信息,若是新分配的副本所有都在 isr 中,那麼就再次觸發 controller 的 onPartitionReassignment() 方法,再次調用時實際上已經證實了這個 Partition 的副本遷移已經完成,不然的話就會不進行任何處理,等待新分配的全部副本遷移完成。

2.3 副本遷移

Partition 副本遷移真正實際處理是在 Controller 的 onPartitionReassignment() 方法完成的,在看這個方法以前,先介紹幾個基本的概念(假設一個 Partition 原來的 replica 是 {一、二、3},新分配的副本列表是:{二、三、4}):

  • RAR = Reassigned replicas,即新分配的副本列表,也就是 {二、三、4};
  • OAR = Original list of replicas for partition,即這個 Partition 原來的副本列表,也就是 {一、二、3};
  • AR = current assigned replicas,該 Partition 當前的副本列表,這個會隨着階段的不一樣而變化;
  • RAR-OAR:須要建立、數據同步的新副本,也就是 {4};
  • OAR-RAR:須要刪除的副本,也就是{1}

這個方法的實現以下所示:

 

 

這個方法總體分爲如下12個步驟:

  • l  把 AR = OAR+RAR ({一、二、三、4})更新到 zk 及本地 Controller 緩存中;
  • l  發送 LeaderAndIsr 給 AR 中每個副本,而且會強制更新 zk 中 leader 的 epoch;
  • l  建立須要新建的副本(【RAR-OAR】,即 {4}),將其狀態設置爲 NewReplica;
  • l  等待直到 RAR({二、三、4}) 中的全部副本都在 ISR 中;
  • l  把 RAR({二、三、4}) 中的全部副本設置爲 OnReplica 狀態;
  • l  將緩存中 AR 更新爲 RAR(從新分配的副本列表,即 {二、三、4});
  • l  若是 leader 不在 RAR 中, 就從 RAR 選擇對應的 leader, 而後發送 LeaderAndIsr 請求;若是不須要,那麼只會更新 leader epoch,而後發送 LeaderAndIsr 請求; 在發送 LeaderAndIsr 請求前設置了 AR=RAR, 這將確保了 leader 在 isr 中不會添加任何 【RAR-OAR】中的副本(old replica,即 {1});
  • l  將【OAR-RAR】({1})中的副本設置爲 OfflineReplica 狀態,OfflineReplica 狀態的變化,將會從 ISR 中刪除【OAR-RAR】的副本,更新到 zk 中併發送 LeaderAndIsr 請求給 leader,通知 leader isr 變更。以後再發送 StopReplica 請求(delete=false)給【OAR-RAR】中的副本;
  • l  將【OAR-RAR】中的副本設置爲 NonExistentReplica 狀態。這將發送 StopReplica 請求(delete=true)給【OAR-RAR】中的副本,這些副本將會從本地上刪除數據;
  • l  在 zk 中更新 AR 爲 RAR;
  • l  更新 zk 中路徑 【/admin/reassign_partitions】信息,移除已經成功遷移的 Partition;
  • l  leader 選舉以後,這個 replica 和 isr 信息將會變更,發送 metadata 更新給全部的 broker。

上面的流程簡單來講,就是先建立新的 replica,開始同步數據,等待全部新的分配都加入到了 isr 中後,開始進行 leader 選舉(須要的狀況下),下線不須要的副本(OAR-RAR),下線完成後將 Partition 的最新 AR (即 RAR)信息更新到 zk 中,最後發送相應的請求給 broker,到這裏一個 Partition 的副本遷移算是完成了。

3 Broker上下線

每臺 Broker 在上線時,都會與 ZK 創建一個創建一個 session,並在 /brokers/ids 下注冊一個節點,節點名字就是 broker id,這個節點是臨時節點,該節點內部會有這個 Broker 的詳細節點信息。Controller 會監聽 /brokers/ids 這個路徑下的全部子節點,若是有新的節點出現,那麼就表明有新的 Broker 上線,若是有節點消失,就表明有 broker 下線,Controller 會進行相應的處理,Kafka 就是利用 ZK 的這種 watch 機制及臨時節點的特性來完成集羣 Broker 的上下線。

3.1 ZK 回調處理

BrokerChangeListener 是監聽 /brokers/ids 節點的監聽器,當該節點有變化時會觸發 doHandleChildChange() 方法,具體實現以下:

 

 

這裏須要重點關注 doHandleChildChange() 方法的實現,該方法處理邏輯以下:

  • l  從 ZK 獲取當前的 Broker 列表(curBrokers)及 broker id 的列表(curBrokerIds);
  • l  獲取當前 Controller 中緩存的 broker id 列表(liveOrShuttingDownBrokerIds);
  • l  獲取新上線 broker id 列表:newBrokerIds = curBrokerIds – liveOrShuttingDownBrokerIds;
  • l  獲取掉線的 broker id 列表:deadBrokerIds = liveOrShuttingDownBrokerIds – curBrokerIds;
  • l  對於新上線的 broker,先在 ControllerChannelManager 中添加該 broker(即創建與該 Broker 的鏈接、初始化相應的發送線程和請求隊列),最後 Controller 調用 onBrokerStartup() 上線該 Broker;
  • l  對於掉線的 broker,先在 ControllerChannelManager 中移除該 broker(即關閉與 Broker 的鏈接、關閉相應的發送線程和清空請求隊列),最後 Controller 調用 onBrokerFailure() 下線該 Broker。

3.2 broker上線

一臺 Broker 上線主要有如下兩步:

  • l  在 Controller Channel Manager 中添加該 Broker 節點,主要的內容是:Controller 創建與該 Broker 的鏈接、初始化相應的請求發送線程與請求隊列;
  • l  調用 Controller 的 onBrokerStartup() 方法上線該節點。

 

ontroller Channel Manager 添加 Broker 的實現以下,這裏就不重複講述了,前面講述 Controller 服務初始化的文章已經講述過這部分的內容。下面再看下 Controller 如何在 onBrokerStartup() 方法中實現 Broker 上線操做的,具體實現以下所示:

 

 

onBrokerStartup() 方法在實現的邏輯上分爲如下幾步:

  • l  調用 sendUpdateMetadataRequest() 方法向當前集羣全部存活的 Broker 發送 Update Metadata 請求,這樣的話其餘的節點就會知道當前的 Broker 已經上線了;
  • l  獲取當前節點分配的全部的 Replica 列表,並將其狀態轉移爲 OnlineReplica 狀態;
  • l  觸發 PartitionStateMachine 的 triggerOnlinePartitionStateChange() 方法,爲全部處於 NewPartition/OfflinePartition 狀態的 Partition 進行 leader 選舉,若是 leader 選舉成功,那麼該 Partition 的狀態就會轉移到 OnlinePartition 狀態,不然狀態轉移失敗;
  • l  若是副本遷移中有新的 Replica 落在這臺新上線的節點上,那麼開始執行副本遷移操做;
  • l  若是以前因爲這個 Topic 設置爲刪除標誌,可是因爲其中有 Replica 掉線而致使沒法刪除,這裏在節點啓動後,嘗試從新執行刪除操做。

到此爲止,一臺 Broker 算是真正加入到了 Kafka 的集羣中,在上述過程當中,涉及到 leader 選舉的操做,都會觸發 LeaderAndIsr 請求及 Metadata 請求的發送。

3.3 broker下線

一臺 Broker 掉線後主要有如下兩步:

l  首先在 Controller Channel Manager 中移除該 Broker 節點,主要的內容是:關閉 Controller 與 Broker 的鏈接和相應的請求發送線程,並清空請求隊列;

l  調用 Controller 的 onBrokerFailure() 方法下線該節點。

Controller Channel Manager 下線 Broker 的處理以下所示:

 

 

在 Controller Channel Manager 處理完掉線的 Broker 節點後,下面 KafkaController 將會調用 onBrokerFailure() 進行相應的處理,其實現以下:

 

 

Controller 對於掉線 Broker 的處理過程主要有如下幾步:

  • l  首先找到 Leader 在該 Broker 上全部 Partition 列表,而後將這些 Partition 的狀態所有轉移爲 OfflinePartition 狀態;
  • l  觸發 PartitionStateMachine 的 triggerOnlinePartitionStateChange() 方法,爲全部處於 NewPartition/OfflinePartition 狀態的 Partition 進行 Leader 選舉,若是 Leader 選舉成功,那麼該 Partition 的狀態就會遷移到 OnlinePartition 狀態,不然狀態轉移失敗(Broker 上線/掉線、Controller 初始化時都會觸發這個方法);
  • l  獲取在該 Broker 上的全部 Replica 列表,將其狀態轉移成 OfflineReplica 狀態;
  • l  過濾出設置爲刪除、而且有副本在該節點上的 Topic 列表,先將該 Replica 的轉移成 ReplicaDeletionIneligible 狀態,而後再將該 Topic 標記爲非法刪除,即由於有 Replica 掉線致使該 Topic 沒法刪除;
  • l  若是 leader 在該 Broker 上全部 Partition 列表不爲空,證實有 Partition 的 leader 須要選舉,在最後一步會觸發全局 metadata 信息的更新。

到這裏,一臺掉線的 Broker 算是真正下線完成了。

3.4 主動關閉broker

Controller 在接收這個關閉服務的請求,經過 shutdownBroker() 方法進行處理,實現以下所示:

 

 

上述方法的處理邏輯以下:

  • l  先將要下線的 Broker 添加到 shuttingDownBrokerIds 集合中,該集合記錄了當前正在進行關閉的 broker 列表;
  • l  獲取副本在該節點上的全部 Partition 的列表集合;
  • l  遍歷上述 Partition 列表進行處理:若是該 Partition 的 leader 是要下線的節點,那麼經過 PartitionStateMachine 進行狀態轉移(OnlinePartition –> OnlinePartition)觸發 leader 選舉,使用的 leader 選舉方法是 ControlledShutdownLeaderSelector,它會選舉 isr 中第一個沒有正在關閉的 Replica 做爲 leader,不然拋出 StateChangeFailedException 異常;
  • l  不然的話,即要下線的節點不是 leader,那麼就向要下線的節點發送 StopReplica 請求中止副本同步,並將該副本設置爲 OfflineReplica 狀態,這裏對 Replica 進行處理的緣由是爲了讓要下線的機器關閉副本同步流程,這樣 Kafka 服務才能正常關閉。

 

參考資料:

http://matt33.com/2018/06/16/partition-reassignment/

http://matt33.com/2018/06/17/broker-online-offline/

相關文章
相關標籤/搜索