Kafka 依賴 Zookeeper 來維護集羣成員的信息:node
每一個 broker 都有一個惟一標識符 ID,這個標識符能夠在配置文件裏指定,也能夠自動生成。
在 broker 啓動的時候,它經過在 Zookeeper 的 /brokers/ids
路徑上建立臨時節點,把本身的 ID 註冊 Zookeeper。
Kafka 組件會訂閱 Zookeeper 的 /brokers/ids
路徑,當有 broker 加入集羣或退出集羣時,這些組件就能夠得到通知。算法
在 broker 停機、出現網絡分區或長時間垃圾回收停頓時,會致使其 Zookeeper 會話失效,致使其在啓動時建立的臨時節點會自動被移除。
監聽 broker 列表的 Kafka 組件會被告知該 broker 已移除,而後處理 broker 崩潰的後續事宜。apache
在徹底關閉一個 broker 以後,若是使用相同的 ID 啓動另外一個全新的 broker,它會當即加入集羣,並擁有與舊 broker 相同的分區和主題。網絡
controller 其實就是一個 broker,它除了具備通常 broker 的功能以外,還負責分區 leader 的選舉。併發
爲了在整個集羣中指定一個惟一的 controller,broker 集羣須要進行選舉,該過程依賴如下兩個 Zookeeper 節點:負載均衡
// 臨時節點 controller(保存最新的 controller 節點信息,保證惟一性) object ControllerZNode { def path = "/controller" def encode(brokerId: Int, timestamp: Long): Array[Byte] = { Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString).asJava) } def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => js.asJsonObject("brokerid").to[Int] } } // 永久節點 controller_epoch(保存最新 controller 對應的任期號,用於避免腦裂) object ControllerEpochZNode { def path = "/controller_epoch" def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8) def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt }
broker 啓動後會發起一輪選舉,選舉經過 Zookeeper 提供的建立節點功能來實現:性能
/controller
,建立成功的 broker 將成爲 controller。
/controller_epoch
中的任期號,其餘 broker 能夠根據任期號忽略已過時 controller 的消息。
Watcher
實時監控
/controller
節點。
broker 中的 KafkaController 對象負責發起選舉:fetch
private def elect(): Unit = { // 檢查集羣中是否存在可用 controller (activeControllerId == -1) try { // 當前 broker 經過 KafkaZkClient 發起選舉,並選舉本身爲新的 controller val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId) // 若是 broker 當選,則更新對應的 controller 相關信息 controllerContext.epoch = epoch controllerContext.epochZkVersion = epochZkVersion activeControllerId = config.brokerId info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} and epoch zk version is now ${controllerContext.epochZkVersion}") onControllerFailover() // 選舉成功後觸發維護操做 } catch { case e: ControllerMovedException => maybeResign() if (activeControllerId != -1) // 其餘 broker 被選爲 controller debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e) else // 本輪選舉沒有產生 controller warn("A controller has been elected but just resigned, this will result in another round of election", e) case t: Throwable => error(s"Error while electing or becoming controller on broker ${config.brokerId}. Trigger controller movement immediately", t) triggerControllerMove() } }
KafkaZkClient 中更新 Zookeeper 的邏輯以下:this
def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int) = { val timestamp = time.milliseconds() // 從 /controller_epoch 獲取當前 controller 對應的 epoch 與 zkVersion // 若 /controller_epoch 不存在則嘗試建立 val (curEpoch, curEpochZkVersion) = getControllerEpoch .map(e => (e._1, e._2.getVersion)) .getOrElse(maybeCreateControllerEpochZNode()) // 建立 /controller 並原子性更新 /controller_epoch val newControllerEpoch = curEpoch + 1 val expectedControllerEpochZkVersion = curEpochZkVersion debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch with expected controller epoch zkVersion $expectedControllerEpochZkVersion") // 處理 /controller 節點已存在的狀況,直接返回最新節點信息 def checkControllerAndEpoch(): (Int, Int) = { val curControllerId = getControllerId.getOrElse(throw new ControllerMovedException( s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " + s"Aborting controller startup procedure")) if (controllerId == curControllerId) { val (epoch, stat) = getControllerEpoch.getOrElse( throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it")) // 若是最新的 epoch 與 newControllerEpoch 相等,則能夠推斷 zkVersion 與當前 broker 已知的 zkVersion 一致 if (epoch == newControllerEpoch) return (newControllerEpoch, stat.getVersion) } throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure") } // 封裝 zookeeper 請求 def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = { val response = retryRequestUntilConnected( MultiRequest(Seq( // 發送 CreateRequest 建立 /controller 臨時節點 CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL), // 發送 SetDataRequest 更新 /controller_epoch 節點信息 SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion))) ) response.resultCode match { case Code.NODEEXISTS | Code.BADVERSION => checkControllerAndEpoch() case Code.OK => val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult] (newControllerEpoch, setDataResult.getStat.getVersion) case code => throw KeeperException.create(code) } } // 向 zookeepr 發起請求 tryCreateControllerZNodeAndIncrementEpoch() }
一個 Kafka 分區本質上就是一個備份日誌,經過利用多份相同的冗餘副本replica
保持系統高可用性。
Kafka 把分區的全部副本均勻地分配到全部 broker上,並從這些副本中挑選一個做爲 leader 副本對外提供服務。
而其餘副本被稱爲 follower 副本,不對外提供服務,只能被動地向 leader 副本請求數據,保持與 leader 副本的同步。spa
當 controller 發現一個 broker 加入集羣時,它會使用 broker.id 來檢查新加入的 broker 是否包含現有分區的副本。
若是有,controller 就把變動通知發送全部 broker,新 broker 中的分區做爲 follower 副本開始從 leader 那裏複製消息。
Kafka 爲每一個主題維護了一組同步副本集合in-sync replicas
(其中包含 leader 副本)。
只有被 ISR 中的全部副本都接收到的那部分生產者寫入的消息纔對消費者可見,這意味着 ISR 中的全部副本都會與 leader 保持同步狀態。
爲了不出現新 leader 數據不完整致使分區數據丟失的狀況,只有 ISR 中 follower 副本纔有資格被選舉爲 leader。
若 follower 副本沒法在 replica.lag.time.max.ms
毫秒內向 leader 請求數據,那麼該 follower 就會被視爲不一樣步,leader 會將其剔除出 ISR。
leader 會在 ISR 集合發生變動時,會在/isr_change_notification
下建立一個永久節點並寫入變動信息。
當監控/isr_change_notification
的 controller 接收到通知後,會更新其餘 broker 的元數據,最後刪除已處理過的節點。
當出現瞬時峯值流量,只要 follower 不是持續性落後,就不會反覆地在 ISR 中移進、移出,避免頻繁訪問 Zookeeper 影響性能。
建立主題時,Kafka 會爲每一個分區選定一個初始分區 leaderpreferred leader
,其對應的副本被稱爲首選副本preferred replica
。
controller 在建立主題時會保證 leader 在 broker 之間均衡分佈,所以當 leader 按照初始的首選副本分佈時,broker 間的負載均衡狀態最佳。
然而 broker 失效是難以免的,重啓後的首選副本只能做爲 follower 副本加入 ISR 中,不能再對外提供服務。
隨着集羣的不斷運行,leader 不均衡現象會愈發明顯:集羣中的一小部分 broker 上承載了大量的分區 leader 副本。
能夠設置 auto.leader.rebalance.enable = true
解決這一問題:
leader.imbalance.per.broker.percentage
時會自動執行一次 leader 均衡操做。
當一個新的 broker 剛加入集羣時,不會自動地分擔己有 topic 的負載,它只會對後續新增的 topic 生效。
若是要讓新增 broker 爲己有的 topic 服務,用戶必須手動地調整現有的 topic 的分區分佈,將一部分分區搬移到新增 broker 上。這就是所謂的分區重分配reassignment
操做。
除了處理 broker 擴容致使的不均衡以外,再均衡還能用於處理 broker 存儲負載不均衡的狀況,在單個或多個 broker 之間的日誌目錄之間從新分配分區。 用於解決多個代理之間的存儲負載不平衡。
觸發分區 leader 選舉的幾種場景:
當上述幾種狀況發生時,controller 會遍歷全部相關的主題分區並從爲其指定新的 leader。
而後向全部包含相關主題分區的 broker 發送更新請求,其中包含了最新的 leader 與 follower 副本分配信息。
更新完畢後,新 leader 會開始處理來自生產者和消費者的請求,而follower 開始重新 leader 那裏複製消息。
分區狀態信息在對應的節點信息:
// 節點 /brokers/topics/{topic-name}/partitions/{partition-no}/state 保存分區最新狀態信息的 object TopicPartitionStateZNode { def path(partition: TopicPartition) = s"${TopicPartitionZNode.path(partition)}/state" def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr.asJava).asJava) } def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = { Json.parseBytes(bytes).map { js => val leaderIsrAndEpochInfo = js.asJsonObject val leader = leaderIsrAndEpochInfo("leader").to[Int] val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int] val isr = leaderIsrAndEpochInfo("isr").to[List[Int]] val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int] val zkPathVersion = stat.getVersion LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch) } } }
PartitionStateMachine 管理分區選舉的代碼:
private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = { // 請求 Zookeeper 獲取 partition 當前狀態 val getDataResponses = try { zkClient.getTopicPartitionStatesRaw(partitions) } catch { case e: Exception => return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty) } val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]] val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)] getDataResponses.foreach { getDataResponse => val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition] val currState = partitionState(partition) if (getDataResponse.resultCode == Code.OK) { // 剔除狀態已失效或不存在的 partition TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match { case Some(leaderIsrAndControllerEpoch) => if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) { val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " + s"already written by another controller. This probably means that the current controller $controllerId went through " + s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}." failedElections.put(partition, Left(new StateChangeFailedException(failMsg))) } else { validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr } case None => val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state") failedElections.put(partition, Left(exception)) } } else if (getDataResponse.resultCode == Code.NONODE) { val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state") failedElections.put(partition, Left(exception)) } else { failedElections.put(partition, Left(getDataResponse.resultException.get)) } } // 若是所有 partition 均失效,則跳過這次選舉 if (validLeaderAndIsrs.isEmpty) { return (failedElections.toMap, Seq.empty) } // 根據指定的選舉策略選擇 partition leader val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match { // Elect leaders for new or offline partitions. case OfflinePartitionLeaderElectionStrategy(allowUnclean) => val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(validLeaderAndIsrs, allowUnclean) leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty) // Elect leaders for partitions that are undergoing reassignment. case ReassignPartitionLeaderElectionStrategy => leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) // Elect preferred leaders. case PreferredReplicaPartitionLeaderElectionStrategy => leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) // Elect leaders for partitions whose current leaders are shutting down. case ControlledShutdownPartitionLeaderElectionStrategy => leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) } partitionsWithoutLeaders.foreach { electionResult => val partition = electionResult.topicPartition val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy" failedElections.put(partition, Left(new StateChangeFailedException(failMsg))) } // 將選舉結果同步到 TopicPartitionStateZNode 對應的 Zookeeper 節點 val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) finishedUpdates.forKeyValue { (partition, result) => result.foreach { leaderAndIsr => val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch) controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition, leaderIsrAndControllerEpoch, replicaAssignment, isNew = false) } } (finishedUpdates ++ failedElections, updatesToRetry) }
日誌複製中的一些重要偏移概念:
base offset
:副本所含第一條消息的 offsethigh watermark
:副本最新一條己提交消息的 offsetlog end offset
:副本中下一條待寫入消息的 offset
每一個副本會同時維護 HW 與 LEO 值:
Kafka 中的複製流程大體以下:
在前面咱們提到 follower 在重啓後會對日誌進行截斷,這可能致使消息會丟失:
假設某個分區分佈在 A 和 B 兩個 broker 上,且最開始時 B 是分區 leader
爲了解決這一問題,Kafka 爲每一屆 leader 分配了一個惟一的 epoch,由其追加到日誌的消息都會包含這個 epoch。
而後每一個副本都在本地維護一個 epoch 快照文件,並在其中保存 (epoch, offset)
:
回到以前的場景,增長了 leader epoch 以後的行爲以下:
LeaderEpochRequest
請求最新的 leader epoch更多的細節能夠參考這篇文章。
建立主題時,Kafka 會爲主題的每一個分區在文件系統中建立了一個對應的子目錄,命名格式爲主題名-分區號
,每一個日誌子目錄的文件構成以下:
[lhop@localhost log]$ tree my-topic-* my-topic-0 ├── 00000000000050209130.index ├── 00000000000050209130.log ├── 00000000000050209130.snapshot ├── 00000000000050209130.timeindex └── leader-epoch-checkpoint my-topic-1 ├── 00000000000048329826.index ├── 00000000000048329826.log ├── 00000000000048329826.timeindex └── leader-epoch-checkpoint
其中的 leader-epoch-checkpoint
文件用於存儲 leader epoch 快照,用於協助崩潰的副本執行恢復操做,在此就不詳細展開。咱們重點關注剩餘的兩類文件。
日誌段文件(.log)的文件保存着真實的 Kafka 記錄。
Kafka 使用該文件第一條記錄對應的 offset 來命名此文件。
每一個日誌段文件是有上限大小的,由 broker 端參數log.segment.bytes
控制。
除了鍵、值和偏移量外,消息裏還包含了消息大小、校驗和、消息格式版本號、壓縮算法和時間戳。時間戳能夠是生產者發送消息的時間,也能夠是消息到達 broker 的時間,這個是可配置的。
若是生產者發送的是壓縮過的消息,那麼同一個批次的消息會被壓縮在一塊兒。broker 會原封不動的將消息存入磁盤,而後再把它發送給消費者。消費者在解壓這個消息以後,會看到整個批次的消息,它們都有本身的時間戳和偏移量。
這意味着 broker 可使用zero-copy
技術給消費者發送消息,同時避免了對生產者已經壓縮過的消息進行解壓和再壓縮。
位移索引文件(.index)與時間戳索引(.timeindex)是兩個特殊的索引文件:
它們都屬於稀疏索引文件,每寫入若干條記錄後才增長一個索引項。寫入間隔能夠 broker 端參數 log.index.interval.bytes
設置。
索引文件嚴格按照時間戳順序保存,所以 Kafka 能夠利用二分查找算法提升查找速度。