Kafka設計解析(三)- Kafka High Availability (下)

【原創聲明】本文屬做者原創,已受權InfoQ中文站首發,轉載請務必在文章開頭標明出自「Jason's Blog」,並附上原文連接http://www.jasongj.com/2015/06/08/KafkaColumn3/

同時歡迎關注做者微信公衆號【大數據架構】
大數據架構html

摘要

  本文在上篇文章基礎上,更加深刻講解了Kafka的HA機制,主要闡述了HA相關各類場景,如Broker failover,Controller failover,Topic建立/刪除,Broker啓動,Follower從Leader fetch數據等詳細處理過程。同時介紹了Kafka提供的與Replication相關的工具,如從新分配Partition等。node

Broker Failover過程

Controller對Broker failure的處理過程

  1. Controller在Zookeeper的/brokers/ids節點上註冊Watch。一旦有Broker宕機(本文用宕機表明任何讓Kafka認爲其Broker die的情景,包括但不限於機器斷電,網絡不可用,GC致使的Stop The World,進程crash等),其在Zookeeper對應的Znode會自動被刪除,Zookeeper會fire Controller註冊的Watch,Controller便可獲取最新的倖存的Broker列表。
  2. Controller決定set_p,該集合包含了宕機的全部Broker上的全部Partition。
  3. 對set_p中的每個Partition:
      3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR。
      3.2 決定該Partition的新Leader。若是當前ISR中有至少一個Replica還倖存,則選擇其中一個做爲新Leader,新的ISR則包含當前ISR中全部倖存的Replica。不然選擇該Partition中任意一個倖存的Replica做爲新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。若是該Partition的全部Replica都宕機了,則將新的Leader設置爲-1。
      3.3 將新的Leader,ISR和新的leader_epochcontroller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操做只有Controller版本在3.1至3.3的過程當中無變化時纔會執行,不然跳轉到3.1。
  4. 直接經過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller能夠在一個RPC操做中發送多個命令從而提升效率。
      Broker failover順序圖以下所示。
    broker failover sequence diagram

  LeaderAndIsrRequest結構以下
LeaderAndIsrRequest正則表達式

  LeaderAndIsrResponse結構以下
LeaderAndIsrResponseapache

建立/刪除Topic

  1. Controller在Zookeeper的/brokers/topics節點上註冊Watch,一旦某個Topic被建立或刪除,則Controller會經過Watch獲得新建立/刪除的Topic的Partition/Replica分配。
  2. 對於刪除Topic操做,Topic工具會將該Topic名字存於/admin/delete_topics。若delete.topic.enable爲true,則Controller註冊在/admin/delete_topics上的Watch被fire,Controller經過回調向對應的Broker發送StopReplicaRequest;若爲false則Controller不會在/admin/delete_topics上註冊Watch,也就不會對該事件做出反應,此時Topic操做只被記錄而不會被執行。
  3. 對於建立Topic操做,Controller從/brokers/ids讀取當前全部可用的Broker列表,對於set_p中的每個Partition:
      3.1 從分配給該Partition的全部Replica(稱爲AR)中任選一個可用的Broker做爲新的Leader,並將AR設置爲新的ISR(由於該Topic是新建立的,因此AR中全部的Replica都沒有數據,可認爲它們都是同步的,也即都在ISR中,任意一個Replica均可做爲Leader)
      3.2 將新的Leader和ISR寫入/brokers/topics/[topic]/partitions/[partition]
  4. 直接經過RPC向相關的Broker發送LeaderAndISRRequest。
      建立Topic順序圖以下所示。
    create topic sequence diagram

Broker響應請求流程

  Broker經過kafka.network.SocketServer及相關模塊接受各類請求並做出響應。整個網絡通訊模塊基於Java NIO開發,並採用Reactor模式,其中包含1個Acceptor負責接受客戶請求,N個Processor負責讀寫數據,M個Handler處理業務邏輯。
  Acceptor的主要職責是監聽並接受客戶端(請求發起方,包括但不限於Producer,Consumer,Controller,Admin Tool)的鏈接請求,並創建和客戶端的數據傳輸通道,而後爲該客戶端指定一個Processor,至此它對該客戶端該次請求的任務就結束了,它能夠去響應下一個客戶端的鏈接請求了。其核心代碼以下。
Kafka SocketServer Acceptor_run
  
  Processor主要負責從客戶端讀取數據並將響應返回給客戶端,它自己並不處理具體的業務邏輯,而且其內部維護了一個隊列來保存分配給它的全部SocketChannel。Processor的run方法會循環從隊列中取出新的SocketChannel並將其SelectionKey.OP_READ註冊到selector上,而後循環處理已就緒的讀(請求)和寫(響應)。Processor讀取完數據後,將其封裝成Request對象並將其交給RequestChannel。
  RequestChannel是Processor和KafkaRequestHandler交換數據的地方,它包含一個隊列requestQueue用來存放Processor加入的Request,KafkaRequestHandler會從裏面取出Request來處理;同時它還包含一個respondQueue,用來存放KafkaRequestHandler處理完Request後返還給客戶端的Response。
  Processor會經過processNewResponses方法依次將requestChannel中responseQueue保存的Response取出,並將對應的SelectionKey.OP_WRITE事件註冊到selector上。當selector的select方法返回時,對檢測到的可寫通道,調用write方法將Response返回給客戶端。
  KafkaRequestHandler循環從RequestChannel中取Request並交給kafka.server.KafkaApis處理具體的業務邏輯。json

LeaderAndIsrRequest響應過程

  對於收到的LeaderAndIsrRequest,Broker主要經過ReplicaManager的becomeLeaderOrFollower處理,流程以下:微信

  1. 若請求中controllerEpoch小於當前最新的controllerEpoch,則直接返回ErrorMapping.StaleControllerEpochCode。
  2. 對於請求中partitionStateInfos中的每個元素,即((topic, partitionId), partitionStateInfo):
      2.1 若partitionStateInfo中的leader epoch大於當前ReplicManager中存儲的(topic, partitionId)對應的partition的leader epoch,則:
        2.1.1 若當前brokerid(或者說replica id)在partitionStateInfo中,則將該partition及partitionStateInfo存入一個名爲partitionState的HashMap中
        2.1.2不然說明該Broker不在該Partition分配的Replica list中,將該信息記錄於log中
      2.2不然將相應的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中
  3. 篩選出partitionState中Leader與當前Broker ID相等的全部記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中。
  4. 若partitionsTobeLeader不爲空,則對其執行makeLeaders方。
  5. 若partitionsToBeFollower不爲空,則對其執行makeFollowers方法。
  6. 若highwatermak線程還未啓動,則將其啓動,並將hwThreadInitialized設爲true。
  7. 關閉全部Idle狀態的Fetcher。

  LeaderAndIsrRequest處理過程以下圖所示
LeaderAndIsrRequest Flow Chart網絡

Broker啓動過程

  Broker啓動後首先根據其ID在Zookeeper的/brokers/idszonde下建立臨時子節點(Ephemeral node),建立成功後Controller的ReplicaStateMachine註冊其上的Broker Change Watch會被fire,從而經過回調KafkaController.onBrokerStartup方法完成如下步驟:架構

  1. 向全部新啓動的Broker發送UpdateMetadataRequest,其定義以下。
    UpdateMetadataRequest
  2. 將新啓動的Broker上的全部Replica設置爲OnlineReplica狀態,同時這些Broker會爲這些Partition啓動high watermark線程。
  3. 經過partitionStateMachine觸發OnlinePartitionStateChange。

Controller Failover

  Controller也須要Failover。每一個Broker都會在Controller Path (/controller)上註冊一個Watch。當前Controller失敗時,對應的Controller Path會自動消失(由於它是Ephemeral Node),此時該Watch被fire,全部「活」着的Broker都會去競選成爲新的Controller(建立新的Controller Path),可是隻會有一個競選成功(這點由Zookeeper保證)。競選成功者即爲新的Leader,競選失敗者則從新在新的Controller Path上註冊Watch。由於Zookeeper的Watch是一次性的,被fire一次以後即失效,因此須要從新註冊。
  Broker成功競選爲新Controller後會觸發KafkaController.onControllerFailover方法,並在該方法中完成以下操做:併發

  1. 讀取並增長Controller Epoch。
  2. 在ReassignedPartitions Patch(/admin/reassign_partitions)上註冊Watch。
  3. 在PreferredReplicaElection Path(/admin/preferred_replica_election)上註冊Watch。
  4. 經過partitionStateMachine在Broker Topics Patch(/brokers/topics)上註冊Watch。
  5. delete.topic.enable設置爲true(默認值是false),則partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上註冊Watch。
  6. 經過replicaStateMachine在Broker Ids Patch(/brokers/ids)上註冊Watch。
  7. 初始化ControllerContext對象,設置當前全部Topic,「活」着的Broker列表,全部Partition的Leader及ISR等。
  8. 啓動replicaStateMachine和partitionStateMachine。
  9. 將brokerState狀態設置爲RunningAsController。
  10. 將每一個Partition的Leadership信息發送給全部「活」着的Broker。
  11. auto.leader.rebalance.enable配置爲true(默認值是true),則啓動partition-rebalance線程。
  12. delete.topic.enable設置爲true且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。

Partition從新分配

  管理工具發出從新分配Partition請求後,會將相應信息寫到/admin/reassign_partitions上,而該操做會觸發ReassignedPartitionsIsrChangeListener,從而經過執行回調函數KafkaController.onPartitionReassignment來完成如下操做:app

  1. 將Zookeeper中的AR(Current Assigned Replicas)更新爲OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2. 強制更新Zookeeper中的leader epoch,向AR中的每一個Replica發送LeaderAndIsrRequest。
  3. 將RAR - OAR中的Replica設置爲NewReplica狀態。
  4. 等待直到RAR中全部的Replica都與其Leader同步。
  5. 將RAR中全部的Replica都設置爲OnlineReplica狀態。
  6. 將Cache中的AR設置爲RAR。
  7. 若Leader不在RAR中,則從RAR中從新選舉出一個新的Leader併發送LeaderAndIsrRequest。若新的Leader不是從RAR中選舉而出,則還要增長Zookeeper中的leader epoch。
  8. 將OAR - RAR中的全部Replica設置爲OfflineReplica狀態,該過程包含兩部分。第一,將Zookeeper上ISR中的OAR - RAR移除並向Leader發送LeaderAndIsrRequest從而通知這些Replica已經從ISR中移除;第二,向OAR - RAR中的Replica發送StopReplicaRequest從而中止再也不分配給該Partition的Replica。
  9. 將OAR - RAR中的全部Replica設置爲NonExistentReplica狀態從而將其從磁盤上刪除。
  10. 將Zookeeper中的AR設置爲RAR。
  11. 刪除/admin/reassign_partition
      
    注意:最後一步纔將Zookeeper中的AR更新,由於這是惟一一個持久存儲AR的地方,若是Controller在這一步以前crash,新的Controller仍然可以繼續完成該過程。
      如下是Partition從新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition從新分配過程當中Zookeeper中的AR和Leader/ISR路徑以下
    | AR | leader/isr | Sttep |
    |------|-----------|----------|
    | {1,2,3} | 1/{1,2,3} | (initial state) |
    | {1,2,3,4,5,6} | 1/{1,2,3} | (step 2) |
    | {1,2,3,4,5,6} | 1/{1,2,3,4,5,6} | (step 4) |
    | {1,2,3,4,5,6} | 4/{1,2,3,4,5,6} | (step 7) |
    | {1,2,3,4,5,6} | 4/{4,5,6} | (step 8) |
    | {4,5,6} | 4/{4,5,6} | (step 10) |

Follower從Leader Fetch數據

  Follower經過向Leader發送FetchRequest獲取消息,FetchRequest結構以下
FetchRequest
  從FetchRequest的結構能夠看出,每一個Fetch請求都要指定最大等待時間和最小獲取字節數,以及由TopicAndPartition和PartitionFetchInfo構成的Map。實際上,Follower從Leader數據和Consumer從Broker Fetch數據,都是經過FetchRequest請求完成,因此在FetchRequest結構中,其中一個字段是clientID,而且其默認值是ConsumerConfig.DefaultClientId。
  
  Leader收到Fetch請求後,Kafka經過KafkaApis.handleFetchRequest響應該請求,響應過程以下:

  1. replicaManager根據請求讀出數據存入dataRead中。
  2. 若是該請求來自Follower則更新其相應的LEO(log end offset)以及相應Partition的High Watermark
  3. 根據dataRead算出可讀消息長度(單位爲字節)並存入bytesReadable中。
  4. 知足下面4個條件中的1個,則當即將相應的數據返回
  • Fetch請求不但願等待,即fetchRequest.macWait <= 0
  • Fetch請求不要求必定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo爲空
  • 有足夠的數據可供返回,即bytesReadable >= fetchRequest.minBytes
  • 讀取數據時發生異常
  1. 若不知足以上4個條件,FetchRequest將不會當即返回,並將該請求封裝成DelayedFetch。檢查該DeplayedFetch是否知足,若知足則返回請求,不然將該請求加入Watch列表

  Leader經過以FetchResponse的形式將消息返回給Follower,FetchResponse結構以下
FetchResponse

Replication工具

Topic Tool

  $KAFKA_HOME/bin/kafka-topics.sh,該工具可用於建立、刪除、修改、查看某個Topic,也可用於列出全部Topic。另外,該工具還可修改如下配置。

unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes

Replica Verification Tool

  $KAFKA_HOME/bin/kafka-replica-verification.sh,該工具用來驗證所指定的一個或多個Topic下每一個Partition對應的全部Replica是否都同步。可經過topic-white-list這一參數指定所須要驗證的全部Topic,支持正則表達式。
  

Preferred Replica Leader Election Tool

用途
  有了Replication機制後,每一個Partition可能有多個備份。某個Partition的Replica列表叫做AR(Assigned Replicas),AR中的第一個Replica即爲「Preferred Replica」。建立一個新的Topic或者給已有Topic增長Partition時,Kafka保證Preferred Replica被均勻分佈到集羣中的全部Broker上。理想狀況下,Preferred Replica會被選爲Leader。以上兩點保證了全部Partition的Leader被均勻分佈到了集羣當中,這一點很是重要,由於全部的讀寫操做都由Leader完成,若Leader分佈過於集中,會形成集羣負載不均衡。可是,隨着集羣的運行,該平衡可能會由於Broker的宕機而被打破,該工具就是用來幫助恢復Leader分配的平衡。
  事實上,每一個Topic從失敗中恢復過來後,它默認會被設置爲Follower角色,除非某個Partition的Replica所有宕機,而當前Broker是該Partition的AR中第一個恢復回來的Replica。所以,某個Partition的Leader(Preferred Replica)宕機並恢復後,它極可能再也不是該Partition的Leader,但仍然是Preferred Replica。
  
原理

  1. 在Zookeeper上建立/admin/preferred_replica_election節點,並存入須要調整Preferred Replica的Partition信息。
  2. Controller一直Watch該節點,一旦該節點被建立,Controller會收到通知,並獲取該內容。
  3. Controller讀取Preferred Replica,若是發現該Replica當前並不是是Leader而且它在該Partition的ISR中,Controller向該Replica發送LeaderAndIsrRequest,使該Replica成爲Leader。若是該Replica當前並不是是Leader,且不在ISR中,Controller爲了保證沒有數據丟失,並不會將其設置爲Leader。
     
    用法

    $KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

  在包含8個Broker的Kafka集羣上,建立1個名爲topic1,replication-factor爲3,Partition數爲8的Topic,使用以下命令查看其Partition/Replica分佈。

$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181

  查詢結果以下圖所示,從圖中能夠看到,Kafka將全部Replica均勻分佈到了整個集羣,而且Leader也均勻分佈。
preferred_topic_test_1

  手動中止部分Broker,topic1的Partition/Replica分佈以下圖所示。從圖中能夠看到,因爲Broker 1/2/4都被中止,Partition 0的Leader由原來的1變爲3,Partition 1的Leader由原來的2變爲5,Partition 2的Leader由原來的3變爲6,Partition 3的Leader由原來的4變爲7。
preferred_topic_test_2  
  
  再從新啓動ID爲1的Broker,topic1的Partition/Replica分佈以下。能夠看到,雖然Broker 1已經啓動(Partition 0和Partition5的ISR中有1),可是1並非任何一個Parititon的Leader,而Broker 5/6/7都是2個Partition的Leader,即Leader的分佈不均衡——一個Broker最可能是2個Partition的Leader,而最少是0個Partition的Leader。
preferred_topic_test_3
  
  運行該工具後,topic1的Partition/Replica分佈以下圖所示。由圖可見,除了Partition 1和Partition 3因爲Broker 2和Broker 4還未啓動,因此其Leader不是其Preferred Repliac外,其它全部Partition的Leader都是其Preferred Replica。同時,與運行該工具前相比,Leader的分配更均勻——一個Broker最可能是2個Parittion的Leader,最少是1個Partition的Leader。
preferred_topic_test_4
  
  啓動Broker 2和Broker 4,Leader分佈與上一步相比並未變化,以下圖所示。
preferred_topic_test_5

  再次運行該工具,全部Partition的Leader都由其Preferred Replica承擔,Leader分佈更均勻——每一個Broker承擔1個Partition的Leader角色。
  
  除了手動運行該工具使Leader分配均勻外,Kafka還提供了自動平衡Leader分配的功能,該功能可經過將auto.leader.rebalance.enable設置爲true開啓,它將週期性檢查Leader分配是否平衡,若不平衡度超過必定閾值則自動由Controller嘗試將各Partition的Leader設置爲其Preferred Replica。檢查週期由leader.imbalance.check.interval.seconds指定,不平衡度閾值由leader.imbalance.per.broker.percentage指定。
  

Kafka Reassign Partitions Tool

用途
  該工具的設計目標與Preferred Replica Leader Election Tool有些相似,都旨在促進Kafka集羣的負載均衡。不一樣的是,Preferred Replica Leader Election只能在Partition的AR範圍內調整其Leader,使Leader分佈均勻,而該工具還能夠調整Partition的AR。
  Follower須要從Leader Fetch數據以保持與Leader同步,因此僅僅保持Leader分佈的平衡對整個集羣的負載均衡來講是不夠的。另外,生產環境下,隨着負載的增大,可能須要給Kafka集羣擴容。向Kafka集羣中增長Broker很是簡單方便,可是對於已有的Topic,並不會自動將其Partition遷移到新加入的Broker上,此時可用該工具達到此目的。某些場景下,實際負載可能遠小於最初預期負載,此時可用該工具將分佈在整個集羣上的Partition重裝分配到某些機器上,而後能夠中止不須要的Broker從而實現節約資源的目的。
  須要說明的是,該工具不只能夠調整Partition的AR位置,還可調整其AR數量,即改變該Topic的replication factor。
  
原理
  該工具只負責將所需信息存入Zookeeper中相應節點,而後退出,不負責相關的具體操做,全部調整都由Controller完成。

  1. 在Zookeeper上建立/admin/reassign_partitions節點,並存入目標Partition列表及其對應的目標AR列表。
  2. Controller註冊在/admin/reassign_partitions上的Watch被fire,Controller獲取該列表。
  3. 對列表中的全部Partition,Controller會作以下操做:
  • 啓動RAR - AR中的Replica,即新分配的Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
  • 等待新的Replica與Leader同步
  • 若是Leader不在RAR中,從RAR中選出新的Leader
  • 中止並刪除AR - RAR中的Replica,即再也不須要的Replica
  • 刪除/admin/reassign_partitions節點

用法
  該工具備三種使用模式

  • generate模式,給定須要從新分配的Topic,自動生成reassign plan(並不執行)
  • execute模式,根據指定的reassign plan從新分配Partition
  • verify模式,驗證從新分配Partition是否成功

  下面這個例子將使用該工具將Topic的全部Partition從新分配到Broker 4/5/6/7上,步驟以下:
1. 使用generate模式,生成reassign plan
  指定須要從新分配的Topic ({"topics":[{"topic":"topic1"}],"version":1}),並存入/tmp/topics-to-move.json文件中,而後執行以下命令

$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file /tmp/topics-to-move.json 
--broker-list "4,5,6,7" --generate

  結果以下圖所示
reassign_1
  
2. 使用execute模式,執行reassign plan
  將上一步生成的reassignment plan存入/tmp/reassign-plan.json文件中,並執行

$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file /tmp/reassign-plan.json --execute

reassign_2

  此時,Zookeeper上/admin/reassign_partitions節點被建立,且其值與/tmp/reassign-plan.json文件的內容一致。
reassign_3

3. 使用verify模式,驗證reassign是否完成
  執行verify命令

$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file /tmp/reassign-plan.json --verify

  結果以下所示,從圖中可看出topic1的全部Partititon都從新分配成功。
reassign_4

  接下來用Topic Tool再次驗證。

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1

  結果以下圖所示,從圖中可看出topic1的全部Partition都被從新分配到Broker 4/5/6/7,且每一個Partition的AR與reassign plan一致。
reassign_5

  須要說明的是,在使用execute以前,並不必定要使用generate模式自動生成reassign plan,使用generate模式只是爲了方便。事實上,某些場景下,generate模式生成的reassign plan並不必定能知足需求,此時用戶能夠本身設置reassign plan。
  

State Change Log Merge Tool

用途
  該工具旨在從整個集羣的Broker上收集狀態改變日誌,並生成一個集中的格式化的日誌以幫助診斷狀態改變相關的故障。每一個Broker都會將其收到的狀態改變相關的的指令存於名爲state-change.log的日誌文件中。某些狀況下,Partition的Leader Election可能會出現問題,此時咱們須要對整個集羣的狀態改變有個全局的瞭解從而診斷故障並解決問題。該工具將集羣中相關的state-change.log日誌按時間順序合併,同時支持用戶輸入時間範圍和目標Topic及Partition做爲過濾條件,最終將格式化的結果輸出。
  
用法

bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger 
--logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log 
--topic topic1 --partitions 0,1,2,3,4,5,6,7

下篇預告

  下篇文章將詳細介紹Kafka Simple Consumer API和High Level Consumer API,以及0.9.*版本中對Kafka Consumer的從新設計。

相關文章
相關標籤/搜索