Kafka Source Code Analysis: The Topic Creation Process (Part 3)

Feel free to follow github.com/hsfxuebao/j… — I hope it helps, and if you find it useful please give it a star.

This is the third article in the Kafka source code analysis series. It walks through how a topic is created, from the moment creation is requested to the point where the topic actually exists. The article covers the following parts:

  1. How is a topic created?
    • via the command line;
    • automatically, when a Producer sends data;
  2. How are replicas assigned when a topic is created?
    • with an explicitly specified replica assignment;
    • with automatic replica assignment;
  3. After the replica assignment is written to zk, how is the topic actually created underneath?
    • creating the Partition objects and updating their state;
    • creating each Partition's replica objects and updating their state.

The complete creation process of a topic is shown in the figure below (using automatic replica assignment with no rack awareness on the brokers as the example).

Figure: the complete topic creation process

The figure only lists the main method calls; the details are explained below (it is best to refer back to this figure while reading the rest of the article).

Introduction to topics

A topic is the identifier of a message queue in Kafka — essentially the id used to distinguish different queues. A topic consists of multiple partitions, which are usually spread across several brokers. To keep data reliable, each partition is stored as multiple replicas, typically two or three. As shown below, a topic named "topic" consists of three partitions with two replicas each, assuming the Kafka cluster has three brokers (replica 0_1 denotes the first replica of partition 0).

Figure: composition of a Kafka topic


When setting the replication factor, it must not exceed the number of brokers in the cluster; replicas only help if they are placed on different machines.
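Internally, Kafka represents such a topic as a map from partition id to the list of broker ids hosting that partition's replicas. Purely as an illustration (the concrete broker ids below are assumed, not taken from the figure), the 3-partition, 2-replica topic above could look like this:

// Illustration only: the 3-partition, 2-replica topic from the figure as an assignment map.
// Broker ids are assumed; replica 0_1 / 0_2 denote the first and second replica of partition 0.
val topicAssignment: Map[Int, Seq[Int]] = Map(
  0 -> Seq(0, 1),  // replica 0_1 on broker 0, replica 0_2 on broker 1
  1 -> Seq(1, 2),  // replica 1_1 on broker 1, replica 1_2 on broker 2
  2 -> Seq(2, 0)   // replica 2_1 on broker 2, replica 2_2 on broker 0
)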

How a topic is created

A topic can be created in two ways:

  1. via kafka-topics.sh, either letting the Server assign replicas automatically based on the requested replication factor, or specifying the replica assignment by hand;
  2. if auto.create.topics.enable is set to true on the Server side, a topic is also created automatically when a Producer sends data to a topic that does not exist; in that case the partition count and replication factor come from the broker defaults (num.partitions and default.replication.factor, which default to 1).

Let's look at the underlying implementation of both.

Creating a topic with kafka-topics.sh

From the Kafka installation directory, the following command creates a topic named test with 3 partitions and a replication factor of 2:

./bin/kafka-topics.sh --create --topic test --zookeeper XXXX --partitions 3 --replication-factor 2

kafka-topics.sh actually calls into kafka.admin.TopicCommand to create the topic; the implementation is as follows:

//note: create a topic
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  if (Topic.hasCollisionChars(topic))
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  try {
    if (opts.options.has(opts.replicaAssignmentOpt)) {//note: the replica assignment is specified explicitly, just write it to zk
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
    } else {//note: no replica assignment specified, use the automatic assignment algorithm
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                          else RackAwareMode.Enforced
      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch  {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}

If the replica placement for each partition is specified explicitly, the assignment is validated and then written straight to zk. The validation is implemented in parseReplicaAssignment, shown below:

def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
  val partitionList = replicaAssignmentList.split(",")
  val ret = new mutable.HashMap[Int, List[Int]]()
  for (i <- 0 until partitionList.size) {
    val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
    val duplicateBrokers = CoreUtils.duplicates(brokerList)
    if (duplicateBrokers.nonEmpty)//note: a partition's replica list must not contain duplicate brokers
      throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
    ret.put(i, brokerList.toList)
    if (ret(i).size != ret(0).size)//note: every partition of the topic must have the same replication factor
      throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
  }
  ret.toMap
}
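As an illustration of what this parsing produces (this is not Kafka code), a --replica-assignment value of 1:2,3:4,5:6 describes three partitions with two replicas each; the key of the resulting map is the partition id (given by position in the list) and the value is its broker list:

// Illustration only: the result of parseReplicaAssignment("1:2,3:4,5:6")
val assignment: Map[Int, List[Int]] = Map(
  0 -> List(1, 2),  // partition 0 -> brokers 1 and 2
  1 -> List(3, 4),  // partition 1 -> brokers 3 and 4
  2 -> List(5, 6)   // partition 2 -> brokers 5 and 6
)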

If no replica assignment is specified, AdminUtils.createTopic is called. It first checks whether rack awareness is in effect and, if so, fetches the brokers' rack information; it then runs the automatic assignment algorithm to assign replicas for each partition, and finally, just like the manual case, writes the result to zk.

def createTopic(zkUtils: ZkUtils,
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties,
                rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)//note: with rack awareness enabled, returns broker-to-rack metadata
  val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)//note: compute the replica assignment for each partition
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)//note: write the assignment to zk
}
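Either way, createOrUpdateTopicPartitionAssignmentPathInZK ends up writing the assignment to a per-topic znode under /brokers/topics. The sketch below is a simplified view of that write; the path format matches what Kafka uses, while the concrete assignment values are just the hypothetical three-partition, two-replica example:

// Simplified sketch of the zk write performed for a topic named "test".
// The JSON maps each partition id to its list of replica broker ids.
val path = "/brokers/topics/test"
val data = """{"version":1,"partitions":{"0":[1,2],"1":[3,4],"2":[5,6]}}"""
// conceptually: zkUtils.createPersistentPath(path, data)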

Topic creation by the Producer

A topic is auto-created when a Producer sends data to a non-existent topic only if auto.create.topics.enable is set to true on the Server side.

Before sending a produce request to a topic, the Producer first sends a Metadata request to fetch that topic's metadata. While handling the Metadata request, if the Server finds that the requested topic does not exist and automatic creation by producers is allowed (when authorization is enabled, the Producer also needs Describe permission on the topic and Create permission on the cluster), the Server will create the topic automatically.

//note: fetch metadata for the requested topics
private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
  val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
  if (topics.isEmpty || topicResponses.size == topics.size) {
    topicResponses
  } else {
    val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet//note: topics that do not yet exist in the cluster
    val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
      if (topic == Topic.GroupMetadataTopicName) {
        createGroupMetadataTopic()
      } else if (config.autoCreateTopicsEnable) {//note: auto.create.topics.enable is true, so auto-creation is allowed
        createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
      } else {
        new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false,
          java.util.Collections.emptyList())
      }
    }
    topicResponses ++ responsesForNonExistentTopics
  }
}

createTopic here still calls AdminUtils.createTopic, so the underlying implementation is the same as the command-line path.

private def createTopic(topic: String,
                        numPartitions: Int,
                        replicationFactor: Int,
                        properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
  try {
    //note: still creates the topic via AdminUtils
    AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
    info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
      .format(topic, numPartitions, replicationFactor))
    new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
      java.util.Collections.emptyList())
  } catch {
    case _: TopicExistsException => // let it go, possibly another broker created this topic
      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
        java.util.Collections.emptyList())
    case ex: Throwable  => // Catch all to prevent unhandled errors
      new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, Topic.isInternal(topic),
        java.util.Collections.emptyList())
  }
}

How replicas are assigned

As shown above, whichever way a topic is created, the partition replica assignment ends up being written to zk via AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(). The key step in between is how the partitions' replicas are assigned: at creation time we can either specify the assignment explicitly or let the default algorithm assign them automatically.

Specifying the replica assignment at creation time

When creating a topic, the replica placement can be specified directly in the following form:

./bin/kafka-topics.sh --create --topic test --zookeeper XXXX --replica-assignment 1:2,3:4,5:6

This topic has three partitions: partition 0's replicas are placed on brokers 1 and 2, partition 1's on brokers 3 and 4, and partition 2's on brokers 5 and 6.

In this case, the Server writes this replica assignment to zk as-is when creating the topic.

The automatic replica assignment algorithm

When creating a topic, the Server obtains the partition replica assignment via AdminUtils.assignReplicasToBrokers().

/**
   * Replica assignment follows three goals:
   * 1. spread the replicas evenly across all brokers;
   * 2. place the replicas of a partition on different brokers;
   * 3. if all brokers have rack information, spread a partition's replicas across different racks.
   *
   * To achieve this, without rack awareness the assignment works as follows:
   * 1. pick a random broker from the broker list as the starting point and assign each partition's first replica round-robin;
   * 2. place the partition's remaining replicas on brokers at an increasing offset from the first replica.
   *
   * @param brokerMetadatas
   * @param nPartitions
   * @param replicationFactor
   * @param fixedStartIndex
   * @param startPartitionId
   * @return
   */
 def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                             nPartitions: Int,
                             replicationFactor: Int,
                             fixedStartIndex: Int = -1,
                             startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
   if (nPartitions <= 0) // note: the number of partitions must be greater than 0
     throw new InvalidPartitionsException("number of partitions must be larger than 0")
   if (replicationFactor <= 0) //note: the replication factor must be greater than 0
     throw new InvalidReplicationFactorException("replication factor must be larger than 0")
   if (replicationFactor > brokerMetadatas.size) //note: the replication factor cannot exceed the number of brokers
     throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
   if (brokerMetadatas.forall(_.rack.isEmpty))//note: no rack awareness
     assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
       startPartitionId)
   else { //note: rack-aware case
     if (brokerMetadatas.exists(_.rack.isEmpty)) //note: not all brokers have rack information
       throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
     assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
       startPartitionId)
   }
 }

Here we walk through the assignment without rack awareness; that algorithm is implemented in assignReplicasToBrokersRackUnaware().

//note: assign replicas partition by partition
 private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                replicationFactor: Int,
                                                brokerList: Seq[Int],
                                                fixedStartIndex: Int,
                                                startPartitionId: Int): Map[Int, Seq[Int]] = {
   val ret = mutable.Map[Int, Seq[Int]]()
   val brokerArray = brokerList.toArray
   val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) //note: pick a random broker as the starting point
   var currentPartitionId = math.max(0, startPartitionId) //note: the first partition id to assign
   var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
   for (_ <- 0 until nPartitions) { //note: assign each partition in turn
     if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
       nextReplicaShift += 1 //note: bump the shift so that, with many partitions, different partitions do not get exactly the same (leader, follower) layout
     val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length //note: the partition's first replica
     val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
     for (j <- 0 until replicationFactor - 1) //note: assign the remaining replicas
       replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
     ret.put(currentPartitionId, replicaBuffer)
     currentPartitionId += 1
   }
   ret
 }

 //note: computes the placement of the remaining replicas once the first replica is fixed
 private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
   val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)//note: the shift grows with replicaIndex on top of secondReplicaShift
   (firstReplicaIndex + shift) % nBrokers
 }

Here is an example: suppose a Kafka cluster has 5 brokers and we create a topic with 10 partitions and a replication factor of 3, and suppose the randomly chosen startIndex and nextReplicaShift are both 0.

  • For partition 0, the first replica goes to broker (0+0)%5=0, the second to (0+(1+(0+0)%4))%5=1, the third to (0+(1+(0+1)%4))%5=2;
  • for partition 2, the first replica goes to (0+2)%5=2, the second to (2+(1+(0+0)%4))%5=3, the third to (2+(1+(0+1)%4))%5=4;
  • for partition 5, the first replica goes to (0+5)%5=0, the second to (0+(1+(1+0)%4))%5=2, the third to (0+(1+(1+1)%4))%5=3 (each time the partition id reaches a multiple of the broker count, nextReplicaShift is incremented by 1);
  • for partition 8, the first replica goes to (0+8)%5=3, the second to (3+(1+(1+0)%4))%5=0, the third to (3+(1+(1+1)%4))%5=1.

The resulting assignment is shown in the table below (the three replica broker ids for each partition):

  partition 0: 0, 1, 2        partition 5: 0, 2, 3
  partition 1: 1, 2, 3        partition 6: 1, 3, 4
  partition 2: 2, 3, 4        partition 7: 2, 4, 0
  partition 3: 3, 4, 0        partition 8: 3, 0, 1
  partition 4: 4, 0, 1        partition 9: 4, 1, 2
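The table can also be reproduced with a small standalone sketch of the same rack-unaware logic (illustrative code, not Kafka's): startIndex and nextReplicaShift are fixed to 0 here, whereas the real code picks them at random, and broker ids 0-4 are assumed.

object AssignmentSketch extends App {
  // Reproduces the worked example above: 5 brokers (ids 0-4), 10 partitions,
  // replication factor 3, startIndex = nextReplicaShift = 0.
  val brokers = Array(0, 1, 2, 3, 4)
  val nPartitions = 10
  val replicationFactor = 3
  val startIndex = 0
  var nextReplicaShift = 0

  // Same formula as the replicaIndex() method shown earlier
  def replicaIndex(first: Int, shift: Int, j: Int, nBrokers: Int): Int =
    (first + 1 + (shift + j) % (nBrokers - 1)) % nBrokers

  for (p <- 0 until nPartitions) {
    if (p > 0 && p % brokers.length == 0) nextReplicaShift += 1
    val first = (p + startIndex) % brokers.length
    val others = (0 until replicationFactor - 1)
      .map(j => brokers(replicaIndex(first, nextReplicaShift, j, brokers.length)))
    println(s"partition $p -> ${(brokers(first) +: others).mkString(", ")}")
  }
}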

What the replica assignment in zk triggers

This part is driven by the Kafka Controller (the Controller itself will be covered in a later article). Once a topic's replica assignment has been written to zk, the listener watching that zk path fires (TopicChangeListener.doHandleChildChange()); it helps to refer back to the first figure in this article.

//note: invoked whenever the children of the topics znode change
    def doHandleChildChange(parentPath: String, children: Seq[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          try {
            val currentChildren = {
              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
              children.toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics//note: newly created topics
            val deletedTopics = controllerContext.allTopics -- currentChildren//note: deleted topics
            controllerContext.allTopics = currentChildren

            //note: partition assignments of the newly created topics
            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))//note: drop partitions belonging to deleted topics
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)//note: add the new topic-partition assignments to the cache
            info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
              deletedTopics, addedPartitionReplicaAssignment))
            if (newTopics.nonEmpty)//note: handle the newly created topics
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
          } catch {
            case e: Throwable => error("Error while handling new topic", e)
          }
        }
      }
    }

This method does the following:

  • read the topic changes from zk to obtain the list of newly created topics (newTopics) and deleted topics (deletedTopics);
  • remove the replicas of deletedTopics from the controller's cache and add the replica assignments of the new topics to the cache;
  • call KafkaController's onNewTopicCreation() to create the partition and replica objects.

KafkaController's onNewTopicCreation() first registers a PartitionChangeListener for these topics and then calls onNewPartitionCreation() to create the partition and replica objects; the bulk of topic creation happens in KafkaController's onNewPartitionCreation().

//note: invoked when the partition state machine sees new topics or partitions
 /**
   * 1. register a partition change listener;
   * 2. trigger the new-partition callback, i.e. onNewPartitionCreation();
   * 3. send metadata requests to all brokers
   * @param topics
   * @param newPartitions
   */
 def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
   info("New topic creation callback for %s".format(newPartitions.mkString(",")))
   // subscribe to partition changes
   topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
   onNewPartitionCreation(newPartitions)
 }

 //note: invoked when topics change
 //note: 1. move the new partitions to NewPartition; 2. then from NewPartition to OnlinePartition
 //note: 1. move the new replicas to NewReplica; 2. then from NewReplica to OnlineReplica
 def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
   info("New partition creation callback for %s".format(newPartitions.mkString(",")))
   partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
   replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
   partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
   replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
 }

Before walking through these four calls in detail, let's briefly go over the Partition and Replica state machines.

The Partition state machine

Partition state transitions are implemented in Kafka's PartitionStateMachine; the transitions are shown in the figure below.

Figure: the Partition state machine

A Partition object has four states:

  1. NonExistentPartition: the partition does not exist;
  2. NewPartition: the partition has just been created and has replicas assigned, but no leader or ISR yet;
  3. OnlinePartition: the partition's leader has been elected and it is in its normal working state;
  4. OfflinePartition: the partition's leader is down.

A partition is only usable when it is in the OnlinePartition state.

The Replica state machine

Replica state transitions are implemented in Kafka's ReplicaStateMachine; the transitions are shown in the figure below.

Figure: the Replica state machine

A Replica object has seven states; the descriptions below are quoted directly from the Kafka source comments.

  1. NewReplica:The controller can create new replicas during partition reassignment. In this state, a replica can only get become follower state change request.
  2. OnlineReplica:Once a replica is started and part of the assigned replicas for its partition, it is in this state. In this state, it can get either become leader or become follower state change requests.
  3. OfflineReplica:If a replica dies, it moves to this state. This happens when the broker hosting the replica is down.
  4. ReplicaDeletionStarted:If replica deletion starts, it is moved to this state.
  5. ReplicaDeletionSuccessful:If replica responds with no error code in response to a delete replica request, it is moved to this state.
  6. ReplicaDeletionIneligible:If replica deletion fails, it is moved to this state.
  7. NonExistentReplica:If a replica is deleted successfully, it is moved to this state.

onNewPartitionCreation() in detail

This method performs the following four steps:

  1. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): create the Partition objects and set their state to NewPartition;
  2. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica): create the Replica objects and set their state to NewReplica;
  3. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector): move the Partition objects from NewPartition to OnlinePartition;
  4. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): move the Replica objects from NewReplica to OnlineReplica.

partitionStateMachine > NewPartition

This step creates the Partition objects and sets their state to NewPartition:

case NewPartition =>
  //note: a newly created partition
  assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
  partitionState.put(topicAndPartition, NewPartition) //note: cache the partition's state
  val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
  stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                            .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                    assignedReplicas))

replicaStateMachine > NewReplica

This step creates the corresponding replica objects for each Partition and sets their state to NewReplica (the state machine diagram above makes this easier to follow).

case NewReplica =>
          assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)  //note: validate the previous state
          // start replica as a follower to the current leader for its partition
          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
          leaderIsrAndControllerEpochOpt match {
            case Some(leaderIsrAndControllerEpoch) =>
              if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)//note: a replica in this state cannot be the leader
                throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
              //note: send a LeaderAndIsr request to this replica; the batch also sends UpdateMetadata requests to all brokers
              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                  topic, partition, leaderIsrAndControllerEpoch,
                                                                  replicaAssignment)
            case None => // new leader request will be sent to this replica when one gets elected

partitionStateMachine > OnlinePartition

This step moves the partition objects from NewPartition to OnlinePartition. As the state machine diagram shows, two things happen:

  1. initialize the leader and ISR: the first replica in the replica list becomes the leader, all replicas form the ISR, and the leader/ISR information is written to zk;
  2. send LeaderAndIsr requests to all the replicas and UpdateMetadata requests to all brokers.

具體操做以下:

// post: partition has been assigned replicas
       case OnlinePartition =>
         assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
         partitionState(topicAndPartition) match {
           case NewPartition =>
             // initialize leader and isr path for new partition
             initializeLeaderAndIsrForPartition(topicAndPartition) //note: initialize leader and ISR for the new partition
           case OfflinePartition =>
             electLeaderForPartition(topic, partition, leaderSelector)
           case OnlinePartition => // invoked when the leader needs to be re-elected
             electLeaderForPartition(topic, partition, leaderSelector)
           case _ => // should never come here since illegal previous states are checked above
         }

實際的操做是在 initializeLeaderAndIsrForPartition() 方法中完成,這個方法是當 partition 對象的狀態由 NewPartition 變爲 OnlinePartition 時觸發的,用來初始化該 partition 的 leader 和 isr。簡單來講,就是選取 Replicas 中的第一個 Replica 做爲 leader,全部的 Replica 做爲 isr,最後調用 brokerRequestBatch.addLeaderAndIsrRequestForBrokers 向全部 replicaId 發送 LeaderAndIsr 請求以及向全部的 broker 發送 UpdateMetadata 請求(關於 Server 對 LeaderAndIsr 和 UpdateMetadata 請求的處理將會後續文章中講述)。

//note: triggered when a partition moves from NewPartition to OnlinePartition; initializes the partition's leader and ISR
 private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
   val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
   val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
   liveAssignedReplicas.size match {
     case 0 =>
       val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
                      "live brokers are [%s]. No assigned replica is alive.")
                        .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
       stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
       throw new StateChangeFailedException(failMsg)
     case _ =>
       debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
       // make the first replica in the list of assigned replicas, the leader
        val leader = liveAssignedReplicas.head //note: the first live assigned replica becomes the leader
       val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
         controller.epoch)
       debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
       try {
         zkUtils.createPersistentPath(
           getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
            zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))//note: create the leader/ISR znode
         // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
         // took over and initialized this partition. This can happen if the current controller went into a long
         // GC pause
         controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
         brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)//note: send LeaderAndIsr requests to the live replicas
       } catch {
         case _: ZkNodeExistsException =>
           // read the controller epoch
           val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
             topicAndPartition.partition).get
           val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
                          "exists with value %s and controller epoch %d")
                            .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
           stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
           throw new StateChangeFailedException(failMsg)
       }
    }
  }
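To tie this back to the running example, the sketch below (illustrative, not Kafka code) shows how the leader and initial ISR fall out of the logic above for one new partition; the assignment and set of live brokers are assumed values:

// Sketch: leader/ISR initialization for one new partition, following the code above.
val assignment = Map(0 -> Seq(0, 1, 2))              // partition 0 -> assigned replicas (assumed)
val liveBrokerIds = Set(0, 1, 2)                     // brokers currently alive (assumed)
val liveAssignedReplicas = assignment(0).filter(liveBrokerIds.contains)
val leader = liveAssignedReplicas.head               // first live assigned replica becomes the leader
val isr = liveAssignedReplicas.toList                // all live assigned replicas form the initial ISR
println(s"partition 0 -> leader=$leader, isr=$isr")  // prints: partition 0 -> leader=0, isr=List(0, 1, 2)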

replicaStateMachine > OnlineReplica

This is the last step: the Replica objects are moved from NewReplica to OnlineReplica, and only then are the replicas truly usable.

case OnlineReplica =>
          assertValidPreviousStates(partitionAndReplica,
            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
          replicaState(partitionAndReplica) match {
            case NewReplica =>
              // add this replica to the assigned replicas list for its partition
              //note: add this replica to the assigned replicas list (normally it is already there)
              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
              if(!currentAssignedReplicas.contains(replicaId))
                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                                targetState))

Only at this point is a topic truly created.

Annotated Kafka source code analysis

Adapted from: the Kafka source code analysis series
