KafkaBroker 簡析

Kafka 依賴 Zookeeper 來維護集羣成員的信息:node

  • Kafka 使用 Zookeeper 的臨時節點來選舉 controller
  • Zookeeper 在 broker 加入集羣或退出集羣時通知 controller
  • controller 負責在 broker 加入或離開集羣時進行分區 leader 選舉

broker 管理

每一個 broker 都有一個惟一標識符 ID,這個標識符能夠在配置文件裏指定,也能夠自動生成。
在 broker 啓動的時候,它經過在 Zookeeper 的 /brokers/ids 路徑上建立臨時節點,把本身的 ID 註冊 Zookeeper。
Kafka 組件會訂閱 Zookeeper 的 /brokers/ids 路徑,當有 broker 加入集羣或退出集羣時,這些組件就能夠得到通知。算法

在 broker 停機、出現網絡分區或長時間垃圾回收停頓時,會致使其 Zookeeper 會話失效,致使其在啓動時建立的臨時節點會自動被移除。
監聽 broker 列表的 Kafka 組件會被告知該 broker 已移除,而後處理 broker 崩潰的後續事宜。apache

在徹底關閉一個 broker 以後,若是使用相同的 ID 啓動另外一個全新的 broker,它會當即加入集羣,並擁有與舊 broker 相同的分區和主題。網絡

controller 選舉

controller 其實就是一個 broker,它除了具備通常 broker 的功能以外,還負責分區 leader 的選舉。併發

爲了在整個集羣中指定一個惟一的 controller,broker 集羣須要進行選舉,該過程依賴如下兩個 Zookeeper 節點:負載均衡

// 臨時節點 controller(保存最新的 controller 節點信息,保證惟一性)
object ControllerZNode {
  def path = "/controller"
  def encode(brokerId: Int, timestamp: Long): Array[Byte] = {
    Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString).asJava)
  }
  def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
    js.asJsonObject("brokerid").to[Int]
  }
}

// 永久節點 controller_epoch(保存最新 controller 對應的任期號,用於避免腦裂)
object ControllerEpochZNode {
  def path = "/controller_epoch"
  def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8)
  def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt
}

broker 啓動後會發起一輪選舉,選舉經過 Zookeeper 提供的建立節點功能來實現:性能

全部 broker 啓動後都會嘗試搶佔建立臨時節點 /controller,建立成功的 broker 將成爲 controller。
新選出的 controller 會同時遞增 /controller_epoch 中的任期號,其餘 broker 能夠根據任期號忽略已過時 controller 的消息。
搶佔失敗的 broker 會收到一個 NODEEXISTS 響應,轉而在節點上建立 Watcher實時監控 /controller節點。
當 controller 被關閉或者斷開鏈接,Zookeeper 上的臨時節點就會消失,集羣裏的其餘 broker 會接收到通知併發起一輪新的選舉。

broker 中的 KafkaController 對象負責發起選舉:fetch

private def elect(): Unit = {

    // 檢查集羣中是否存在可用 controller (activeControllerId == -1)

    try {

      // 當前 broker 經過 KafkaZkClient 發起選舉,並選舉本身爲新的 controller
      val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)

      // 若是 broker 當選,則更新對應的 controller 相關信息
      controllerContext.epoch = epoch
      controllerContext.epochZkVersion = epochZkVersion
      activeControllerId = config.brokerId

      info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} and epoch zk version is now ${controllerContext.epochZkVersion}")

      onControllerFailover() // 選舉成功後觸發維護操做
    } catch {
      case e: ControllerMovedException =>
        maybeResign()
        
        if (activeControllerId != -1) // 其餘 broker 被選爲 controller
          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
        else // 本輪選舉沒有產生 controller
          warn("A controller has been elected but just resigned, this will result in another round of election", e)
      
      case t: Throwable =>
        error(s"Error while electing or becoming controller on broker ${config.brokerId}. Trigger controller movement immediately", t)
        triggerControllerMove()
    }
  }

KafkaZkClient 中更新 Zookeeper 的邏輯以下:this

def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int) = {
    val timestamp = time.milliseconds()

    // 從 /controller_epoch 獲取當前 controller 對應的 epoch 與 zkVersion
    // 若 /controller_epoch 不存在則嘗試建立
    val (curEpoch, curEpochZkVersion) = getControllerEpoch
      .map(e => (e._1, e._2.getVersion))
      .getOrElse(maybeCreateControllerEpochZNode())

    // 建立 /controller 並原子性更新 /controller_epoch
    val newControllerEpoch = curEpoch + 1
    val expectedControllerEpochZkVersion = curEpochZkVersion

    debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch with expected controller epoch zkVersion $expectedControllerEpochZkVersion")

    // 處理 /controller 節點已存在的狀況,直接返回最新節點信息
    def checkControllerAndEpoch(): (Int, Int) = {
      val curControllerId = getControllerId.getOrElse(throw new ControllerMovedException(
        s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " +
          s"Aborting controller startup procedure"))
      if (controllerId == curControllerId) {
        val (epoch, stat) = getControllerEpoch.getOrElse(
          throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it"))

        // 若是最新的 epoch 與 newControllerEpoch 相等,則能夠推斷 zkVersion 與當前 broker 已知的 zkVersion 一致
        if (epoch == newControllerEpoch)
          return (newControllerEpoch, stat.getVersion)
      }
      throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
    }

    // 封裝 zookeeper 請求
    def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = {
      val response = retryRequestUntilConnected(
        MultiRequest(Seq(
          // 發送 CreateRequest 建立 /controller 臨時節點
          CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL),
          // 發送 SetDataRequest 更新 /controller_epoch 節點信息
          SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)))
      )
      response.resultCode match {
        case Code.NODEEXISTS | Code.BADVERSION => checkControllerAndEpoch()
        case Code.OK =>
          val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult]
          (newControllerEpoch, setDataResult.getStat.getVersion)
        case code => throw KeeperException.create(code)
      }
    }

    // 向 zookeepr 發起請求 
    tryCreateControllerZNodeAndIncrementEpoch()
  }

分區管理

一個 Kafka 分區本質上就是一個備份日誌,經過利用多份相同的冗餘副本replica保持系統高可用性。
Kafka 把分區的全部副本均勻地分配到全部 broker上,並從這些副本中挑選一個做爲 leader 副本對外提供服務。
而其餘副本被稱爲 follower 副本,不對外提供服務,只能被動地向 leader 副本請求數據,保持與 leader 副本的同步。spa

當 controller 發現一個 broker 加入集羣時,它會使用 broker.id 來檢查新加入的 broker 是否包含現有分區的副本。
若是有,controller 就把變動通知發送全部 broker,新 broker 中的分區做爲 follower 副本開始從 leader 那裏複製消息。

ISR

Kafka 爲每一個主題維護了一組同步副本集合in-sync replicas(其中包含 leader 副本)。
只有被 ISR 中的全部副本都接收到的那部分生產者寫入的消息纔對消費者可見,這意味着 ISR 中的全部副本都會與 leader 保持同步狀態。
爲了不出現新 leader 數據不完整致使分區數據丟失的狀況,只有 ISR 中 follower 副本纔有資格被選舉爲 leader。
若 follower 副本沒法在 replica.lag.time.max.ms 毫秒內向 leader 請求數據,那麼該 follower 就會被視爲不一樣步,leader 會將其剔除出 ISR。
leader 會在 ISR 集合發生變動時,會在/isr_change_notification下建立一個永久節點並寫入變動信息。
當監控/isr_change_notification的 controller 接收到通知後,會更新其餘 broker 的元數據,最後刪除已處理過的節點。
當出現瞬時峯值流量,只要 follower 不是持續性落後,就不會反覆地在 ISR 中移進、移出,避免頻繁訪問 Zookeeper 影響性能。

首選副本

建立主題時,Kafka 會爲每一個分區選定一個初始分區 leaderpreferred leader,其對應的副本被稱爲首選副本preferred replica

controller 在建立主題時會保證 leader 在 broker 之間均衡分佈,所以當 leader 按照初始的首選副本分佈時,broker 間的負載均衡狀態最佳。

然而 broker 失效是難以免的,重啓後的首選副本只能做爲 follower 副本加入 ISR 中,不能再對外提供服務。

隨着集羣的不斷運行,leader 不均衡現象會愈發明顯:集羣中的一小部分 broker 上承載了大量的分區 leader 副本

能夠設置 auto.leader.rebalance.enable = true 解決這一問題:

broker 會按期在後臺計算其上非首選副本 leader 數量,當該值其與上的總分區數比例超過了 leader.imbalance.per.broker.percentage 時會自動執行一次 leader 均衡操做。

分區重分配

當一個新的 broker 剛加入集羣時,不會自動地分擔己有 topic 的負載,它只會對後續新增的 topic 生效。

若是要讓新增 broker 爲己有的 topic 服務,用戶必須手動地調整現有的 topic 的分區分佈,將一部分分區搬移到新增 broker 上。這就是所謂的分區重分配reassignment操做。

除了處理 broker 擴容致使的不均衡以外,再均衡還能用於處理 broker 存儲負載不均衡的狀況,在單個或多個 broker 之間的日誌目錄之間從新分配分區。 用於解決多個代理之間的存儲負載不平衡。


首領選舉

觸發分區 leader 選舉的幾種場景:

  • Offline:建立新分區或分區失去現有 leader
  • Reassign:用戶執行重分配操做
  • PreferredReplica:將 leader 遷移回首選副本
  • ControlledShutdown:分區的現有 leader 即將下線

當上述幾種狀況發生時,controller 會遍歷全部相關的主題分區並從爲其指定新的 leader。

而後向全部包含相關主題分區的 broker 發送更新請求,其中包含了最新的 leader 與 follower 副本分配信息。

更新完畢後,新 leader 會開始處理來自生產者和消費者的請求,而follower 開始重新 leader 那裏複製消息。

分區狀態信息在對應的節點信息:

// 節點 /brokers/topics/{topic-name}/partitions/{partition-no}/state 保存分區最新狀態信息的
object TopicPartitionStateZNode {
  def path(partition: TopicPartition) = s"${TopicPartitionZNode.path(partition)}/state"
  def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
    val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
    val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
    Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr.asJava).asJava)
  }
  def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
    Json.parseBytes(bytes).map { js =>
      val leaderIsrAndEpochInfo = js.asJsonObject
      val leader = leaderIsrAndEpochInfo("leader").to[Int]
      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
      val zkPathVersion = stat.getVersion
      LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
    }
  }
}

PartitionStateMachine 管理分區選舉的代碼:

private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
  ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
    
    // 請求 Zookeeper 獲取 partition 當前狀態  
    val getDataResponses = try {
      zkClient.getTopicPartitionStatesRaw(partitions)
    } catch {
      case e: Exception =>
        return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
    }

    val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
    val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]

    getDataResponses.foreach { getDataResponse =>
      val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
      val currState = partitionState(partition)
      if (getDataResponse.resultCode == Code.OK) {
        // 剔除狀態已失效或不存在的 partition
        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
          case Some(leaderIsrAndControllerEpoch) =>
            if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
              val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
                s"already written by another controller. This probably means that the current controller $controllerId went through " +
                s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
              failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
            } else { 
              validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
            }

          case None =>
            val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
            failedElections.put(partition, Left(exception))
        }

      } else if (getDataResponse.resultCode == Code.NONODE) {
        val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
        failedElections.put(partition, Left(exception))
      } else {
        failedElections.put(partition, Left(getDataResponse.resultException.get))
      }
    }

    // 若是所有 partition 均失效,則跳過這次選舉
    if (validLeaderAndIsrs.isEmpty) {
      return (failedElections.toMap, Seq.empty)
    }

    // 根據指定的選舉策略選擇 partition leader
    val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
      // Elect leaders for new or offline partitions.
      case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(validLeaderAndIsrs, allowUnclean)
        leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
      // Elect leaders for partitions that are undergoing reassignment.
      case ReassignPartitionLeaderElectionStrategy =>
        leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
      // Elect preferred leaders.
      case PreferredReplicaPartitionLeaderElectionStrategy =>
        leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
      // Elect leaders for partitions whose current leaders are shutting down.
      case ControlledShutdownPartitionLeaderElectionStrategy =>
        leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
    }

    partitionsWithoutLeaders.foreach { electionResult =>
      val partition = electionResult.topicPartition
      val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
      failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
    }

    // 將選舉結果同步到 TopicPartitionStateZNode 對應的 Zookeeper 節點
    val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
    val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
    val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)

    finishedUpdates.forKeyValue { (partition, result) =>
      result.foreach { leaderAndIsr =>
        val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
        val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
        controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
          leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
      }
    }

    (finishedUpdates ++ failedElections, updatesToRetry)
  }

日誌複製

複製流程

日誌複製中的一些重要偏移概念:

  • 起始位移base offset:副本所含第一條消息的 offset
  • 高水位值high watermark:副本最新一條己提交消息的 offset
  • 日誌末端位移log end offset:副本中下一條待寫入消息的 offset

每一個副本會同時維護 HW 與 LEO 值:

  • leader 保證只有 HW 及其以前的消息,纔對消費者是可見的。
  • follower 宕機後重啓時會對其日誌截斷,只保留 HW 及其以前的日誌消息(新版本有改動)。

Kafka 中的複製流程大體以下:

  • leader 會將接收到的消息寫入日誌文件,同時更新 \(\tiny \textsf{leader LEO}\)
  • follower 發送 fetch 請求指定 offset 的消息
  • 接收到請求後 leader 會根據 offset 更新下面兩個值
    • \(\tiny \textsf{follower LEO} = \texttt{offset}\)
    • \(\tiny \textsf{HW} = \min(\textsf{leader LEO}, \min(\textsf{follower LEO of ISR}))\)
  • leader 返回消息的同時會附帶上最新的 HW
  • follower 接收到響應後會將消息寫入日誌文件,並同時更新 HW

leader epoch

在前面咱們提到 follower 在重啓後會對日誌進行截斷,這可能致使消息會丟失:

假設某個分區分佈在 A 和 B 兩個 broker 上,且最開始時 B 是分區 leader

  • 某個時刻,follower A 從 leader B 同步消息 m2,但此時並未收到 HW 更新
  • 就在此時,follower A 發生了重啓,此時它會截斷 m2 所在的日誌,而後才向 leader B 從新請求數據
  • 不巧,此時 leader B 也發生了宕機,此時 follower A 會被選爲新的 leader A,這意味着消息 m2 已經永久丟失了

爲了解決這一問題,Kafka 爲每一屆 leader 分配了一個惟一的 epoch,由其追加到日誌的消息都會包含這個 epoch。

而後每一個副本都在本地維護一個 epoch 快照文件,並在其中保存 (epoch, offset)

  • epoch 表示 leader 的版本號,當 leader 變動一次 epoch 就會加 1
  • offset 則對應 epoch 版本的 leader 寫入第一條消息的對於的位移

回到以前的場景,增長了 leader epoch 以後的行爲以下:

  • follower A 發生重啓後,會向 leader B 發送 LeaderEpochRequest 請求最新的 leader epoch
  • leader B 會在響應中返回本身的 LEO
  • follower A 接收到響應後發現無需對日誌進行截斷,從而避免了消息 m2 丟失

更多的細節能夠參考這篇文章

文件格式

建立主題時,Kafka 會爲主題的每一個分區在文件系統中建立了一個對應的子目錄,命名格式爲主題名-分區號,每一個日誌子目錄的文件構成以下:

[lhop@localhost log]$ tree my-topic-*
my-topic-0
├── 00000000000050209130.index
├── 00000000000050209130.log
├── 00000000000050209130.snapshot
├── 00000000000050209130.timeindex
└── leader-epoch-checkpoint
my-topic-1
├── 00000000000048329826.index
├── 00000000000048329826.log
├── 00000000000048329826.timeindex
└── leader-epoch-checkpoint

其中的 leader-epoch-checkpoint文件用於存儲 leader epoch 快照,用於協助崩潰的副本執行恢復操做,在此就不詳細展開。咱們重點關注剩餘的兩類文件。


數據文件

日誌段文件(.log)的文件保存着真實的 Kafka 記錄。
Kafka 使用該文件第一條記錄對應的 offset 來命名此文件。
每一個日誌段文件是有上限大小的,由 broker 端參數log.segment.bytes控制。

除了鍵、值和偏移量外,消息裏還包含了消息大小、校驗和、消息格式版本號、壓縮算法和時間戳。時間戳能夠是生產者發送消息的時間,也能夠是消息到達 broker 的時間,這個是可配置的。

若是生產者發送的是壓縮過的消息,那麼同一個批次的消息會被壓縮在一塊兒。broker 會原封不動的將消息存入磁盤,而後再把它發送給消費者。消費者在解壓這個消息以後,會看到整個批次的消息,它們都有本身的時間戳和偏移量。

這意味着 broker 可使用zero-copy技術給消費者發送消息,同時避免了對生產者已經壓縮過的消息進行解壓和再壓縮。


索引文件

位移索引文件(.index)與時間戳索引(.timeindex)是兩個特殊的索引文件:

  • 前者能夠幫助快速定位記錄所在的物理文件位置
  • 後者則是根據給定的時間戳查找對應的位移信息

它們都屬於稀疏索引文件,每寫入若干條記錄後才增長一個索引項。寫入間隔能夠 broker 端參數 log.index.interval.bytes 設置。

索引文件嚴格按照時間戳順序保存,所以 Kafka 能夠利用二分查找算法提升查找速度。

相關文章
相關標籤/搜索