This article looks at partition assignment in Kafka: how a key is mapped to a partition, how partitions are assigned to consumers, and how a partition's replicas are assigned to brokers/machines.
In Kafka 0.8, the producer-side mapping from key to partition looked like this:
kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/producer/internals/Partitioner.java
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    /**
     * Compute the partition for the given record.
     *
     * @param record The record being sent
     * @param cluster The current cluster metadata
     */
    public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        if (record.partition() != null) {
            // they have given us a partition, use it
            if (record.partition() < 0 || record.partition() >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
                                                   + " is not in the range [0..." + numPartitions + "].");
            return record.partition();
        } else if (record.key() == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
            if (availablePartitions.size() > 0) {
                int part = Utils.abs(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.abs(nextValue) % numPartitions;
            }
        } else {
            // hash the key to choose a partition
            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
        }
    }

}
From Kafka 0.9 onward, the partitioner is exposed as a pluggable org.apache.kafka.clients.producer.Partitioner interface:

public interface Partitioner extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}
The previous implementation then became the default implementation:
kafka-clients-0.9.0.1-sources.jar!/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    /**
     * A cheap way to deterministically convert a number to a positive value. When the input is
     * positive, the original value is returned. When the input number is negative, the returned
     * positive value is the original value bit AND against 0x7fffffff which is not its absolutely
     * value.
     *
     * Note: changing this method in the future will possibly cause partition selection not to be
     * compatible with the existing messages already placed on a partition.
     *
     * @param number a given number
     * @return a positive number.
     */
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {}

}
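The partitioner in use can be swapped out through the producer's partitioner.class setting. Below is a minimal sketch of the producer side; the broker address, topic name, and the com.example.MyPartitioner class are assumptions for illustration, not something from the Kafka sources. With the DefaultPartitioner, records carrying the same key always hash to the same partition.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Optional: plug in a custom Partitioner implementation.
        // If omitted, DefaultPartitioner (hash of key / round-robin) is used.
        // props.put("partitioner.class", "com.example.MyPartitioner");   // hypothetical class

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // under the default strategy, every record keyed "user-42" lands in the same partition
        producer.send(new ProducerRecord<>("demo-topic", "user-42", "hello"));
        producer.close();
    }
}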
How partitions are assigned to the consumers within the same group is mainly governed by this interface:
kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
/**
 * This interface is used to define custom partition assignment for use in
 * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
 * to the topics they are interested in and forward their subscriptions to a Kafka broker serving
 * as the group coordinator. The coordinator selects one member to perform the group assignment and
 * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, Map)} is called
 * to perform the assignment and the results are forwarded back to each respective members
 *
 * In some cases, it is useful to forward additional metadata to the assignor in order to make
 * assignment decisions. For this, you can override {@link #subscription(Set)} and provide custom
 * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
 * can use this user data to forward the rackId belonging to each member.
 */
public interface PartitionAssignor {

    /**
     * Return a serializable object representing the local member's subscription. This can include
     * additional information as well (e.g. local host/rack information) which can be leveraged in
     * {@link #assign(Cluster, Map)}.
     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
     *               and variants
     * @return Non-null subscription with optional user data
     */
    Subscription subscription(Set<String> topics);

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     * @param metadata Current topic/broker metadata known by consumer
     * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
     * @return A map from the members to their respective assignment. This should have one entry
     *         for all members who in the input subscription map.
     */
    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

    /**
     * Callback which is invoked when a group member receives its assignment from the leader.
     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)}
     */
    void onAssignment(Assignment assignment);

}
Two implementations are built in:
/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */
public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);

            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }

}
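To make the range arithmetic concrete, here is a small standalone sketch (not Kafka code) that recomputes the example from the javadoc above: 2 consumers and 2 topics with 3 partitions each, so the first consumer gets the one extra partition per topic.

public class RangeAssignDemo {
    public static void main(String[] args) {
        int numConsumers = 2;
        int numPartitions = 3;   // partitions per topic
        for (String topic : new String[]{"t0", "t1"}) {
            int perConsumer = numPartitions / numConsumers;   // 1
            int withExtra = numPartitions % numConsumers;     // 1 -> first consumer gets one extra
            for (int i = 0; i < numConsumers; i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
                // prints: C0 gets t0 partitions [0..2), C1 gets t0 partitions [2..3), ...
                System.out.printf("C%d gets %s partitions [%d..%d)%n", i, topic, start, start + length);
            }
        }
    }
}

The second built-in implementation is the RoundRobinAssignor: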
/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
 * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
 *
 * Tha assignment will be:
 * C0: [t0p0]
 * C1: [t1p0]
 * C2: [t1p1, t2p0, t2p1, t2p2]
 */
public class RoundRobinAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            while (!subscriptions.get(assigner.peek()).contains(topic))
                assigner.next();
            assignment.get(assigner.next()).add(partition);
        }
        return assignment;
    }

    public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        SortedSet<String> topics = new TreeSet<>();
        for (List<String> subscription : subscriptions.values())
            topics.addAll(subscription);

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

    @Override
    public String name() {
        return "roundrobin";
    }

}
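Which assignor a consumer group uses is controlled by the consumer's partition.assignment.strategy setting (the range assignor is the default). A minimal sketch, with the broker address, group id and topic name assumed for illustration:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AssignorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed
        props.put("group.id", "demo-group");                 // assumed
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // choose the round robin assignor instead of the default range assignor
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));   // assumed topic
        // ... poll loop elided
        consumer.close();
    }
}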
When the partitions of a topic or the consumers in the same group change, a rebalance of partitions across consumers is triggered to keep the load balanced; for details see Kafka消費組 (consumer group).
When a topic is created in Kafka, the number of partitions and the replication factor must be specified, and these counts are decided up front. So which machines does each partition actually end up on?
The details are in this class:
kafka-0.10.2.1-src/core/src/main/scala/kafka/admin/AdminUtils.scala
/**
 * There are 3 goals of replica assignment:
 *
 * 1. Spread the replicas evenly among brokers.
 * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
 * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
 *
 * To achieve this goal for replica assignment without considering racks, we:
 * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
 * 2. Assign the remaining replicas of each partition with an increasing shift.
 *
 * Here is an example of assigning
 * broker-0  broker-1  broker-2  broker-3  broker-4
 * p0        p1        p2        p3        p4       (1st replica)
 * p5        p6        p7        p8        p9       (1st replica)
 * p4        p0        p1        p2        p3       (2nd replica)
 * p8        p9        p5        p6        p7       (2nd replica)
 * p3        p4        p0        p1        p2       (3nd replica)
 * p7        p8        p9        p5        p6       (3nd replica)
 *
 * To create rack aware assignment, this API will first create a rack alternated broker list. For example,
 * from this brokerID -> rack mapping:
 *
 * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
 *
 * The rack alternated list will be:
 *
 * 0, 3, 1, 5, 4, 2
 *
 * Then an easy round-robin assignment can be applied. Assume 6 partitions with replication factor of 3, the assignment
 * will be:
 *
 * 0 -> 0,3,1
 * 1 -> 3,1,5
 * 2 -> 1,5,4
 * 3 -> 5,4,2
 * 4 -> 4,2,0
 * 5 -> 2,0,3
 *
 * Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
 * shifting the followers. This is to ensure we will not always get the same set of sequences.
 * In this case, if there is another partition to assign (partition #6), the assignment will be:
 *
 * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
 *
 * The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated
 * broker list. For rest of the replicas, it will be biased towards brokers on racks that do not have
 * any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
 * the broker list.
 *
 * As the result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
 * each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
 * situation where the number of replicas is the same as the number of racks and each rack has the same number of
 * brokers, it guarantees that the replica distribution is even across brokers and racks.
 *
 * @return a Map from partition id to replica ids
 * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
 *                                 assign each replica to a unique rack.
 */
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("number of partitions must be larger than 0")
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("replication factor must be larger than 0")
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else {
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}
The comment on this method already explains things quite clearly, but let me restate it here.
Replica assignment has three goals:

1. Spread the replicas evenly among brokers.
2. For partitions assigned to a particular broker, spread their other replicas over the other brokers.
3. If all brokers have rack information, assign the replicas of each partition to different racks where possible.
Kafka 0.10 supports two replica assignment strategies (each partition itself consists of n replicas): rack-unaware and rack-aware, where "rack" refers to the physical rack a broker sits in.
Rack-unaware (assume 5 brokers, 10 partitions, and 3 replicas per partition)
This strategy works as follows:
 * broker-0  broker-1  broker-2  broker-3  broker-4
 * p0        p1        p2        p3        p4       (1st replica)
 * p5        p6        p7        p8        p9       (1st replica)
 * p4        p0        p1        p2        p3       (2nd replica)
 * p8        p9        p5        p6        p7       (2nd replica)
 * p3        p4        p0        p1        p2       (3nd replica)
 * p7        p8        p9        p5        p6       (3nd replica)
Here we assume the assignment starts from broker-0, and there are 10 partitions with 3 replicas each.
You can see that p0 lands on broker-0, p1 on broker-1, and so on in round-robin order.
For the second replica, p0 lands on broker-1 and p1 on broker-2, i.e. the whole row is shifted by one position.
The code:
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
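As a cross-check, here is a small Java sketch of the same rack-unaware logic. It is not Kafka code: the private replicaIndex helper used by AdminUtils is reconstructed here, and startIndex / nextReplicaShift are fixed at 0 (AdminUtils picks them randomly) so that the output reproduces the table above.

import java.util.ArrayList;
import java.util.List;

public class RackUnawareAssignDemo {
    // reconstruction of the follower-shift helper: shift the follower by 1..nBrokers-1 positions
    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    public static void main(String[] args) {
        int nBrokers = 5, nPartitions = 10, replicationFactor = 3;
        int startIndex = 0;          // fixed instead of random, to get a reproducible table
        int nextReplicaShift = 0;    // fixed instead of random
        for (int p = 0; p < nPartitions; p++) {
            if (p > 0 && p % nBrokers == 0)
                nextReplicaShift += 1;                       // shift followers after each full round
            int firstReplicaIndex = (p + startIndex) % nBrokers;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(firstReplicaIndex);
            for (int j = 0; j < replicationFactor - 1; j++)
                replicas.add(replicaIndex(firstReplicaIndex, nextReplicaShift, j, nBrokers));
            System.out.println("p" + p + " -> " + replicas);  // e.g. p0 -> [0, 1, 2], p5 -> [0, 2, 3]
        }
    }
}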
Rack-aware (assume 6 brokers across 3 racks, 6 partitions, and 3 replicas per partition)
Given this brokerID -> rack mapping:
0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
The brokers are interleaved rack by rack to obtain a new, rack-alternated broker list:
0(rack1),3(rack2),1(rack3),5(rack1),4(rack2),2(rack3)
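Below is a sketch of how such a rack-alternated list can be built; it mirrors the idea of AdminUtils.getRackAlternatedBrokerList but is reconstructed in Java purely for illustration: group brokers by rack, order the racks, then repeatedly take the next broker from each rack in turn.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RackAlternatedListDemo {

    static List<Integer> rackAlternatedBrokerList(Map<Integer, String> brokerRackMap) {
        // rack -> its brokers, racks kept in sorted order for determinism
        TreeMap<String, List<Integer>> brokersByRack = new TreeMap<>();
        for (Map.Entry<Integer, String> e : brokerRackMap.entrySet())
            brokersByRack.computeIfAbsent(e.getValue(), r -> new ArrayList<>()).add(e.getKey());

        List<Iterator<Integer>> rackIterators = new ArrayList<>();
        for (List<Integer> brokers : brokersByRack.values()) {
            brokers.sort(null);                     // deterministic order inside a rack
            rackIterators.add(brokers.iterator());
        }

        // take one broker from each rack in turn until all brokers are placed
        List<Integer> result = new ArrayList<>();
        int rackIndex = 0;
        while (result.size() < brokerRackMap.size()) {
            Iterator<Integer> it = rackIterators.get(rackIndex);
            if (it.hasNext())
                result.add(it.next());
            rackIndex = (rackIndex + 1) % rackIterators.size();
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> brokerRackMap = new TreeMap<>();
        brokerRackMap.put(0, "rack1"); brokerRackMap.put(1, "rack3"); brokerRackMap.put(2, "rack3");
        brokerRackMap.put(3, "rack2"); brokerRackMap.put(4, "rack2"); brokerRackMap.put(5, "rack1");
        System.out.println(rackAlternatedBrokerList(brokerRackMap));   // prints [0, 3, 1, 5, 4, 2]
    }
}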
Then a plain round-robin is used to map partitions onto this broker list:
 * 0 -> 0,3,1
 * 1 -> 3,1,5
 * 2 -> 1,5,4
 * 3 -> 5,4,2
 * 4 -> 4,2,0
 * 5 -> 2,0,3
The three replicas of partition 0 sit on broker-0, broker-3 and broker-1;
the three replicas of partition 1 sit on broker-3, broker-1 and broker-5, and so on. If the number of partitions exceeds the number of brokers, the follower replicas of the extra partitions are shifted, for example:
6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
The first replica of each partition is assigned round-robin over the rack-alternated broker list. The remaining replicas are biased towards brokers on racks that do not yet hold a replica of that partition; once every rack has one, the assignment falls back to round-robin over the broker list.
When the number of replicas is greater than or equal to the number of racks, each rack gets at least one replica; otherwise each rack gets at most one replica. In the ideal case where the number of replicas equals the number of racks and every rack has the same number of brokers, the replicas are distributed evenly across both brokers and racks.
The code:
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  val numRacks = brokerRackMap.values.toSet.size
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
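Once a topic exists, the resulting partition-to-replica placement can be checked from any client, for example via KafkaConsumer.partitionsFor. A minimal sketch; the broker address and topic name are assumptions:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class ReplicaPlacementInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // print leader and replica broker ids for every partition of the topic
            for (PartitionInfo p : consumer.partitionsFor("demo-topic")) {   // assumed topic
                StringBuilder replicas = new StringBuilder();
                for (Node n : p.replicas())
                    replicas.append(n.id()).append(' ');
                int leaderId = p.leader() == null ? -1 : p.leader().id();
                System.out.println("partition " + p.partition()
                        + " leader=" + leaderId
                        + " replicas=[ " + replicas + "]");
            }
        }
    }
}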