(Based on version 0.10)
The job of Kafka's coordinator is group management, that is, managing the members of a team (or group). Group management has to take care of the following:
Kafka designed a protocol for this, called the Group Management Protocol.
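Clearly, everything a consumer group needs can be done with the group management protocol. In fact, the coordinator and this protocol were proposed and implemented precisely to build a high-level consumer that does not depend on ZooKeeper. Kafka did not stop there, though. It abstracted the membership-management behavior of the high-level consumer and pulled out the logic common to all group management. The Group Management Protocol was designed around that common logic. As a result, the protocol applies not only to the Kafka consumer (Kafka Connect currently uses it as well) but can also serve as the management protocol for other kinds of "group".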
So which logic common to group management does this protocol abstract? The comment on the Kafka consumer's AbstractCoordinator gives some answers.
AbstractCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. See ConsumerCoordinator for example usage.
From a high level, Kafka's group management protocol consists of the following sequence of actions:
- Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
- Group/Leader Selection: The coordinator select the members of the group and chooses one member as the leader.
- State Assignment: The leader collects the metadata from all the members of the group and assigns state.
- Group Stabilization: Each member receives the state assigned by the leader and begins processing.
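First, AbstractCoordinator is a client of the coordinator that runs on the broker. Every "the coordinator" in this comment refers to that broker-side coordinator, not to AbstractCoordinator. The comment gives a rough idea of how the work is split between AbstractCoordinator and the broker-side coordinator. According to it, Kafka's group management protocol consists of the sequence of actions listed above: group registration, group/leader selection, state assignment, and group stabilization.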
There are three roles here: the coordinator, the group members, and the group leader.
This leaves several open questions:
Answering them requires looking at the code. The AbstractCoordinator comment is not finished yet; it goes on to say:
To leverage this protocol, an implementation must define the format of metadata provided by each member for group registration in metadata() and the format of the state assignment provided by the leader in performAssignment(String, String, Map) and becomes available to members in onJoinComplete(int, String, String, ByteBuffer).
In other words, an implementation of AbstractCoordinator must implement three methods: metadata(), performAssignment(String, String, Map), and onJoinComplete(int, String, String, ByteBuffer).
Starting from these three methods, we can learn some details of the Group Management Protocol.
metadata()
protected abstract List<ProtocolMetadata> metadata();
Get the current list of protocols and their associated metadata supported by the local member. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).
Returns:
Non-empty map of supported protocols and metadata
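This method returns the protocols this group member supports, together with the metadata for each protocol. This data is sent to the coordinator. The coordinator looks at the protocols supported by all members and picks one protocol that they all share.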
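The selection rule in the metadata() Javadoc can be made concrete with a small Java sketch of one plausible reading of it. The candidates are the protocols that every member supports. Each member votes for its most preferred candidate, and the candidate with the most votes wins. This is an assumption-based model, not the broker's code; the class name ProtocolSelection and the alphabetical tie-break are hypothetical.

```java
import java.util.*;

// Hypothetical model of group protocol selection, NOT the broker's actual code:
// 1) keep only protocols supported by every member,
// 2) each member votes for its most preferred remaining protocol,
// 3) the protocol with the most votes wins (ties: alphabetically first, an arbitrary choice).
final class ProtocolSelection {

    // members: memberId -> supported protocols, most preferred first
    static String select(Map<String, List<String>> members) {
        Set<String> candidates = null;
        for (List<String> protocols : members.values()) {
            if (candidates == null) candidates = new LinkedHashSet<>(protocols);
            else candidates.retainAll(protocols); // intersect with this member's protocols
        }
        if (candidates == null || candidates.isEmpty())
            throw new IllegalStateException("no protocol is supported by all members");

        // TreeMap keeps protocol names sorted, which makes the tie-break deterministic
        Map<String, Integer> votes = new TreeMap<>();
        for (List<String> protocols : members.values()) {
            for (String p : protocols) {
                if (candidates.contains(p)) { // first candidate in preference order
                    votes.merge(p, 1, Integer::sum);
                    break;
                }
            }
        }

        String best = null;
        for (Map.Entry<String, Integer> e : votes.entrySet())
            if (best == null || e.getValue() > votes.get(best)) best = e.getKey();
        return best;
    }

    public static void main(String[] args) {
        Map<String, List<String>> members = new LinkedHashMap<>();
        members.put("m1", Arrays.asList("roundrobin", "range"));
        members.put("m2", Arrays.asList("range", "roundrobin"));
        members.put("m3", Arrays.asList("range"));
        System.out.println(select(members)); // prints "range"
    }
}
```

In the usage example, m1 prefers roundrobin, but m3 supports only range. Range is therefore the only candidate, and it wins.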
Here is ConsumerCoordinator's implementation:
@Override
public List<ProtocolMetadata> metadata() {
    List<ProtocolMetadata> metadataList = new ArrayList<>();
    for (PartitionAssignor assignor : assignors) {
        Subscription subscription = assignor.subscription(subscriptions.subscription());
        ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
        metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
    }
    return metadataList;
}
Here the consumer provides the same metadata for every protocol: the data contained in a Subscription object. Subscription is an inner class of PartitionAssignor with two fields:
class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    ...
}
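In other words, the consumer gives the coordinator two pieces of information: 1. which topics it subscribes to; 2. userData. For the consumer, userData is actually an empty byte array. Still, PartitionAssignor defines Subscription this way for a reason, so what is userData for? Let's look at the PartitionAssignor comment. It also explains where the assignors used in ConsumerCoordinator#metadata() come from.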
This interface is used to define custom partition assignment for use in org.apache.kafka.clients.consumer.KafkaConsumer. Members of the consumer group subscribe to the topics they are interested in and forward their subscriptions to a Kafka broker serving as the group coordinator. The coordinator selects one member to perform the group assignment and propagates the subscriptions of all members to it. Then assign(Cluster, Map) is called to perform the assignment and the results are forwarded back to each respective members. In some cases, it is useful to forward additional metadata to the assignor in order to make assignment decisions. For this, you can override subscription(Set) and provide custom userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation can use this user data to forward the rackId belonging to each member.
This comment also answers some of the questions raised earlier while analyzing the AbstractCoordinator comment. It tells us the following:
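In my view, some resource-scheduling frameworks could benefit from a custom PartitionAssignor. Beyond rack awareness, such an assignor could balance load better by looking at how many consumers run on each machine and how powerful each machine is. The same mechanism could also make partition assignment "sticky": a given consumer keeps getting the same partitions, so it can maintain local state.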
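To illustrate the userData idea, the sketch below shows how a hypothetical rack-aware assignor could pack a rack id into the userData ByteBuffer of its Subscription and read it back on the leader. Kafka does not prescribe any format here. The 2-byte length prefix and the RackUserData class are assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical userData format for a rack-aware assignor:
// a 2-byte length prefix followed by the UTF-8 bytes of the rack id.
// The assignor is free to pick any encoding, as long as the member side
// (subscription) and the leader side (assign) agree on it.
final class RackUserData {

    static ByteBuffer encode(String rackId) {
        byte[] bytes = rackId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + bytes.length);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
        buf.flip(); // make the buffer readable from the start
        return buf;
    }

    static String decode(ByteBuffer userData) {
        ByteBuffer buf = userData.duplicate(); // leave the caller's position untouched
        if (buf.remaining() == 0) return null;  // member sent empty userData (the default)
        short len = buf.getShort();
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode(encode("rack-a")));             // prints "rack-a"
        System.out.println(decode(ByteBuffer.wrap(new byte[0]))); // prints "null"
    }
}
```

decode() returns null for an empty buffer. This lets such an assignor tolerate members that, like the default consumer, send empty userData.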
performAssignment()
protected abstract Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata)
Perform assignment for the group. This is used by the leader to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer)
Parameters:
leaderId - The id of the leader (which is this member)
allMemberMetadata - Metadata from all members of the group
Returns:
A map from each member to their state assignment
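Here leaderId and allMemberMetadata are sent by the coordinator to the leader in the JoinGroupResponse. The leader makes the assignment based on this information and sends the result back to the coordinator in a SyncGroupRequest. The coordinator then sends each member the state assigned to it.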
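The JoinGroup/SyncGroup round trip can be modeled in memory roughly as follows. This is a deliberately simplified, hypothetical model (class ToyGroupRoundTrip). It has no networking, no generation id, and no timeout handling. The rule "the first member to join becomes leader" is also a simplification for this sketch.

```java
import java.util.*;

// Toy, in-memory model of the JoinGroup / SyncGroup round trip (hypothetical names).
final class ToyGroupRoundTrip {
    private final Map<String, String> memberMetadata = new LinkedHashMap<>();
    private final Map<String, String> assignments = new HashMap<>();
    private String leaderId;

    // JoinGroup: a member registers its metadata; here the first member to join becomes leader
    void joinGroup(String memberId, String metadata) {
        if (leaderId == null) leaderId = memberId;
        memberMetadata.put(memberId, metadata);
    }

    // what the JoinGroupResponse gives the leader: the leader id and every member's metadata
    String leaderId() { return leaderId; }
    Map<String, String> allMemberMetadata() { return Collections.unmodifiableMap(memberMetadata); }

    // SyncGroup: every member sends one, but only the leader's request carries the assignment
    void syncGroup(String memberId, Map<String, String> groupAssignment) {
        if (memberId.equals(leaderId)) assignments.putAll(groupAssignment);
    }

    // what the SyncGroupResponse gives one member: only its own share
    String memberAssignment(String memberId) { return assignments.getOrDefault(memberId, ""); }

    public static void main(String[] args) {
        ToyGroupRoundTrip coordinator = new ToyGroupRoundTrip();
        coordinator.joinGroup("m1", "t0");
        coordinator.joinGroup("m2", "t0");

        // the leader computes an assignment from everyone's metadata (trivially, by join order)
        Map<String, String> plan = new HashMap<>();
        int i = 0;
        for (Map.Entry<String, String> e : coordinator.allMemberMetadata().entrySet())
            plan.put(e.getKey(), e.getValue() + "p" + i++);

        coordinator.syncGroup("m2", Collections.emptyMap()); // follower: no assignment
        coordinator.syncGroup(coordinator.leaderId(), plan);  // leader: the real assignment
        System.out.println(coordinator.memberAssignment("m2")); // prints "t0p1"
    }
}
```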
Now let's look at ConsumerCoordinator's implementation of this method:
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                    String assignmentStrategy,
                                                    Map<String, ByteBuffer> allSubscriptions) {
    // pick the PartitionAssignor that matches the protocol selected by the coordinator
    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    // collect all topics the group subscribes to, and which topics each member subscribes to
    Set<String> allSubscribedTopics = new HashSet<>();
    Map<String, Subscription> subscriptions = new HashMap<>();
    for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
        Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
        subscriptions.put(subscriptionEntry.getKey(), subscription);
        allSubscribedTopics.addAll(subscription.topics());
    }

    // the leader will begin watching for changes to any of the topics the group is interested in,
    // which ensures that all metadata changes will eventually be seen
    this.subscriptions.groupSubscribe(allSubscribedTopics);
    metadata.setTopics(this.subscriptions.groupSubscription());

    // update metadata (if needed) and keep track of the metadata used for assignment so that
    // we can check after rebalance completion whether anything has changed
    client.ensureFreshMetadata();
    assignmentSnapshot = metadataSnapshot;

    log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
            groupId, assignor.name(), subscriptions);

    // run the assignment. metadata.fetch() returns the current metadata; since KafkaConsumer is
    // single-threaded, it is the same metadata that was just saved in assignmentSnapshot
    Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

    log.debug("Finished assignment for group {}: {}", groupId, assignment);

    // build groupAssignment: which TopicPartitions each group member should consume
    Map<String, ByteBuffer> groupAssignment = new HashMap<>();
    for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
        groupAssignment.put(assignmentEntry.getKey(), buffer);
    }
    return groupAssignment;
}
Two assignors are available here: RangeAssignor and RoundRobinAssignor.
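Both hand out a topic's partitions in turn to the consumers subscribed to that topic. Below, t stands for a topic, c for a consumer, and p for a partition. The number after each letter is the id of that topic, partition, or consumer.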
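The difference between RangeAssignor and RoundRobinAssignor is whether the assignment of one topic's partitions is affected by how the partitions of other topics were assigned.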
RangeAssignor
RangeAssignor starts from consumer0 for every topic. For example, suppose topic0 has 3 partitions and two consumers subscribe to it. Then consumer0 gets t0p0 and t0p1, and consumer1 gets t0p2.
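Suppose both consumers also subscribe to another topic, topic1, which also has three partitions. Then consumer0 additionally gets t1p0 and t1p1, and consumer1 gets t1p2. The exact algorithm is described in RangeAssignor's JavaDoc.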
So RangeAssignor can be unfair in some cases. In the example above, if the two consumers subscribe to more three-partition topics, consumer0 will always end up with twice as many partitions as consumer1.
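The range strategy can be re-implemented in a few lines to check the numbers in the example. This is a simplified sketch modeled on the algorithm described in RangeAssignor's JavaDoc. It uses plain strings such as "t0p1" instead of TopicPartition; RangeSketch is a hypothetical name.

```java
import java.util.*;

// Simplified sketch of range assignment (not Kafka's RangeAssignor source).
// Each topic is handled independently, always starting from the first consumer in sorted order.
final class RangeSketch {

    // partitionsPerTopic: topic -> partition count; subscriptions: consumer -> subscribed topics
    // returns consumer -> assigned partitions, named topic + "p" + partition (e.g. "t0p1")
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : subscriptions.keySet()) assignment.put(consumer, new ArrayList<>());

        for (String topic : new TreeSet<>(partitionsPerTopic.keySet())) {
            // consumers subscribed to this topic, sorted
            List<String> consumers = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : subscriptions.entrySet())
                if (e.getValue().contains(topic)) consumers.add(e.getKey());
            if (consumers.isEmpty()) continue;
            Collections.sort(consumers);

            int n = partitionsPerTopic.get(topic);
            int perConsumer = n / consumers.size();
            int extra = n % consumers.size(); // the first `extra` consumers get one more partition
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++)
                    assignment.get(consumers.get(i)).add(topic + "p" + p);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("t0", 3);
        partitions.put("t1", 3);
        Map<String, List<String>> subs = new HashMap<>();
        subs.put("c0", Arrays.asList("t0", "t1"));
        subs.put("c1", Arrays.asList("t0", "t1"));
        System.out.println(assign(partitions, subs));
        // prints {c0=[t0p0, t0p1, t1p0, t1p1], c1=[t0p2, t1p2]}
    }
}
```

The output shows the 2:1 imbalance described above. The leftover partition of every topic goes to c0, because every topic is handled from the first consumer again.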
RoundRobinAssignor
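RoundRobinAssignor first sorts all TopicPartitions the group subscribes to: by topic first, and within a topic by partition id. The exact algorithm is described in RoundRobinAssignor's JavaDoc. For example, with two topics of three partitions each, the sorted TopicPartitions are t0p0 t0p1 t0p2 t1p0 t1p1 t1p2.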
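It then hands out this sorted list, one partition at a time, to the consumers in turn. A consumer that does not subscribe to the partition's topic is skipped. For example, if c0 and c1 both subscribe to both topics, the result is: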
t0p0 | t0p1 | t0p2 | t1p0 | t1p1 | t1p2 |
c0 | c1 | c0 | c1 | c0 | c1 |
So c0 gets t0p0, t0p2, t1p1, and c1 gets t0p1, t1p0, t1p2.
Now suppose there are three consumers:
c0 subscribes to t0, t1, t3.
c1 subscribes to t0, t2, t4.
c2 subscribes to t0, t2, t4.
t0 has two partitions; every other topic has one.
Then the sorted TopicPartitions and the assignment are:
t0p0 | t0p1 | t1p0 | t2p0 | t3p0 | t4p0 |
c0 | c1 | c0 | c1 | c0 | c1 |
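So c2 gets no partitions at all. Each time the turn reaches c2, the next partition belongs to t1 or t3, which c2 does not subscribe to, so c2 is skipped. RoundRobinAssignor therefore cannot guarantee perfect fairness either, although this is a rather extreme example.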
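A simplified round-robin sketch reproduces both examples above. It is a hypothetical class, RoundRobinSketch, and it uses plain strings for partition names. The sketch walks the sorted partition list with a circular cursor over the sorted consumers and skips consumers that do not subscribe to the current partition's topic. It is modeled on the behavior described here, not copied from RoundRobinAssignor.

```java
import java.util.*;

// Simplified sketch of round-robin assignment (not Kafka's RoundRobinAssignor source).
final class RoundRobinSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        List<String> consumers = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());

        // every subscribed topic, sorted; partitions are then visited in id order
        SortedSet<String> topics = new TreeSet<>();
        for (List<String> ts : subscriptions.values()) topics.addAll(ts);

        int cursor = 0; // circular position over the sorted consumers
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                // skip consumers that do not subscribe to this topic
                while (!subscriptions.get(consumers.get(cursor)).contains(topic))
                    cursor = (cursor + 1) % consumers.size();
                assignment.get(consumers.get(cursor)).add(topic + "p" + p);
                cursor = (cursor + 1) % consumers.size();
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("t0", 2);
        for (String t : Arrays.asList("t1", "t2", "t3", "t4")) partitions.put(t, 1);
        Map<String, List<String>> subs = new HashMap<>();
        subs.put("c0", Arrays.asList("t0", "t1", "t3"));
        subs.put("c1", Arrays.asList("t0", "t2", "t4"));
        subs.put("c2", Arrays.asList("t0", "t2", "t4"));
        System.out.println(assign(partitions, subs));
        // prints {c0=[t0p0, t1p0, t3p0], c1=[t0p1, t2p0, t4p0], c2=[]}
    }
}
```

Unlike the range sketch, the cursor is shared across topics. This is exactly why the assignment of one topic depends on how earlier topics were assigned.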
onJoinComplete()
/**
 * Invoked when a group member has successfully joined a group.
 * @param generation The generation that was joined
 * @param memberId The identifier for the local member in the group
 * @param protocol The protocol selected by the coordinator
 * @param memberAssignment The assignment propagated from the group leader
 */
protected abstract void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment);
ConsumerCoordinator implements it as follows:
@Override
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // if we were the assignor, then we need to make sure that there have been no metadata updates
    // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
    if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
        subscriptions.needReassignment();
        return;
    }

    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

    // set the flag to refresh last committed offsets
    subscriptions.needRefreshCommits();

    // update partition assignment
    subscriptions.assignFromSubscribed(assignment.partitions());

    // give the assignor a chance to update internal state based on the received assignment
    assignor.onAssignment(assignment);

    // reschedule the auto commit starting from now
    if (autoCommitEnabled)
        autoCommitTask.reschedule();

    // execute the user's callback after rebalance
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} for group {} failed on partition assignment",
                listener.getClass().getName(), groupId, e);
    }
}
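First, the leader (the member that performed the assignment) checks whether the metadata used for the assignment still matches the current metadata. If it does not, the leader marks that another reassignment is needed and returns.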
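Otherwise, it does the following:
1. It looks up the PartitionAssignor for the protocol the coordinator selected.
2. It deserializes its own Assignment.
3. It sets the flag to refresh the last committed offsets.
4. It updates its partition assignment.
5. It lets the assignor update its internal state through onAssignment().
6. It reschedules the auto-commit task if auto commit is enabled.
7. It calls the user's ConsumerRebalanceListener.onPartitionsAssigned().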
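The leader-side staleness check in the first step can be modeled as follows. This hypothetical sketch reduces a metadata snapshot to a map from topic to partition count. It mirrors the assignmentSnapshot/metadataSnapshot comparison in onJoinComplete(), but it is not the real MetadataSnapshot class.

```java
import java.util.*;

// Hypothetical sketch of the leader's "did metadata change during the rebalance?" check.
final class SnapshotCheckSketch {
    private Map<String, Integer> assignmentSnapshot;                 // only set on the member that assigned
    private Map<String, Integer> metadataSnapshot = new HashMap<>(); // latest topic -> partition count
    boolean needsReassignment = false;

    void onMetadataUpdate(Map<String, Integer> partitionsPerTopic) {
        metadataSnapshot = new HashMap<>(partitionsPerTopic); // a new snapshot object on every update
    }

    void performAssignment() {
        assignmentSnapshot = metadataSnapshot; // remember what the assignment was based on
    }

    // returns true if the received assignment can be applied as-is
    boolean onJoinComplete() {
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            needsReassignment = true; // metadata moved on: ask for another rebalance
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        SnapshotCheckSketch leader = new SnapshotCheckSketch();
        Map<String, Integer> before = new HashMap<>();
        before.put("t0", 3);
        leader.onMetadataUpdate(before);
        leader.performAssignment();
        Map<String, Integer> after = new HashMap<>();
        after.put("t0", 4); // a partition was added while the rebalance was in flight
        leader.onMetadataUpdate(after);
        System.out.println(leader.onJoinComplete()); // prints false
    }
}
```

A follower never calls performAssignment(), so its assignmentSnapshot stays null and the check is skipped. This matches the `assignmentSnapshot != null` guard in the real code.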
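Note that all KafkaConsumer operations run on a single thread, and most of them happen inside poll(). So calls in the code above such as
subscriptions.needReassignment() and subscriptions.needRefreshCommits()
only change the state of the subscriptions object. They do not actually perform the reassignment or refresh the commits. When KafkaConsumer runs poll(), it checks the state of the subscriptions object and then performs whatever operations are needed. Therefore, for these two statements in the code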
// set the flag to refresh last committed offsets
subscriptions.needRefreshCommits();
// update partition assignment
subscriptions.assignFromSubscribed(assignment.partitions());
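by the time the commit refresh actually runs, the second statement (assignFromSubscribed) has already completed. The refresh therefore fetches the last committed offsets for all partitions assigned to this consumer.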
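The "set a flag now, do the work in poll()" pattern can be illustrated with a toy model. The model also shows why the refresh sees the final assignment even though its flag is set first. DeferredWorkSketch and its fields are hypothetical and only loosely mirror SubscriptionState.

```java
import java.util.*;

// Toy model of deferring work to poll() on a single thread (hypothetical, not Kafka code).
final class DeferredWorkSketch {
    private final Set<String> assigned = new LinkedHashSet<>();
    private boolean needsRefreshCommits = false;
    // which partitions each simulated "fetch committed offsets" call covered
    final List<Set<String>> refreshLog = new ArrayList<>();

    void needRefreshCommits() { needsRefreshCommits = true; } // only flips a flag

    void assignFromSubscribed(Collection<String> partitions) { // only updates state
        assigned.clear();
        assigned.addAll(partitions);
    }

    // the deferred work actually runs here, after all state changes above have finished
    void poll() {
        if (needsRefreshCommits) {
            refreshLog.add(new LinkedHashSet<>(assigned)); // fetch offsets for the *current* assignment
            needsRefreshCommits = false;
        }
    }

    public static void main(String[] args) {
        DeferredWorkSketch s = new DeferredWorkSketch();
        // same order as in onJoinComplete(): flag first, then the new assignment
        s.needRefreshCommits();
        s.assignFromSubscribed(Arrays.asList("t0p0", "t1p1"));
        s.poll();
        System.out.println(s.refreshLog); // prints [[t0p0, t1p1]]
    }
}
```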
For the detailed behavior of the Kafka coordinator, see this wiki page.