建立topic——kafka源碼探究之一

kafka是一款優秀的分佈式消息隊列,以吞吐量大,性能優秀著名。算法

一.相關的名詞解釋

broker:存儲消息的物理實體,無狀態,可多個擴展部署
Topic:一類消息的集合
Partition:一類消息的某個消息分區,一個topic有多個partition,可部署在多個broker上
Replicas:備份,partition級別,單個partition可有多個replicas,保證高可用
Controller:主broker,負責partition的leader選舉
Zookeeper:存儲broker、controller,consumer group等的信息,提供分佈式鎖等
ISR:in sync replica,同步副本列表,能保持fetch leader消息的replica,列表包括leader和follow
HW:high watermark,消費者能消費到的最新消息位置
LEO:log end offset,每一個partition的log最後一條message的位置。segmentfault

二.建立topic流程

kafka的topic建立邏輯分爲兩部分,命令行部分和controller的後臺邏輯部分。命令行部分是腳本直接調用的主函數,負責校驗命令以及分配replica,以後把分配結果寫入zookeeper節點,再由controller監聽zookeeper節點的變化,完成topic建立的後臺邏輯部分。最好禁用auto-create,統一建立topic。緩存

流程:腳本命令實際是運行了TopicCommand的main函數,調用TopicCommand識別命令後判斷是create命令後則調用createTopic方法。再判斷是否有手動指定副本,即--replica-assignment參數,若是沒有,則由系統自動分配,算法以下:dom

a.將全部Broker(假設共n個Broker)和待分配的Partition排序,隨機選取一個broker做爲開始分配的節點
b.假設選取第一個開始分配的broker爲b1,將partition(p11)分配到b1,p12分配到b2,以此類推…
c.第一份replica(p1)分配完後,分配第二份replica(p2)。隨機選取一個shift(步長),該shift做爲p2j與p1j的間隔。假設隨機出的shift爲s,p21分配到b(1+s),p22分配到b(2+s).

如下是kafka0.8.2.2版本中關於分配算法的註釋:
There are 3 goals of replica assignment:分佈式

  1. Spread the replicas evenly among brokers.
  2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
  3. If all brokers have rack information, assign the replicas for each partition to different racks if possible

To achieve this goal for replica assignment without considering racks, we:ide

  1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
  2. Assign the remaining replicas of each partition with an increasing shift.

Here is an example of assigning
broker-0 broker-1 broker-2 broker-3 broker-4
p0 p1 p2 p3 p4 (1st replica)
p5 p6 p7 p8 p9 (1st replica)
p4 p0 p1 p2 p3 (2nd replica)
p8 p9 p5 p6 p7 (2nd replica)
p3 p4 p0 p1 p2 (3nd replica)
p7 p8 p9 p5 p6 (3nd replica)函數

簡單來講,分配的目標就是保證同一個partition的多個replica不能在同一個broker上且儘可能在broker間均勻分配。

若是有—replica-assignment參數,參數值遵儲以下格式:id0:id1:id2,id3:id4:id5,id6:id7:id8.表示Topic一共有3個Partition(以「,」分隔),每一個Partition均有3個Replica(以「:」分隔)。

如下詳細敘述無指定參數時建立topic的具體流程:
在TopicCommand的createTopic方法中,首先從zookeeper中得到當前brokers list,以後調用assignReplicasToBrokers方法得到partition和broker的分配關係,最後把該關係寫入zk節點。性能

Kafka的controller內部的TopicChangeListener會監聽/brokers/topics目錄下子節點變化。一旦該目錄子節點數發生變化會調用相應監聽器的處理方法。先更新controller的緩存信息,以後是建立對應的分區機器副本對象,併爲每一個分區肯定leader副本及ISR。fetch

對controller而言,內部包含兩個狀態機,分區狀態機(PartitionStateMachine)和副本狀態機(ReplicaStateMachine),兩個狀態機的狀態變化如圖1,圖2所示。this

clipboard.png
圖1 分區狀態流轉

clipboard.png
圖2 副本狀態流轉

分區狀態機監聽到zk節點變動以後,主要進行了如下幾個步驟操做。

1.得到zk下新的topic列表,與當前緩存的列表相比對,肯定新增的topic,更新controller緩存的topic列表
2.從/brokers/topics/${topic}節點中取出topic全部分區的副本分配方案,再更新controller對應的部分信息
3.調用onNewTopicCreation開始建立topic,註冊分區變動監聽器(PartitionModificationListener),再調用onNewPartitionCreation建立分區
4.建立分區對象,並設置成NewPartition狀態;
5.爲每一個分區建立對應的副本對象,從controller緩存中找出分區的分配方案,而後把分區下的全部副本設置成NewReplica狀態
6.把分區狀態轉換爲OnlinePartition,首先進行leader選舉,方法是選取副本集合中的第一個副本做爲leader副本,並把整個副本集合做爲ISR,同時把這些信息更新到zk的brokers/topics/test/partitions/0/state節點,以後更新controller的leader緩存。最後將全部信息(UpdateMetadataRequest)發送到集羣中其餘的broker。
7.最後設置副本對象狀態爲OnlineReplica。

如下是詳細的流程圖:

圖片描述

broker的高可用及高伸縮——kafka源碼探究之二
https://segmentfault.com/a/11...

消息生產與消息存儲——kafka源碼探究之三
https://segmentfault.com/a/11...

相關文章
相關標籤/搜索