Kafka has genuinely made my work much easier, and every time I use it I'm grateful. Besides being convenient, it is also remarkably stable.
Out of affection for the tool, I decided to dig into how Kafka actually works.
The questions that interest me most are these:
1. When a topic is created, how does Kafka decide where each partition and its replicas are placed?
2. If a broker goes down, how are its replicas reassigned?
3. If, because of question 2, a broker becomes a new replica holder for a topic, what happens to all the earlier messages it doesn't have? Does it need to pull them from other brokers, and if so, will the volume of data put too much load on the network?
4. Kafka's ack mechanism: when a producer sends a message, does it always go through the leader, or can it be written directly to a broker holding a replica? And is the ack returned to the producer as soon as one broker has persisted the message, or only after a majority or all of the replicas have?
This post looks at the first question.
When a client issues a create-topic request, the broker goes through the following call chain:
KafkaApis.handleCreateTopicsRequest() -> adminManager.createTopics() -> AdminUtils.assignReplicasToBrokers()
The actual replica assignment is done inside the assignReplicasToBrokers function.
First, look at the comment the authors left in the AdminUtils source:
* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
*
* To achieve this goal for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
The two rules below are what really matter!
1. Pick a random position in the broker list; starting from that position, assign the first replica of each partition to the brokers in round-robin order (a small sketch of this step follows the second table below).
For example, with broker0 through broker4 and a topic with 10 partitions, if the randomly chosen starting position is broker0, the partitions are handed out one after another beginning with broker0; once the partition count exceeds the number of brokers, assignment wraps around to the starting broker again, giving the layout below.
* broker-0 broker-1 broker-2 broker-3 broker-4
* p0 p1 p2 p3 p4 (1st replica)
* p5 p6 p7 p8 p9 (1st replica)
2. Once the first replica of a partition has been placed, the remaining replicas are assigned, one after another, to the brokers that follow the broker holding the first replica.
For example, if partition0's replica0 went to broker2, then partition0's replica1 and replica2 go to broker3 and broker4 respectively.
* broker-0 broker-1 broker-2 broker-3 broker-4
* p0 p1 p2 p3 p4 (1st replica)
* p5 p6 p7 p8 p9 (1st replica)
* p4 p0 p1 p2 p3 (2nd replica)
* p8 p9 p5 p6 p7 (2nd replica)
* p3 p4 p0 p1 p2 (3rd replica)
* p7 p8 p9 p5 p6 (3rd replica)
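To make rule 1 concrete, here is a minimal sketch (my own code, not Kafka's) that performs only the first-replica round-robin for 5 brokers and 10 partitions, with the start position fixed at broker0 so the output matches the first table; the exact shift behaviour of rule 2 is shown in the real source further down.

object FirstReplicaRoundRobin {
  def main(args: Array[String]): Unit = {
    val brokers = Array(0, 1, 2, 3, 4) // broker IDs in the cluster
    val nPartitions = 10
    val startIndex = 0                 // fixed here; Kafka picks this at random
    for (p <- 0 until nPartitions) {
      val firstReplica = brokers((p + startIndex) % brokers.length)
      println(s"p$p -> broker-$firstReplica (1st replica)")
    }
  }
}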
So where does the brokerList come from?
This is where Kafka's cooperation with ZooKeeper comes in: Kafka asks ZooKeeper for the list of brokers in its own cluster.
Who asks ZooKeeper for the brokerList?
The Kafka cluster elects a leader (the controller) through ZooKeeper, and it is this leader that interacts with ZooKeeper. The election, or joining the cluster as a follower, happens when a broker starts up.
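As a side note, brokers register themselves under the /brokers/ids path in ZooKeeper, so the broker IDs can be listed with a plain ZooKeeper client. The sketch below is my own illustration and assumes ZooKeeper is reachable at localhost:2181.

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import java.util.concurrent.CountDownLatch

object ListBrokerIds {
  def main(args: Array[String]): Unit = {
    val connected = new CountDownLatch(1)
    val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
      override def process(event: WatchedEvent): Unit =
        if (event.getState == Watcher.Event.KeeperState.SyncConnected) connected.countDown()
    })
    connected.await() // wait until the session is established before issuing reads
    // each live broker owns an ephemeral child znode named after its broker ID
    println(zk.getChildren("/brokers/ids", false))
    zk.close()
  }
}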
Below is some source code, all taken from the AdminUtils class. In this post I won't cover the rack-aware case, and I've stripped out some of the error checking.
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // starting position in the broker list: random unless a fixed index was supplied
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    // every time the first-replica assignment wraps around the broker list,
    // increase the shift used for the remaining replicas
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // first replica: round-robin starting from startIndex
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    // remaining replicas: offset from the first replica by a shift computed in replicaIndex()
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
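To see what this method actually produces, here is a self-contained sketch (the names are mine, this is not Kafka's public API) that mirrors the logic above and prints the layout for 5 brokers, 10 partitions and a replication factor of 3, with the random start index and shift both fixed to 0 for reproducibility.

import scala.collection.mutable

object AssignmentDemo {
  // same algorithm as assignReplicasToBrokersRackUnaware, with the two random
  // choices (start index and initial shift) pinned to fixed values
  def assign(nPartitions: Int, replicationFactor: Int, brokers: Seq[Int],
             startIndex: Int = 0, initialShift: Int = 0): Map[Int, Seq[Int]] = {
    val brokerArray = brokers.toArray
    val ret = mutable.Map[Int, Seq[Int]]()
    var nextReplicaShift = initialShift
    for (p <- 0 until nPartitions) {
      if (p > 0 && p % brokerArray.length == 0) nextReplicaShift += 1
      val firstReplicaIndex = (p + startIndex) % brokerArray.length
      val replicas = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1) {
        val shift = 1 + (nextReplicaShift + j) % (brokerArray.length - 1)
        replicas += brokerArray((firstReplicaIndex + shift) % brokerArray.length)
      }
      ret.put(p, replicas)
    }
    ret.toMap
  }

  def main(args: Array[String]): Unit = {
    // 5 brokers, 10 partitions, replication factor 3, matching the example in this post
    assign(10, 3, Seq(0, 1, 2, 3, 4)).toSeq.sortBy(_._1).foreach {
      case (p, rs) => println(s"partition $p -> brokers ${rs.mkString(", ")}")
    }
  }
}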
First, note the method's return value: it is a Map whose key is the partition ID and whose value is the list of brokers holding that partition's replicas.
Each partition's first replica is assigned exactly as described above. The remaining replicas, however, are placed slightly differently from the simplified description: a nextReplicaShift is involved. In short, after a bit of arithmetic, the remaining replicas are not simply handed to the brokers immediately following the first one; instead, they are separated from it by a gap determined by nextReplicaShift.
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift lies between 1 and nBrokers - 1, so a replica never lands back on the first replica's broker
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
As you can see, the number of brokers separating two replicas depends on 1. the value of nextReplicaShift and 2. which replica of the partition this one is (the replicaIndex).
Ignoring the case where the shift wraps past the number of brokers, the final shift is 1 + nextReplicaShift + replicaIndex.
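For instance, take the earlier hypothetical where partition0's first replica sits on broker2 (firstReplicaIndex = 2), and suppose nextReplicaShift happens to be 0 in a 5-broker cluster: the second replica (replicaIndex = 0) gets shift = 1 + (0 + 0) % 4 = 1 and lands on broker3, and the third replica (replicaIndex = 1) gets shift = 1 + (0 + 1) % 4 = 2 and lands on broker4, exactly the consecutive layout described above; a larger nextReplicaShift would push those replicas further along the broker list.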
As for why it is done this way: presumably to place the copies on brokers that are spaced further apart, improving the ability to survive failures.