Kafka Knowledge System: Default Partition/Replica Assignment in a Cluster

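This series covers Kafka's basic design and the analysis of its principles, in the following parts:

  1. Basic concepts
  2. Message model
  3. Kafka replica synchronization mechanism
  4. Kafka file storage mechanism
  5. Kafka data reliability and consistency guarantees
  6. Kafka leader election
  7. Kafka message delivery semantics
  8. Default partition/replica assignment in a Kafka cluster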

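Default partition/replica assignment in a Kafka cluster

When a topic is created in Kafka, the number of partitions and the number of replicas must be specified. This part looks at how those partitions and replicas are assigned to brokers.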

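Goals

Replica assignment has three goals:

  • Spread replicas evenly across brokers
  • Never place two replicas of the same partition on the same broker
  • If brokers carry rack information, place the replicas of a partition on different racks as far as possible

Kafka 0.10 supports two replica assignment strategies (a partition is itself made up of n replicas): rack-unaware and rack-aware. Here "rack" means a physical server rack.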

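rack unaware

  • Pick a random starting position in the broker list, then assign the first replica of each partition round-robin
  • Then place the remaining replicas of each partition at positions staggered from the first replica by an increasing shift

The core code of this assignment algorithm is as follows: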

private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                 replicationFactor: Int,
                                                 brokerList: Seq[Int],
                                                 fixedStartIndex: Int,
                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }


  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

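Roughly, the code above first places each partition (its first replica) and then places that partition's remaining replicas.
Suppose we have 5 brokers and create topic1 with 10 partitions and 3 replicas, that is:
nPartitions=10, replicationFactor=3, brokerList={0,1,2,3,4}, nBrokers=5

Assume assignment starts from broker-0, with 10 partitions and 3 replicas per partition.
Then p0 lands on broker-0, p1 on broker-1, and so on round-robin.
For the second replica, p0 is on broker-1 and p1 on broker-2, so the replicas are staggered by one position. After each full pass over the brokers (every nBrokers partitions) the shift grows by one, which is why p5 places its second replica on broker-2 rather than broker-1.

Test code that extends the RackAwareTest class: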

package unit.kafka.admin

import kafka.admin.{BrokerMetadata, AdminUtils, RackAwareTest}
import kafka.utils.Logging
import org.junit.Assert._
import org.junit.Test

import scala.collection.Map

class AdminRackUnAwareTest extends RackAwareTest with Logging {
  @Test
  def testReplicaAssignment() {
    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
    println(actualAssignment)
  }
}

The output is:

Map(8 -> ArrayBuffer(3, 0, 1), 2 -> ArrayBuffer(2, 3, 4), 5 -> ArrayBuffer(0, 2, 3), 4 -> ArrayBuffer(4, 0, 1), 7 -> ArrayBuffer(2, 4, 0), 1 -> ArrayBuffer(1, 2, 3), 9 -> ArrayBuffer(4, 1, 2), 3 -> ArrayBuffer(3, 4, 0), 6 -> ArrayBuffer(1, 3, 4), 0 -> ArrayBuffer(0, 1, 2))
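
The printed map can be checked against the first two goals stated earlier. The following is a minimal sketch, not part of Kafka: the object name AssignmentCheck and its helper methods are made up for illustration, and the assignment is copied by hand from the output above.

```scala
object AssignmentCheck {
  // Assignment copied from the printed output (partition -> replica brokers).
  val assignment: Map[Int, Seq[Int]] = Map(
    0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 3), 2 -> Seq(2, 3, 4), 3 -> Seq(3, 4, 0),
    4 -> Seq(4, 0, 1), 5 -> Seq(0, 2, 3), 6 -> Seq(1, 3, 4), 7 -> Seq(2, 4, 0),
    8 -> Seq(3, 0, 1), 9 -> Seq(4, 1, 2))

  // Number of replicas hosted by each broker.
  def replicasPerBroker(a: Map[Int, Seq[Int]]): Map[Int, Int] =
    a.values.flatten.groupBy(identity).map { case (b, rs) => b -> rs.size }

  // Number of first replicas hosted by each broker.
  def firstReplicasPerBroker(a: Map[Int, Seq[Int]]): Map[Int, Int] =
    a.values.map(_.head).groupBy(identity).map { case (b, fs) => b -> fs.size }

  // True when no partition has two replicas on the same broker.
  def noDuplicateBrokers(a: Map[Int, Seq[Int]]): Boolean =
    a.values.forall(rs => rs.distinct.size == rs.size)

  def main(args: Array[String]): Unit = {
    println(replicasPerBroker(assignment).toSeq.sorted)
    println(firstReplicasPerBroker(assignment).toSeq.sorted)
    println(noDuplicateBrokers(assignment))
  }
}
```

For this output every broker holds 6 of the 30 replicas and 2 of the 10 first replicas, and no partition repeats a broker.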

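For easier reading, the same assignment sorted by partition ID (first replica listed first):

Partition   Replicas
0           0, 1, 2
1           1, 2, 3
2           2, 3, 4
3           3, 4, 0
4           4, 0, 1
5           0, 2, 3
6           1, 3, 4
7           2, 4, 0
8           3, 0, 1
9           4, 1, 2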

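Assignment rule (with the start index fixed at 0, as in the test above; i is the partition ID, j is the index of a remaining replica, and / is integer division):

First replica
    broker = i % nBrokers
Remaining replicas
    shift = 1 + (i / nBrokers + j) % (nBrokers - 1)
    broker = (i + shift) % nBrokers

In this example, i takes values in {0,1,2,3,4,5,6,7,8,9} and j takes values in {0,1}.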

i=0
First replica
    broker=0
Remaining replicas
    i=0, j=0: shift=1, broker=1
    i=0, j=1: shift=2, broker=2

i=3
First replica
    broker=3
Remaining replicas
    i=3, j=0: shift=1, broker=4
    i=3, j=1: shift=2, broker=0

i=6
First replica
    broker=1
Remaining replicas
    i=6, j=0: shift=2, broker=3
    i=6, j=1: shift=3, broker=4
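
The closed-form rule can be written as a few lines of Scala to check the worked cases. This is a sketch under the same assumption as the test run (start index and initial replica shift both 0). The names ClosedFormAssignment and replicas are illustrative and do not appear in Kafka.

```scala
object ClosedFormAssignment {
  // Replica brokers of partition i, first replica first.
  // Assumes start index 0, initial shift 0, and nBrokers >= 2.
  def replicas(i: Int, nBrokers: Int, replicationFactor: Int): Seq[Int] = {
    val first = i % nBrokers
    val remaining = (0 until replicationFactor - 1).map { j =>
      // The shift grows by one after every full pass over the brokers.
      val shift = 1 + (i / nBrokers + j) % (nBrokers - 1)
      (i + shift) % nBrokers
    }
    first +: remaining
  }

  def main(args: Array[String]): Unit =
    (0 until 10).foreach { i =>
      println(s"$i -> ${replicas(i, 5, 3).mkString(", ")}")
    }
}
```

With nBrokers=5 and replicationFactor=3 this reproduces the three cases worked out by hand, for example partition 6 maps to brokers 1, 3, 4.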

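rack aware

  • First build a mapping between the broker list and racks
  • Interleave the brokers rack by rack to obtain a new broker list
  • Map partitions to brokers round-robin over that list

The core code is as follows: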

private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerMetadatas: Seq[BrokerMetadata],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
    val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
      id -> rack
    }.toMap
    val numRacks = brokerRackMap.values.toSet.size
    val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
    val numBrokers = arrangedBrokerList.size
    val ret = mutable.Map[Int, Seq[Int]]()
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
    var currentPartitionId = math.max(0, startPartitionId)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
    for (_ <- 0 until nPartitions) {
      if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
      val leader = arrangedBrokerList(firstReplicaIndex)
      val replicaBuffer = mutable.ArrayBuffer(leader)
      val racksWithReplicas = mutable.Set(brokerRackMap(leader))
      val brokersWithReplicas = mutable.Set(leader)
      var k = 0
      for (_ <- 0 until replicationFactor - 1) {
        var done = false
        while (!done) {
          val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
          val rack = brokerRackMap(broker)
          // Skip this broker if
          // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
          //    that do not have any replica, or
          // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
          if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
              && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
            replicaBuffer += broker
            racksWithReplicas += rack
            brokersWithReplicas += broker
            done = true
          }
          k += 1
        }
      }
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }
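
The acceptance condition inside the while loop is easier to read when pulled out as a pure function. The sketch below restates that condition only. The object PlacementRule and the method accepts are hypothetical names for illustration and are not part of Kafka.

```scala
object PlacementRule {
  // True when a candidate broker may receive the next replica of a partition.
  def accepts(broker: Int,
              rack: String,
              racksWithReplicas: Set[String],
              brokersWithReplicas: Set[Int],
              numRacks: Int,
              numBrokers: Int): Boolean = {
    // A rack may be reused only once every rack already holds a replica.
    val rackOk = !racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks
    // A broker may be reused only once every broker already holds a replica.
    val brokerOk = !brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers
    rackOk && brokerOk
  }

  def main(args: Array[String]): Unit = {
    // First replica on broker 0 in rack1; broker 5 shares that rack.
    println(accepts(5, "rack1", Set("rack1"), Set(0), numRacks = 3, numBrokers = 6))
    println(accepts(3, "rack2", Set("rack1"), Set(0), numRacks = 3, numBrokers = 6))
  }
}
```

A candidate in an already used rack is skipped while some rack is still empty, so replicas reach every rack before any rack is used twice.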

Suppose there are 6 brokers, 3 racks, and 6 partitions with 3 replicas each.
The broker-to-rack mapping is:
0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"

Interleaving the brokers rack by rack gives a new broker list:
0(rack1),3(rack2),1(rack3),5(rack1),4(rack2),2(rack3)

Finally, partitions are mapped to brokers round-robin over that list:
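
The listing above calls getRackAlternatedBrokerList without showing its body. The sketch below is a stand-in that produces the same ordering for this example. It assumes racks are visited in name order and brokers within a rack in ID order. RackAlternation and rackAlternatedBrokers are hypothetical names.

```scala
object RackAlternation {
  // Interleaves brokers rack by rack: one broker from each rack per round.
  def rackAlternatedBrokers(brokerRack: Map[Int, String]): Seq[Int] = {
    // Broker IDs grouped by rack; racks sorted by name, brokers sorted by ID.
    val byRack: Seq[Seq[Int]] =
      brokerRack.groupBy(_._2).toSeq.sortBy(_._1).map(_._2.keys.toSeq.sorted)
    val rounds = byRack.map(_.size).foldLeft(0)((a, b) => math.max(a, b))
    // Round r takes the r-th broker of every rack that still has one.
    for {
      r <- 0 until rounds
      brokers <- byRack
      if r < brokers.size
    } yield brokers(r)
  }

  def main(args: Array[String]): Unit = {
    val brokerRack = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3",
                         3 -> "rack2", 4 -> "rack2", 5 -> "rack1")
    println(rackAlternatedBrokers(brokerRack).mkString(","))
  }
}
```

For the mapping in this example the result is 0,3,1,5,4,2, matching the list above.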

0 -> 0,3,1
1 -> 3,1,5
2 -> 1,5,4
3 -> 5,4,2
4 -> 4,2,0
5 -> 2,0,3
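
A quick way to confirm that this result meets the rack goal is to map each replica back to its rack. The sketch below hard-codes the mapping and the assignment from this example. RackSpreadCheck and its members are illustrative names only.

```scala
object RackSpreadCheck {
  val brokerRack: Map[Int, String] = Map(
    0 -> "rack1", 1 -> "rack3", 2 -> "rack3",
    3 -> "rack2", 4 -> "rack2", 5 -> "rack1")

  // Assignment copied from the example (partition -> replica brokers).
  val assignment: Map[Int, Seq[Int]] = Map(
    0 -> Seq(0, 3, 1), 1 -> Seq(3, 1, 5), 2 -> Seq(1, 5, 4),
    3 -> Seq(5, 4, 2), 4 -> Seq(4, 2, 0), 5 -> Seq(2, 0, 3))

  // Distinct racks covered by one partition's replicas.
  def racksOf(replicas: Seq[Int]): Set[String] = replicas.map(brokerRack).toSet

  // True when every partition has a replica in every rack.
  def spansAllRacks: Boolean = {
    val numRacks = brokerRack.values.toSet.size
    assignment.values.forall(rs => racksOf(rs).size == numRacks)
  }

  // Number of replicas hosted by each broker.
  def replicasPerBroker: Map[Int, Int] =
    assignment.values.flatten.groupBy(identity).map { case (b, rs) => b -> rs.size }

  def main(args: Array[String]): Unit = {
    println(spansAllRacks)
    println(replicasPerBroker.toSeq.sorted)
  }
}
```

Here every partition covers all three racks, and each of the six brokers holds three replicas.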
