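This series explains Kafka's basic design and analyzes how it works internally.
When you create a topic, Kafka requires the number of partitions and the number of replicas (the replication factor). This section looks at how partitions and their replicas are distributed across brokers.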
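Goals
Replica assignment has three goals, as stated in the comments of Kafka's AdminUtils:
1. Spread the replicas evenly among brokers.
2. For the partitions assigned to a particular broker, spread their other replicas over the other brokers.
3. If all brokers have rack information, place the replicas of each partition on different racks where possible.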
Kafka 0.10 supports two replica assignment strategies (a partition is itself made up of n replicas): rack-unaware and rack-aware, where "rack" refers to a physical server rack.
rack unaware
The core of this strategy's assignment algorithm is:
```scala
// Excerpt from Kafka 0.10 AdminUtils. `rand` is a scala.util.Random held by AdminUtils,
// and `mutable` refers to scala.collection.mutable.
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // Broker index of the first partition's first replica (random unless fixed).
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  // Offset of the remaining replicas relative to the first replica.
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    // After each full round over the brokers, shift the remaining replicas one position further.
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // First replica: plain round-robin over the broker list.
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    // The other replicationFactor - 1 replicas.
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift lies in 1..nBrokers-1, so no other replica lands on the first replica's broker.
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
```
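Roughly, the code first chooses the broker for each partition's first replica (round-robin over the broker list), and then assigns that partition's remaining replicas at a shifted offset from it.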
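To experiment with the strategy outside Kafka, here is a minimal standalone sketch of the same round-robin-plus-shift idea. The object name `RackUnawareSketch` and its `assign` signature are hypothetical: it takes plain broker ids instead of Kafka's `BrokerMetadata`, and, like the excerpt, it picks a random start when `fixedStartIndex` is negative.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical standalone sketch of the rack-unaware strategy (not Kafka's API).
object RackUnawareSketch {
  private val rand = new Random()

  // Position of a non-first replica: first replica index plus a shift in 1..nBrokers-1.
  def replicaIndex(firstReplicaIndex: Int, replicaShift: Int, j: Int, nBrokers: Int): Int = {
    val shift = 1 + (replicaShift + j) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  def assign(nPartitions: Int, replicationFactor: Int, brokers: Seq[Int],
             fixedStartIndex: Int = -1, startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
    require(replicationFactor >= 1 && replicationFactor <= brokers.size,
      "replication factor must be between 1 and the number of brokers")
    val n = brokers.size
    // A negative fixedStartIndex means "choose a random starting position".
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(n)
    var replicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(n)
    var partition = math.max(0, startPartitionId)
    val result = mutable.Map[Int, Seq[Int]]()
    for (_ <- 0 until nPartitions) {
      // After every full round over the brokers, move the other replicas one position further.
      if (partition > 0 && partition % n == 0) replicaShift += 1
      val first = (partition + startIndex) % n
      val others = (0 until replicationFactor - 1).map(j => brokers(replicaIndex(first, replicaShift, j, n)))
      result(partition) = brokers(first) +: others
      partition += 1
    }
    result.toMap
  }

  def main(args: Array[String]): Unit = {
    val assignment = assign(nPartitions = 10, replicationFactor = 3, brokers = 0 to 4, fixedStartIndex = 0)
    for ((p, replicas) <- assignment.toSeq.sortBy(_._1))
      println(s"p$p -> ${replicas.mkString(", ")}")
  }
}
```

With `fixedStartIndex = 0`, running `main` should print the same ten assignments as the Kafka test output in this article, ordered by partition.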
Suppose we have 5 brokers and create topic1 with 10 partitions and 3 replicas, i.e.
nPartitions=10, replicationFactor=3, brokerList={0,1,2,3,4}, nBrokers=5
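Assume the assignment starts from broker-0 (fixedStartIndex = 0), with 10 partitions of 3 replicas each.
The first replica of p0 then lands on broker-0, that of p1 on broker-1, and so on round-robin.
The second replica of p0 lands on broker-1 and that of p1 on broker-2, i.e. one position after the first replica. This offset grows by one after every nBrokers partitions, so p5's second replica is on broker-2, two positions after its first replica on broker-0.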
The following test extends the RackAwareTest class and prints the assignment:
```scala
package unit.kafka.admin

import kafka.admin.{BrokerMetadata, AdminUtils, RackAwareTest}
import kafka.utils.Logging
import org.junit.Assert._
import org.junit.Test
import scala.collection.Map

class AdminRackUnAwareTest extends RackAwareTest with Logging {
  @Test
  def testReplicaAssignment(): Unit = {
    // Five brokers (ids 0 to 4) without rack information.
    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
    // 10 partitions, replication factor 3, fixedStartIndex = 0 (deterministic start).
    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
    println(actualAssignment)
  }
}
```
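The output is:

```
Map(8 -> ArrayBuffer(3, 0, 1), 2 -> ArrayBuffer(2, 3, 4), 5 -> ArrayBuffer(0, 2, 3), 4 -> ArrayBuffer(4, 0, 1), 7 -> ArrayBuffer(2, 4, 0), 1 -> ArrayBuffer(1, 2, 3), 9 -> ArrayBuffer(4, 1, 2), 3 -> ArrayBuffer(3, 4, 0), 6 -> ArrayBuffer(1, 3, 4), 0 -> ArrayBuffer(0, 1, 2))
```

For easier reading, here is the same assignment ordered by partition (the first broker listed holds the partition's first replica):

```
p0 -> 0, 1, 2
p1 -> 1, 2, 3
p2 -> 2, 3, 4
p3 -> 3, 4, 0
p4 -> 4, 0, 1
p5 -> 0, 2, 3
p6 -> 1, 3, 4
p7 -> 2, 4, 0
p8 -> 3, 0, 1
p9 -> 4, 1, 2
```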
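The printed assignment can also be checked mechanically against the "spread evenly" goal. This sketch copies the printed map into a literal (no Kafka dependency; `SpreadCheck` and its methods are hypothetical names) and counts how many replicas, and how many first replicas, each broker hosts.

```scala
// Hypothetical check of the 5-broker / 10-partition / 3-replica output, copied as a literal.
object SpreadCheck {
  val assignment: Map[Int, Seq[Int]] = Map(
    0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 3), 2 -> Seq(2, 3, 4), 3 -> Seq(3, 4, 0), 4 -> Seq(4, 0, 1),
    5 -> Seq(0, 2, 3), 6 -> Seq(1, 3, 4), 7 -> Seq(2, 4, 0), 8 -> Seq(3, 0, 1), 9 -> Seq(4, 1, 2))

  // Number of replicas (in any position) hosted by each broker.
  def replicasPerBroker(a: Map[Int, Seq[Int]]): Map[Int, Int] =
    a.values.flatten.groupBy(identity).map { case (b, xs) => b -> xs.size }

  // Number of first replicas hosted by each broker.
  def firstReplicasPerBroker(a: Map[Int, Seq[Int]]): Map[Int, Int] =
    a.values.map(_.head).groupBy(identity).map { case (b, xs) => b -> xs.size }

  // True if no partition places two replicas on the same broker.
  def noDuplicateBrokers(a: Map[Int, Seq[Int]]): Boolean =
    a.values.forall(rs => rs.distinct.size == rs.size)

  def main(args: Array[String]): Unit = {
    println(replicasPerBroker(assignment).toSeq.sorted)
    println(firstReplicasPerBroker(assignment).toSeq.sorted)
    println(noDuplicateBrokers(assignment))
  }
}
```

Each of the 5 brokers ends up with 6 of the 30 replicas and 2 of the 10 first replicas.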
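Assignment rule, valid for fixedStartIndex = 0 with partitions numbered from 0 (so startIndex and the initial nextReplicaShift are both 0). Let i be the partition id and j the index of an additional replica (j = 0 for the second replica, j = 1 for the third); the results are indices into brokerList, which here equal the broker ids:

$$\text{first}(i) = i \bmod n_{\text{brokers}}$$

$$\text{shift}(i, j) = 1 + \left(\left\lfloor i / n_{\text{brokers}} \right\rfloor + j\right) \bmod \left(n_{\text{brokers}} - 1\right)$$

$$\text{broker}(i, j) = \left(i + \text{shift}(i, j)\right) \bmod n_{\text{brokers}}$$

In this example, i takes the values {0, 1, ..., 9} and j takes the values {0, 1}.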
- i = 0: first replica on broker 0; j = 0: shift = 1, broker 1; j = 1: shift = 2, broker 2
- i = 3: first replica on broker 3; j = 0: shift = 1, broker 4; j = 1: shift = 2, broker 0
- i = 6: first replica on broker 1; j = 0: shift = 2, broker 3; j = 1: shift = 3, broker 4
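The rule can be written directly as code. This sketch (object and method names are hypothetical) encodes the three formulas and is only valid under the same assumptions: fixedStartIndex = 0, partitions numbered from 0, and broker ids 0 to nBrokers - 1.

```scala
// Hypothetical closed-form version of the rule; assumes fixedStartIndex = 0,
// partitions numbered from 0, and broker ids 0..nBrokers-1.
object ClosedFormAssignment {
  // Broker of partition i's first replica.
  def firstReplica(i: Int, nBrokers: Int): Int = i % nBrokers

  // j = 0 is the second replica, j = 1 the third, and so on.
  // Integer division i / nBrokers counts completed rounds over the brokers.
  def shift(i: Int, j: Int, nBrokers: Int): Int = 1 + (i / nBrokers + j) % (nBrokers - 1)

  def otherReplica(i: Int, j: Int, nBrokers: Int): Int = (i + shift(i, j, nBrokers)) % nBrokers

  def main(args: Array[String]): Unit = {
    val nBrokers = 5
    for (i <- Seq(0, 3, 6); j <- 0 to 1)
      println(s"i=$i, j=$j: shift=${shift(i, j, nBrokers)}, broker=${otherReplica(i, j, nBrokers)}")
  }
}
```

The six printed lines should match the worked cases for i = 0, 3 and 6.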
rack aware
The core code is:
```scala
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  val numRacks = brokerRackMap.values.toSet.size
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```
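The condition inside the `while` loop is the heart of the rack-aware strategy. Pulled out as a pure function it can be tried on small cases; `RackAwareSkipRule.accepts` and its parameter list are hypothetical, but the boolean expression is the one from the excerpt above.

```scala
// Hypothetical helper isolating the accept/skip test from the rack-aware loop.
object RackAwareSkipRule {
  // Returns true if `broker` (located in `rack`) may receive the next replica.
  def accepts(broker: Int, rack: String,
              racksWithReplicas: Set[String], brokersWithReplicas: Set[Int],
              numRacks: Int, numBrokers: Int): Boolean = {
    // The rack holds no replica yet, or every rack already holds one.
    val rackOk = !racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks
    // The broker holds no replica yet, or every broker already holds one.
    val brokerOk = !brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers
    rackOk && brokerOk
  }

  def main(args: Array[String]): Unit = {
    // 6 brokers in 3 racks; broker 0 (rack1) already holds the first replica.
    println(accepts(5, "rack1", Set("rack1"), Set(0), numRacks = 3, numBrokers = 6)) // skipped: rack2 and rack3 are still empty
    println(accepts(3, "rack2", Set("rack1"), Set(0), numRacks = 3, numBrokers = 6)) // accepted
  }
}
```

Once every rack holds a replica, which can only happen when the replication factor exceeds the number of racks, the rack condition stops filtering and only the broker condition remains.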
Suppose there are 6 brokers in 3 racks, and 6 partitions with 3 replicas each.
The broker-to-rack mapping is:
0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
Taking one broker from each rack in turn (rack1, rack2, rack3, then back to rack1) yields a new, rack-alternated broker list:
0(rack1),3(rack2),1(rack3),5(rack1),4(rack2),2(rack3)
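The article does not show `getRackAlternatedBrokerList`. A sketch that reproduces the list above, assuming racks are visited in sorted name order and brokers within a rack in ascending id order, could look like this (`RackAlternatedList` is a hypothetical name):

```scala
import scala.collection.mutable

// Hypothetical sketch of building a rack-alternated broker list.
object RackAlternatedList {
  // Groups brokers by rack (ids ascending within a rack), then visits the racks in sorted
  // order, taking one broker from each rack per pass until all brokers are used.
  def rackAlternated(brokerRack: Map[Int, String]): IndexedSeq[Int] = {
    val byRack: Map[String, Iterator[Int]] =
      brokerRack.toSeq.groupBy(_._2).map { case (rack, pairs) => rack -> pairs.map(_._1).sorted.iterator }
    val racks = byRack.keys.toIndexedSeq.sorted
    val result = mutable.ArrayBuffer.empty[Int]
    var rackIndex = 0
    while (result.size < brokerRack.size) {
      val it = byRack(racks(rackIndex))
      // A rack that has run out of brokers is simply skipped on later passes.
      if (it.hasNext) result += it.next()
      rackIndex = (rackIndex + 1) % racks.size
    }
    result.toIndexedSeq
  }

  def main(args: Array[String]): Unit = {
    val brokerRack = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1")
    println(rackAlternated(brokerRack).mkString(", "))
  }
}
```

For the mapping above this prints 0, 3, 1, 5, 4, 2. With uneven racks, racks that run out early drop out of the rotation.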
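Finally, partitions are mapped to brokers round-robin over this list, starting at its first position (partition -> replicas):

```
0 -> 0,3,1
1 -> 3,1,5
2 -> 1,5,4
3 -> 5,4,2
4 -> 4,2,0
5 -> 2,0,3
```

Every partition ends up with exactly one replica in each rack.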
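Putting the pieces together, here is a standalone sketch of the rack-aware walk-through. It is hypothetical code rather than Kafka's: it assumes every broker has a rack, a non-negative fixedStartIndex, partitions numbered from 0, and the same sorted ordering when building the rack-alternated list.

```scala
import scala.collection.mutable

// Hypothetical standalone sketch of rack-aware assignment (not Kafka's API).
// Assumes every broker has a rack, fixedStartIndex >= 0, and partitions start at 0.
object RackAwareSketch {
  // Racks in sorted order, brokers ascending within a rack, one broker per rack per pass.
  def rackAlternated(brokerRack: Map[Int, String]): IndexedSeq[Int] = {
    val byRack = brokerRack.toSeq.groupBy(_._2).map { case (rack, ps) => rack -> ps.map(_._1).sorted.iterator }
    val racks = byRack.keys.toIndexedSeq.sorted
    val out = mutable.ArrayBuffer.empty[Int]
    var r = 0
    while (out.size < brokerRack.size) {
      val it = byRack(racks(r))
      if (it.hasNext) out += it.next()
      r = (r + 1) % racks.size
    }
    out.toIndexedSeq
  }

  // Same position formula as the rack-unaware strategy.
  def replicaIndex(first: Int, shift: Int, k: Int, n: Int): Int =
    (first + 1 + (shift + k) % (n - 1)) % n

  def assign(nPartitions: Int, replicationFactor: Int,
             brokerRack: Map[Int, String], fixedStartIndex: Int): Map[Int, Seq[Int]] = {
    require(fixedStartIndex >= 0, "this sketch only covers a fixed start index")
    require(replicationFactor >= 1 && replicationFactor <= brokerRack.size)
    val brokers = rackAlternated(brokerRack)
    val n = brokers.size
    val numRacks = brokerRack.values.toSet.size
    var shift = fixedStartIndex
    val ret = mutable.Map[Int, Seq[Int]]()
    for (p <- 0 until nPartitions) {
      if (p > 0 && p % n == 0) shift += 1
      val first = (p + fixedStartIndex) % n
      val leader = brokers(first)
      val replicas = mutable.ArrayBuffer(leader)
      val racksUsed = mutable.Set(brokerRack(leader))
      val brokersUsed = mutable.Set(leader)
      var k = 0
      for (_ <- 0 until replicationFactor - 1) {
        var done = false
        while (!done) {
          // Candidates walk around the list; the per-round shift is scaled by numRacks, as in the excerpt.
          val b = brokers(replicaIndex(first, shift * numRacks, k, n))
          val rack = brokerRack(b)
          // Skip rule from the excerpt: prefer racks without a replica, and
          // never reuse a broker while some broker still has none.
          if ((!racksUsed.contains(rack) || racksUsed.size == numRacks) &&
              (!brokersUsed.contains(b) || brokersUsed.size == n)) {
            replicas += b
            racksUsed += rack
            brokersUsed += b
            done = true
          }
          k += 1
        }
      }
      ret(p) = replicas.toList
    }
    ret.toMap
  }

  def main(args: Array[String]): Unit = {
    val brokerRack = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1")
    val assignment = assign(nPartitions = 6, replicationFactor = 3, brokerRack = brokerRack, fixedStartIndex = 0)
    for ((p, replicas) <- assignment.toSeq.sortBy(_._1))
      println(s"$p -> ${replicas.mkString(",")}")
  }
}
```

Running `main` should print the six mappings listed above. The loop always terminates: while a rack is still empty, one of its brokers is unused and reachable; once all racks are covered, any unused broker is accepted.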
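About the author
Loves programming, digging into problems, sharing, and life.
Interested in distributed systems, high concurrency, and data mining.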