本文共分爲三個部分:
- Kafka Topic建立方式
- Kafka Topic Partitions Assignment實現原理
- Kafka資源隔離方案
1. Kafka Topic建立方式
Kafka Topic建立方式有如下兩種表現形式:
(1)建立Topic時直接指定Topic Partition Replica與Kafka Broker之間的存儲映射關係
/usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeeperHost:ZooKeeperPort --create --topic TopicName --replica-assignment id0:id1:id2,id3:id4:id5,id6:id7:id8
其中,「id0:id1:id2,id3:id4:id5,id6:id7:id8」表示Topic TopicName一共有3個Partition(以「,」分隔),每一個Partition均有3個Replica(以「:」分隔),Topic Partition Replica與Kafka Broker之間的對應關係以下:
Partition0 Replica:Broker id0、Broker id一、Broker id2;
Partition1 Replica:Broker id三、Broker id四、Broker id5;
Partition2 Replica:Broker id六、Broker id七、Broker id8;
(2)建立Topic時由Kafka自動分配Topic Partition Replica與Kafka Broker之間的存儲映射關係
/usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeeperHost:ZooKeeperPort --create --topic TopicName
第(1)種方式徹底依靠人爲手工指定,這裏僅僅探討使用第(2)種方式建立Topic時,「自動分配」是如何實現的。
2. Kafka Topic Partition Replica Assignment實現原理
Replica Assignment的目標有兩個:
(1)使Partition Replica可以均勻地分配至各個Kafka Broker(負載均衡);
(2)若是Partition的第一個Replica分配至某一個Kafka Broker,那麼這個Partition的其它Replica則須要分配至其它的Kafka Brokers,即Partition Replica分配至不一樣的Broker;
注意,這裏有一個約束條件:Topic Partition Replicas Size <= Kafka Brokers Size。
「自動分配」的核心工做過程以下:
隨機選取一個StartingBroker(Broker id0、Broker id一、Broker id二、...),隨機選取IncreasingShift初始值([0,nBrokers - 1])
(1)從StartingBroker開始,使用輪詢的方式依次將各個Partition的Replicas分配至各個Broker;
對於每個Partition,Replicas的分配過程以下:
(2)Partition的第一個Replica分配至StartingBroker;
(3)根據IncreasingShift計算第n(n>=2)個Replica的Shift(即與第1個Replica的間隔量),依據Shift將其分配至相應的Broker;
(4)StartingBroker移至下一個Broker;
(5)若是Brokers已經被輪詢完一次,則IncreasingShift遞增一;不然,繼續(2)。
假設有5個Brokers(broker-0、broker-一、broker-二、broker-三、broker-4),Topic有10個Partition(p0、p一、p二、p三、p四、p五、p六、p七、p八、p9),每個Partition有3個Replica,依據上述工做過程,分配結果以下:
broker-0 broker-1 broker-2 broker-3 broker-4
p0 p1 p2 p3 p4 (1st replica)
p5 p6 p7 p8 p9 (1st replica)
p4 p0 p1 p2 p3 (2nd replica)
p8 p9 p5 p6 p7 (2nd replica)
p3 p4 p0 p1 p2 (3nd replica)
p7 p8 p9 p5 p6 (3nd replica)
詳細步驟以下:
選取broker-0做爲StartingBroker,IncreasingShift初始值爲1,
對於p0,replica1分配至broker-0,IncreasingShift爲1,因此replica2分配至broker-1,replica3分配至broker-2;
對於p1,replica1分配至broker-1,IncreasingShift爲1,因此replica2分配至broker-2,replica3分配至broker-3;
對於p2,replica1分配至broker-2,IncreasingShift爲1,因此replica2分配至broker-3,replica3分配至broker-4;
對於p3,replica1分配至broker-3,IncreasingShift爲1,因此replica2分配至broker-4,replica3分配至broker-1;
對於p4,replica1分配至broker-4,IncreasingShift爲1,因此replica2分配至broker-0,replica3分配至broker-1;
注:IncreasingShift用於計算Shift,Shift表示Partition的第n(n>=2)個Replica與第1個Replica之間的間隔量。若是IncreasingShift值爲m,那麼Partition的第2個Replica與第1個Replica的間隔量爲m + 1,第3個Replica與第1個Replica的間隔量爲m + 2,...,依次類推。Shift的取值範圍:[1,brokerSize - 1]。
此時,broker-0、broker-一、broker-二、broker-三、broker-4分別做爲StartingBroker被輪詢分配一次,繼續輪詢;但IncreasingShift遞增爲2。
對於p5,replica1分配至broker-0,IncreasingShift爲2,因此replica2分配至broker-2,replica3分配至broker-3;
對於p6,replica1分配至broker-1,IncreasingShift爲2,因此replica2分配至broker-3,replica3分配至broker-4;
對於p7,replica1分配至broker-2,IncreasingShift爲2,因此replica2分配至broker-4,replica3分配至broker-0;
對於p8,replica1分配至broker-3,IncreasingShift爲2,因此replica2分配至broker-0,replica3分配至broker-1;
對於p9,replica1分配至broker-4,IncreasingShift爲2,因此replica2分配至broker-1,replica3分配至broker-2;
此時,broker-0、broker-一、broker-二、broker-三、broker-4分別做爲StartingBroker再次被輪詢一次,若是還有其它Partition,則繼續輪詢,IncreasingShift遞增爲3,依次類推。
這裏有幾點須要注意:
(1)爲何要隨機選取StartingBroker,而不是每次都選取broker-0做爲StartingBroker?
以broker-0、broker-一、broker-二、broker-三、broker-4爲例,由於分配過程是以輪詢方式進行的,若是每次都選取broker-0做爲StartingBroker,那麼Brokers列表中的前面部分將有可能被分配相對比較多的Partition Replicas,從而致使這部分Brokers負載較高,隨機選取能夠保證相對比較好的均勻效果。
(2)爲何Brokers列表每次輪詢一次,IncreasingShift值都須要遞增1?
Kafka Topic Partition數目較多的狀況下,Partition的第1個Replica與第n(n>=2)個Replica之間的間隔量隨着IncreasingShift的變化面變化,可以更好的均勻分配Replica。
scala.kafka.admin.AdminUtils.assignReplicasToBrokers()實現上述Topic Partition Replica與Broker之間的分配過程,源碼以下:
brokerList:Kafka Brokers列表;
nPartitions:Topic待分配的Partition數目;
replicationFactor:Topic Partition Replica數目;
fixedStartIndex:若是顯示指定,默認值爲0;它的值與兩個變量值相關:startIndex和nextReplicaShift,詳情見後;
startPartitionId:從Topic的哪個Partition開始分配,一般狀況下是0,Topic增長Partition時該值不爲0。
val ret = new mutable.HashMap[Int, List[Int]]()
分配結果保存至一個Map變量ret,key爲Partition Id,value爲分配的Brokers列表。
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
startIndex表示StartingBroker,currentPartitionId表示當前爲哪一個Partition分配Brokers,nextReplicaShift表示當前的IncreasingShit值。
接下來就是一個循環,用於爲每個Partition的Replicas分配Brokers,其中Partition的第1個Replica由「(currentPartitionId + startIndex) % brokerList.size」決定,其他的Replica由「replicaIndex()」決定。
shift表示着第n(n >= 2)個Replica與第一個Replica之間的間隔量,「1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)」的計算方式很是巧妙,它保證了shift的取值範圍:[1,nBrokers](你們能夠本身體會一下)。
3. Kafka資源隔離方案
實時數據處理場景中,若是數據量比較大,爲了保證寫入/消費的吞吐量,咱們建立Topic時一般會指定比較大的Partition數目,從而使得數據儘量地被分散至更多的Partition,Partition被儘量均勻的分配至Kafka集羣中的各個Broker,從負載均衡的角度看,一切都很美好。從業務的角度看,會有資源競爭的問題,畢竟Kafka Broker機器的帶寬資源是有限的,在帶寬比較緊張的情形下,任何一個業務方的數據量波動(這裏僅指數據量增長),全部的業務方都會受到影響;從運維的角度看,會有可用性的問題,任何一臺Kafka Broker機器都負載着全部Topic的數據傳輸、存儲,若是出現宕機的狀況,將會波及到全部的Topic。針對這種狀況,咱們提出了劃分資源池的資源隔離方案:
Kafka集羣有9臺Brokers組成:broker-一、broker-二、...、broker-9,建立9個Topic:t一、t二、...、t9,每一個Topic有9個Partition(假設Replica爲1),如上圖所示,咱們將9臺Brokers切分紅3個資源池:Pool1(broker-一、broker-二、broker-3)、Pool2(broker-四、broker-五、broker-6)、Pool3(broker-七、broker-八、broker-9),Topic的分配狀況以下:
Pool1:t一、t二、t3
Pool2:t四、t五、t6
Pool3:t七、t八、t9
能夠看出,這三個資源池的物理資源是徹底獨立的,三個資源池實際上至關於三個小集羣。
這種資源池的劃分方式不但能夠作到物理資源的隔離,還能夠必定程度上解決異構機型(MEM、DISK)帶來的問題,能夠把機型類似的機器組成一個資源池。實際實施時須要綜合考慮業務狀況、機器狀況,合理劃分資源池,並根據具體的Topic狀況將其分配至合適的資源池內。
Kafka Topic的建立也變爲兩步:
(1)使用kafka-topics.sh建立Topic;
(2)使用kafka-reassign-partitions.sh移動Topic Partition Replicas至指定的資源池(具體的Brokers列表)。