Kafka's Replica Assignment Strategy

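Kafka has made my work dramatically easier, and I feel grateful every time I use it. It is easy to use and also remarkably stable.


Since I like it so much, I decided to dig into how Kafka works under the hood.


The questions that interest me most are these: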


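1. When a topic is created, how does Kafka decide where each partition and its replicas are placed?


2. If a broker goes down, how are its replicas reassigned?


3. If a broker becomes a new replica of a topic as a result of question 2, what happens to the messages it does not have? Does it need to pull them from other brokers, and if so, could a large amount of data put a heavy load on the network?


4. Kafka's ack mechanism. When a producer sends a message, is it forwarded through the leader, or can it be written directly to the brokers hosting the replicas? Is an ack sent to the producer as soon as one broker has stored the message, or only after a majority/all of the replicas have stored it?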




This post covers the first question.


When a client sends a create-topic request, the broker handling it goes through this call stack:

KafkaApis.handleCreateTopicsRequest() -> adminManager.createTopics() -> AdminUtils.assignReplicasToBrokers()

The actual replica assignment happens in the assignReplicasToBrokers function.


First, here is the author's comment from the AdminUtils source:


* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
*
* To achieve this goal for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.


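The last two points are the key ones.


1. Pick a random position in the broker list. Starting from that position, assign the first replica of each partition to the brokers in the list one after another (round-robin).


For example, suppose there are broker0 through broker4, the topic has 10 partitions, and the randomly chosen starting position is broker0. Partitions are then handed out one per broker starting from broker0; once the partitions outnumber the brokers, assignment wraps around to the starting broker again. The result looks like this: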


* broker-0  broker-1  broker-2  broker-3  broker-4
* p0        p1        p2        p3        p4       (1st replica)
* p5        p6        p7        p8        p9       (1st replica)
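The round-robin rule for first replicas fits in a few lines of Scala. This is a simplified model, not Kafka's code. `FirstReplicaSketch`, `firstReplicaBroker` and `layout` are hypothetical names, and the broker IDs are assumed to be 0 to 4. The random start position is passed in explicitly so the result is reproducible.

```scala
object FirstReplicaSketch {
  // Broker that receives the first replica of `partition`: walk the broker
  // list round-robin, starting at position `startIndex` (hypothetical helper).
  def firstReplicaBroker(partition: Int, startIndex: Int, brokers: IndexedSeq[Int]): Int =
    brokers((partition + startIndex) % brokers.length)

  // Group partition IDs by the broker that holds their first replica.
  def layout(nPartitions: Int, startIndex: Int, brokers: IndexedSeq[Int]): Map[Int, Seq[Int]] =
    (0 until nPartitions)
      .groupBy(p => firstReplicaBroker(p, startIndex, brokers))
      .map { case (broker, partitions) => broker -> partitions.toList }
}
```

For example, `FirstReplicaSketch.layout(10, 0, Vector(0, 1, 2, 3, 4))(0)` is `List(0, 5)`, which matches the broker-0 column above.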

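2. After the first replica is placed, the remaining replicas go to the brokers that follow the first replica's broker, in order.

For example, if replica 0 of partition 0 is placed on broker2, then replicas 1 and 2 of partition 0 go to broker3 and broker4 respectively.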


* broker-0  broker-1  broker-2  broker-3  broker-4
* p0        p1        p2        p3        p4       (1st replica)
* p5        p6        p7        p8        p9       (1st replica)
* p4        p0        p1        p2        p3       (2nd replica)
* p8        p9        p5        p6        p7       (2nd replica)
* p3        p4        p0        p1        p2       (3rd replica)
* p7        p8        p9        p5        p6       (3rd replica)
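Here is a sketch of the simplified rule from step 2. `NaiveReplicaSketch` is a hypothetical name, and the brokers are assumed to be 0 to 4. This models the description only; it is not Kafka's code.

```scala
object NaiveReplicaSketch {
  // Simplified step-2 rule: replica j of a partition goes to the j-th broker
  // after the first replica's broker, wrapping around the broker list.
  // (Not Kafka's actual code, which adds a shift on top of this.)
  def naiveReplicas(firstIndex: Int, replicationFactor: Int, brokers: IndexedSeq[Int]): Seq[Int] =
    (0 until replicationFactor).map(j => brokers((firstIndex + j) % brokers.length))
}
```

When the first replica is on broker2, it returns broker2, broker3 and broker4, as in the example. It also reproduces the p0 to p4 rows of the table. For p5 (first replica on broker-0), however, it would return broker-0, broker-1 and broker-2, while the table puts p5's second and third replicas on broker-2 and broker-3. That difference comes from nextReplicaShift in the real code.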


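So where does brokerList come from?

This is where Kafka and ZooKeeper work together: Kafka asks ZooKeeper for the broker list of the cluster it belongs to.

Who asks ZooKeeper for the broker list?

The Kafka cluster uses ZooKeeper to elect one broker as the cluster leader (Kafka calls it the controller), and this leader is the one that talks to ZooKeeper. Each broker either wins this election or joins the cluster as a follower when it starts up.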




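Below is some of the source code, all taken from the AdminUtils class. In this post I set the rack-aware case aside and have stripped out some of the error checking.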


// `rand` is AdminUtils' scala.util.Random instance; `mutable` is scala.collection.mutable
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // Position of the broker that gets the first replica of the first partition (random unless fixed)
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  // Extra offset for the remaining replicas; a second, independent random draw unless fixed
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    // Each time the partition IDs wrap around the broker list, bump the shift
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // First replica: round-robin from startIndex
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    // Remaining replicas: positioned relative to the first one by replicaIndex()
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

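First, note the return value: a Map whose key is the partition ID and whose value is the list of brokers hosting that partition's replicas, starting with the first replica.

The first replica of each partition is assigned exactly as described above, but the remaining replicas are placed slightly differently from my earlier description, because nextReplicaShift comes into play. nextReplicaShift starts at a random value (or at fixedStartIndex, if one is given) and grows by 1 every time the partition ID wraps around to a multiple of the broker count. So the remaining replicas are not simply the brokers right after the first one; they are offset by an amount that depends on nextReplicaShift.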
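To make the nextReplicaShift bookkeeping concrete, here is a sketch that replays only that part of the loop. `ShiftSketch` is a hypothetical name. It assumes startPartitionId = 0 and takes the initial shift as a parameter instead of drawing it from `rand`.

```scala
import scala.collection.mutable

object ShiftSketch {
  // Replays the nextReplicaShift updates of the loop above and records the
  // shift used for each partition (assumes partition IDs start at 0).
  def shiftsPerPartition(nPartitions: Int, nBrokers: Int, initialShift: Int): Seq[Int] = {
    val shifts = mutable.ArrayBuffer[Int]()
    var shift = initialShift
    var currentPartitionId = 0
    for (_ <- 0 until nPartitions) {
      // Same condition as the original: the ID has wrapped around the broker list
      if (currentPartitionId > 0 && currentPartitionId % nBrokers == 0)
        shift += 1
      shifts += shift
      currentPartitionId += 1
    }
    shifts.toList
  }
}
```

With 5 brokers, 10 partitions and an initial shift of 0, partitions p0 to p4 use shift 0 and p5 to p9 use shift 1. That is why p5 to p9 have their second replicas one broker further from the first replica than p0 to p4 do in the table.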


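// Position in brokerArray of a non-first replica; replicaIndex = 0 means the second replica
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // Offset from the first replica, always between 1 and nBrokers - 1
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}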

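As you can see, how far each replica lands from the first replica depends on (1) the value of nextReplicaShift and (2) which replica of the partition it is.

Ignoring the wrap-around, the final shift is 1 + nextReplicaShift + replicaIndex. Because the wrap-around is taken modulo nBrokers - 1, the shift always stays between 1 and nBrokers - 1, so no later replica lands on the first replica's broker.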
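To check the layout table earlier in this post against the code, here is a self-contained port of the two functions. `AssignmentSketch` is a hypothetical name. The two random draws become explicit parameters, partitions start at 0, and `row` is an added helper for rendering the table; it is not part of Kafka.

```scala
import scala.collection.mutable

object AssignmentSketch {
  // Same arithmetic as AdminUtils.replicaIndex: j = 0 is the second replica.
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, j: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + j) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  // Rack-unaware assignment starting at partition 0, with the random start
  // position and initial shift replaced by explicit arguments (assumption).
  def assign(nPartitions: Int, replicationFactor: Int, brokers: IndexedSeq[Int],
             startIndex: Int, initialShift: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    var nextReplicaShift = initialShift
    for (partitionId <- 0 until nPartitions) {
      // Bump the shift every time partition IDs wrap around the broker list
      if (partitionId > 0 && partitionId % brokers.length == 0)
        nextReplicaShift += 1
      val firstReplicaIndex = (partitionId + startIndex) % brokers.length
      val others = (0 until replicationFactor - 1).map { j =>
        brokers(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokers.length))
      }
      ret.put(partitionId, brokers(firstReplicaIndex) +: others)
    }
    ret.toMap
  }

  // One row of the table: for replica number `replica` (0 = first) and round
  // `round` (partitions round*n until (round+1)*n), the partition whose replica
  // sits on each broker, or -1 if none does.
  def row(assignment: Map[Int, Seq[Int]], brokers: IndexedSeq[Int], replica: Int, round: Int): Seq[Int] = {
    val n = brokers.length
    val partitions = round * n until (round + 1) * n
    brokers.map(b => partitions.find(p => assignment(p)(replica) == b).getOrElse(-1))
  }
}
```

Fixing both values at 0, as passing fixedStartIndex = 0 does in the original, reproduces every row of that table. For instance, the second-replica row for p5 to p9 comes out as 8, 9, 5, 6, 7.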


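As for why it is done this way, my guess is that it spreads the replicas further apart across brokers to improve fault tolerance, in line with goal 2 of the source comment: partitions whose first replica sits on the same broker should have their other replicas spread over different brokers.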
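One way to test this guess against goal 2 of the source comment is to take all partitions whose first replica lands on the same broker and check where their second replicas go. Below is a minimal sketch. `SpreadSketch` is a hypothetical name, and partition IDs are assumed to start at 0, so the shift for partition p is initialShift + p / nBrokers.

```scala
object SpreadSketch {
  // Same arithmetic as AdminUtils.replicaIndex (j = 0 is the second replica).
  def replicaIndex(first: Int, shift: Int, j: Int, nBrokers: Int): Int =
    (first + 1 + (shift + j) % (nBrokers - 1)) % nBrokers

  // For each partition whose first replica is at position `brokerIdx`, the
  // position of its second replica (assumes partition IDs start at 0).
  def secondReplicasOf(brokerIdx: Int, nPartitions: Int, nBrokers: Int,
                       startIndex: Int, initialShift: Int): Seq[Int] =
    (0 until nPartitions)
      .filter(p => (p + startIndex) % nBrokers == brokerIdx)
      .map(p => replicaIndex(brokerIdx, initialShift + p / nBrokers, 0, nBrokers))
}
```

Take 5 brokers and 20 partitions, starting at broker-0. The first replicas of p0, p5, p10 and p15 all sit on broker-0, and their second replicas come out on brokers 1, 2, 3 and 4. Without the shift they would all land on broker-1. Because the shift is taken modulo nBrokers - 1, the pattern repeats every nBrokers - 1 rounds.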
