kafka集羣Controller競選與責任設計思路架構詳解-kafka 商業環境實戰

本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。node

1 無所不能的Controller

  • 某一個broker被選舉出來承擔特殊的角色,就是控制器Controller。緩存

  • Leader會向zookeeper上註冊Watcher,其餘broker幾乎不用監聽zookeeper的狀態變化。學習

  • Controller集羣就是用來管理和協調Kafka集羣的,具體就是管理集羣中全部分區的狀態和分區對應副本的狀態。this

  • 每個Kafka集羣任意時刻都只能有一個controller,當集羣啓動的時候,全部的broker都會參與到controller的競選,最終只能有一個broker勝出。spa

  • Controller維護的狀態分爲兩類:1:管理每一臺Broker上對應的分區副本。2:管理每個Topic分區的狀態。線程

  • KafkaController 核心代碼,其中包含副本狀態機和分區狀態機scala

    class KafkaController(val config : KafkaConfig, zkClient: ZkClient, 
      val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
          this.logIdent = "[Controller " + config.brokerId + "]: "
          private var isRunning = true
          private val stateChangeLogger = KafkaController.stateChangeLogger
          val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
    
          val partitionStateMachine = new PartitionStateMachine(this)
          val replicaStateMachine = new ReplicaStateMachine(this)
          
          private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
          onControllerResignation, config.brokerId)
          // have a separate scheduler for the controller to be able to start and stop independently of the
          // kafka server
          private val autoRebalanceScheduler = new KafkaScheduler(1)
          var deleteTopicManager: TopicDeletionManager = null
          val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
          private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
          private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
          private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
          private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
          
          private val partitionReassignedListener = new PartitionsReassignedListener(this)
          private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
    複製代碼
  • KafkaController中共定義了五種selector選舉器日誌

    一、ReassignedPartitionLeaderSelector
      從可用的ISR中選取第一個做爲leader,把當前的ISR做爲新的ISR,將重分配的副本集合做爲接收LeaderAndIsr請求的副本集合。
      二、PreferredReplicaPartitionLeaderSelector
      若是從assignedReplicas取出的第一個副本就是分區leader的話,則拋出異常,不然將第一個副本設置爲分區leader。
      三、ControlledShutdownLeaderSelector
      將ISR中處於關閉狀態的副本從集合中去除掉,返回一個新新的ISR集合,而後選取第一個副本做爲leader,而後令當前AR做爲接收LeaderAndIsr請求的副本。
      四、NoOpLeaderSelector
      原則上不作任何事情,返回當前的leader和isr。
      五、OfflinePartitionLeaderSelector
      從活着的ISR中選擇一個broker做爲leader,若是ISR中沒有活着的副本,則從assignedReplicas中選擇一個副本做爲leader,leader選舉成功後註冊到Zookeeper中,並更新全部的緩存。
    複製代碼
  • kafka修改分區和副本數code

    ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe  --topic test1
      
      Topic:test1       PartitionCount:3        ReplicationFactor:2     Configs:
      Topic: test1      Partition: 0    Leader: 2       Replicas: 2,4   Isr: 2,4
      Topic: test1      Partition: 1    Leader: 3       Replicas: 3,5   Isr: 3,5
      Topic: test1      Partition: 2    Leader: 4       Replicas: 4,1   Isr: 4,1
    複製代碼
  • topic 分區擴容cdn

    ./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
    複製代碼

2 ReplicaStateMachine (ZK持久化副本分配方案)

  • Replica有7種狀態:

    1 NewReplica: 在partition reassignment期間KafkaController建立New replica
      
      2 OnlineReplica: 當一個replica變爲一個parition的assingned replicas時
      其狀態變爲OnlineReplica, 即一個有效的OnlineReplica
      
      3 Online狀態的parition才能轉變爲leader或isr中的一員
      
      4 OfflineReplica: 當一個broker down時, 上面的replica也隨之die, 其狀態轉變爲Onffline;
      ReplicaDeletionStarted: 當一個replica的刪除操做開始時,其狀態轉變爲ReplicaDeletionStarted
      
      5 ReplicaDeletionSuccessful: Replica成功刪除後,其狀態轉變爲ReplicaDeletionSuccessful
      
      6 ReplicaDeletionIneligible: Replica成功失敗後,其狀態轉變爲ReplicaDeletionIneligible
      
      7 NonExistentReplica:  Replica成功刪除後, 從ReplicaDeletionSuccessful狀態轉變爲NonExistentReplica狀態
    複製代碼
  • ReplicaStateMachine 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

  • startup: 啓動ReplicaStateMachine

  • initializeReplicaState: 初始化每一個replica的狀態, 若是replica所在的broker是live狀態,則此replica的狀態爲OnlineReplica。

  • 處理能夠轉換到Online狀態的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 而且發送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)

  • 當建立某個topic時,該topic下全部分區的全部副本都是NonExistent。

  • 當controller加載Zookeeper中該topic每個分區的全部副本信息到內存中,同時將副本的狀態變動爲New。

  • 以後controller選擇該分區副本列表中的第一個副本做爲分區的leader副本並設置全部副本進入ISR,而後在Zookeeper中持久化該決定。

  • 一旦肯定了分區的Leader和ISR以後,controller會將這些消息以請求的方式發送給全部的副本。

  • 同時將這些副本狀態同步到集羣的全部broker上以便讓他們知曉。

  • 最後controller 會把分區的全部副本狀態設置爲Online。

3 partitionStateMachine (根據副本分配方案建立分區)

  • Partition有以下四種狀態

    NonExistentPartition: 這個partition尚未被建立或者是建立後又被刪除了;
      NewPartition: 這個parition已建立, replicas也已分配好,但leader/isr還未就緒;
      OnlinePartition: 這個partition的leader選好;
      OfflinePartition: 這個partition的leader掛了,這個parition狀態爲OfflinePartition;
    複製代碼
  • 當建立Topic時,controller負責建立分區對象,它首先會短暫的將全部分區狀態設置爲NonExistent。

  • 以後讀取Zookeeper副本分配方案,而後令分區狀態設置爲NewPartion。

  • 處於NewPartion狀態的分區還沒有有leader和ISR,所以Controller會初始化leader和ISR信息並設置分區狀態爲OnlinePartion,此時分區正常工做。

  • 本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。

4 Controller職責所在(監聽znode狀態變化作執行)

  • UpdateMetadataRequest:更新元數據請求(好比:topic有多少個分區,每個分區的leader在哪一臺broker上以及分區的副本列表),隨着集羣的運行,這部分信息隨時均可能變動,一旦發生變動,controller會將最新的元數據廣播給全部存活的broker。具體方式就是給全部broker發送UpdateMetadataRequest請求
  • CreateTopics: 建立topic請求。當前不論是經過API方式、腳本方式(--create)抑或是CreateTopics請求方式來建立topic,作法幾乎都是在Zookeeper的/brokers/topics下建立znode來觸發建立邏輯,而controller會監聽該path下的變動來執行真正的「建立topic」邏輯
  • DeleteTopics:刪除topic請求。和CreateTopics相似,也是經過建立Zookeeper下的/admin/delete_topics/節點來觸發刪除topic,主要邏輯有:1:中止全部副本運行。2:刪除全部副本的日誌數據。3:移除zk上的 /admin/delete_topics/節點。
  • 分區重分配:即kafka-reassign-partitions腳本作的事情。一樣是與Zookeeper結合使用,腳本寫入/admin/reassign_partitions節點來觸發,controller負責按照方案分配分區。執行過程是:先擴展再伸縮機制(就副本和新副本集合同時存在)。
  • Preferred leader分配:調整分區leader副本,preferred leader選舉當前有兩種觸發方式:1. 自動觸發(auto.leader.rebalance.enable = true),controller會自動調整Preferred leader。2. kafka-preferred-replica-election腳本觸發。二者步驟相同,都是向Zookeeper的/admin/preferred_replica_election寫數據,controller提取數據執行preferred leader分配
  • 分區擴展:即增長topic分區數。標準作法也是經過kafka-reassign-partitions腳本完成,不過用戶可直接往Zookeeper中寫數據來實現,好比直接把新增分區的副本集合寫入到/brokers/topics/下,而後controller會爲你自動地選出leader並增長分區
  • 集羣擴展:新增broker時Zookeeper中/brokers/ids下會新增znode,controller自動完成服務發現的工做
  • broker崩潰:一樣地,controller經過Zookeeper可實時偵測broker狀態。一旦有broker掛掉了,controller可當即感知併爲受影響分區選舉新的leader
  • ControlledShutdown:broker除了崩潰,還能「優雅」地退出。broker一旦自行終止,controller會接收到一個ControlledShudownRequest請求,而後controller會妥善處理該請求並執行各類收尾工做
  • Controller leader選舉:controller必然要提供本身的leader選舉以防這個全局惟一的組件崩潰宕機致使服務中斷。這個功能也是經過Zookeeper的幫助實現的。

5 Controller與Broker之間的通訊機制(NIO select)

  • controller啓動時會爲集羣中的全部Broker建立一個專屬的Socket鏈接,加入有100臺broker機器,那麼controller會建立100個Socket鏈接。新版本目前統一使用NIO select ,實際上仍是要維護100個線程。

6 ControllerContext數據組件

  • controller的緩存,可謂是最重要的數據組件了,ControllerContext彙總了Zookeeper中關於kafka集羣中全部元數據信息,是controller可以正確提供服務的基礎。

本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。

7 總結

kafka集羣Controller主要幹經過ZK持久化副本分配方案,根據副本分配方案建立分區,監聽ZK znode狀態變化作執行處理,維護分區和副本ISR機制穩定運行。感謝huxihx技術博客以及相關書籍,讓我理解了Controller核心機制,寫一篇學習筆記,做爲總結,辛苦成文,實屬不易,謝謝。

秦凱新 於深圳 201812021541

相關文章
相關標籤/搜索