Kafka的coordinator

(基於0.10版本)node

Group Management Protocol

Kafka的coordiantor要作的事情就是group management,就是要對一個團隊(或者叫組)的成員進行管理。Group management就是要作這些事情:算法

  • 維持group的成員組成。這包括容許新的成員加入,檢測成員的存活性,清除再也不存活的成員。
  • 協調group成員的行爲。

Kafka爲其設計了一個協議,就收作Group Management Protocol.apache

很明顯,consumer group所要作的事情,是能夠用group management 協議作到的。而cooridnator, 及這個協議,也是爲了實現不依賴Zookeeper的高級消費者而提出並實現的。只不過,Kafka對高級消費者的成員管理行爲進行了抽象,抽象出來group management功能共有的邏輯,以此設計了Group Management Protocol, 使得這個協議不僅適用於Kafka consumer(目前Kafka Connect也在用它),也能夠做爲其它"group"的管理協議。數組

那麼,這個協議抽象出來了哪些group management共有的邏輯呢? Kafka Consumer的AbstractCoordinator的註釋給出了一些答案。網絡

AbstractCoordinator


AbstractCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. app

See ConsumerCoordinator for example usage.框架

From a high level, Kafka's group management protocol consists of the following sequence of actions:ide

  • Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
  • Group/Leader Selection: The coordinator select the members of the group and chooses one member as the leader.
  • State Assignment: The leader collects the metadata from all the members of the group and assigns state.
  • Group Stabilization: Each member receives the state assigned by the leader and begins processing.

To leverage this protocol, an implementation must define the format of metadata provided by each member for group registration in metadata() and the format of the state assignment provided by the leader in performAssignment(String, String, Map) and becomes available to members in onJoinComplete(int, String, String, ByteBuffer).性能

首先,AbstractorCoordinator是位於broker端的coordinator的客戶端。這段註釋裏的"The cooridnator"都是指broker端的那個cooridnator,而不是AbstractCoordiantor。AbstractCoordinator和broker端的coordinator的分工,能夠從註釋裏大體看出來。這段註釋說,Kafka的group management protocol包括如下的動做序列:fetch

  • Group Registration:Group的成員須要向cooridnator註冊本身,而且提供關於成員自身的元數據(好比,這個消費成員想要消費的topic)
  • Group/Leader Selection:cooridnator肯定這個group包括哪些成員,而且選擇其中的一個做爲leader。
  • State Assignment: leader收集全部成員的metadata,而且給它們分配狀態(state,能夠理解爲資源,或者任務)。
  • Group Stabilization: 每一個成員收到leader分配的狀態,而且開始處理。

這裏邊有三個角色:coordinator, group memeber, group leader. 

有這麼幾個狀況:

  1. 全部的成員要先向coordinator註冊,由coordinator選出leader, 而後由leader來分配state。這裏存在着3個角色,其分工並不像storm的nimbus和supervisor或者其它的master-slave系統同樣,而更相似於Yarn的resource manager, application master和node manager. 它們也都是爲了解決擴展性的問題。單個Kafka集羣可能會存在着比broker的數量大得多的消費者和消費者組,而消費者的狀況多是不穩定的,可能會頻繁變化,每次變化都須要一次協調,若是由broker來負責實際的協調工做,會給broker增長不少負擔。因此,從group memeber裏選出來一個作爲leader,由leader來執行性能開銷大的協調任務, 這樣把負載分配到client端,能夠減輕broker的壓力,支持更多數量的消費組。
  2. 可是leader和follower具體的行爲是怎麼樣的呢?follower的心跳直接發給leader嗎?state assign是leader直接發送給follower的嗎?
    1. 這裏確定與YARN有所不一樣,畢竟Kafka並不存在像NodeManager同樣的東西。也就是說若是leader至少須要向coordinator發heartbeat。
    2. YARN的RM是隻負責資源分配的,Kafka的coordinator按照上面註釋的說法還須要肯定group的成員,即便在leader肯定後,leader也不負責肯定group的成員,能夠推斷出,全部group member都須要發心跳給coordinator,這樣coordinator才能肯定group的成員。爲何心跳不直接發給leader呢?或許是爲了可靠性。畢竟,leader和follower之間是可能存在着網絡分區的狀況的。可是,coordinator做爲broker,若是任何group member沒法與coordinator通信,那也就確定不能做爲這個group的成員了。這也決定了,這個Group Management Protocol不該依賴於follower和leader之間可靠的網絡通信,由於leader不該該與follower直接交互。而應該經過coordinator來管理這個組。這種行爲與YARN有明顯的區別,由於YARN的每一個節點都在集羣內部,而Kafka的client卻不是集羣的一部分,可能存在於這種網絡環境和地理位置。
    3. 對於Kafka consumer,它的實際上必須跟coordinator保持鏈接,由於它還須要提交offset給coordinator。因此coordinator實際上負責commit offset,那麼,即便leader來肯定狀態的分配,可是每一個partition的消費起始點,還須要coordinator來肯定。這就帶來了一問題,每一個partition的消費開始的offset是由leader向coordinator請求,而後作爲state分配,仍是leader只分配partition,而follower去coordinator處請求開始消費的offset?

要回答這些問題,就要看代碼了。AbstractCoordinator的註釋還沒完,它接下來這麼說:

To leverage this protocol, an implementation must define the format of metadata provided by each member for group registration in metadata() and the format of the state assignment provided by the leader in performAssignment(String, String, Map) and becomes available to members in onJoinComplete(int, String, String, ByteBuffer).

這是說AbstractCoordinator的實現必須實現三個方法: metadata(), performAssignment(String, String, Map)和onJoinComplete(int, String, String, ByteBuffer)。

從這三個方法入手,能夠了解Group Management Protocol的一些細節。

Metadata

metadata()

protected abstract List<ProtocolMetadata> metadata();

Get the current list of protocols and their associated metadata supported by the local member. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).
Returns:
Non-empty map of supported protocols and metadata

這個方法返回的是這個group member所支持的協議,以及適用於生個協議的protocol。這些數據會提交給coordinator,coordinator會考慮到全部成員支持的協議,來爲它們選擇一個通用的協議。

下面看一下ConsumerCoordinator對它的實現:

    @Override
    public List<ProtocolMetadata> metadata() {
        List<ProtocolMetadata> metadataList = new ArrayList<>();
        for (PartitionAssignor assignor : assignors) {
            Subscription subscription = assignor.subscription(subscriptions.subscription());
            ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
            metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
        }
        return metadataList;
    }

在這裏,consumer提供給每一個協議的metadata都是同樣的,是Subscription對象包含的數據。Subscription是PartitionAssignor的一個內部類,它有兩個field

    class Subscription {
        private final List<String> topics;
        private final ByteBuffer userData;
     ...
    }

也就是說,consumer提供給coordinator的有兩部分信息:1. 它訂閱了哪些topic 2. userData。對於consumer, userData其實是一個空數組。不過PartitionAssignor這麼定義Subscription是有其用意的,userData是幹啥的呢?再看一下PartitionAssgnor的註釋。這也有助於瞭解ConsumerCoordinator#metadata()方法時使用的assignors是哪來的。

PartitionAssignor

This interface is used to define custom partition assignment for use in org.apache.kafka.clients.consumer.KafkaConsumer. Members of the consumer group subscribe to the topics they are interested in and forward their subscriptions to a Kafka broker serving as the group coordinator. The coordinator selects one member to perform the group assignment and propagates the subscriptions of all members to it. Then assign(Cluster, Map) is called to perform the assignment and the results are forwarded back to each respective members. In some cases, it is useful to forward additional metadata to the assignor in order to make assignment decisions. For this, you can override subscription(Set) and provide custom userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation can use this user data to forward the rackId belonging to each member.

這段註釋也回答了一些以前在分析AbstractCoordinator的註釋時提出的問題。這段註釋提供瞭如下幾點信息

  1. PartitionAssignor這個接口是用來定義KafkaConsumer所用的「分區分配策略」. 用戶能夠實現這個接口,以定義本身所需的策略。
  2. consumer group的成員把它們所訂閱的topic發送給coordinator。而後coordinator來選擇一個leader, 而後由coordinator把這個group的全部成員的訂閱狀況發給leader,由leader來執行分區的分配。
  3. leader調用PartitionAssignor的assign方法,來執行分區,而後把結果發給coordinator。由coordinator來轉發分配的結果到每一個group的成員。
  4. 有時候,須要利用各個consumer的額外的信息來決定分配結果,好比consumer所在的機架狀況。這時候,在實現PartitionAssignor時,就能夠覆蓋subscription(Set)方法,在其返回的Subscription對象中提供本身須要的userData。

俺以爲,某些資源調度框架可能會受益於自定的PartitionAssignor,除了rack-aware以外,它們還能夠根據每一個機器上分配的consumer個數以及機器的性能來更好地進行負載勻衡。並且,這個東東也能夠用來實現partition分配的「粘性」,即某個consumer能夠一直被分配特定的分區,以便於它維持本地的狀態。

performAssignment

protected abstract Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata)


Perform assignment for the group. This is used by the leader to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer)


Parameters:
leaderId - The id of the leader (which is this member)
allMemberMetadata - Metadata from all members of the group


Returns:
A map from each member to their state assignment

這裏leader Id, allMemeberMetadata都是Coordinator經過JoinGroupRespone發給leader的。leader基於這些信息作出分配,而後把分配結果寫在SyncGroupRequest裏發回給cooridnator,由Cooridnator把每一個member被分配的狀態發給這個member。

下面來看一下ConsumerCooridnator對這個方法的實現:

    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {
        //根據coordinator選擇的協議肯定PartitionAssignor
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

        //肯定當前group訂閱的全部topic,以及每一個member訂閱了哪些topic
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }

        // the leader will begin watching for changes to any of the topics the group is interested in,
        // which ensures that all metadata changes will eventually be seen
        //leader會監聽這個group訂部的全部topic的metadata的變化
        this.subscriptions.groupSubscribe(allSubscribedTopics);
        metadata.setTopics(this.subscriptions.groupSubscription());

        // update metadata (if needed) and keep track of the metadata used for assignment so that
        // we can check after rebalance completion whether anything has changed
        //根據須要更新metadata,而且記錄assign時用的metadata到assignmentSnapshot裏
        client.ensureFreshMetadata();
        assignmentSnapshot = metadataSnapshot;

        log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
                groupId, assignor.name(), subscriptions);

        //執行分配。metadata.fetch會得到當前的metadata,因爲KafkaConsumer是單線程的,因此這裏fetch的metadata和前邊保存的是一致的
        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

        log.debug("Finished assignment for group {}: {}", groupId, assignment);

        //生成groupAssignment。它指明瞭哪一個group member該消費哪一個TopicPartition
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }

        return groupAssignment;
    }

這裏的Assignor有兩種: RangeAssignor和RoundRobinAssignor。

二者都是把一個Topic的分區依次分給全部訂閱這個topic的consumer.以t表示topic, c表示consumer,p表示partition, 字母后邊的數字表示topic, partiton, consumer的id。

RangeAssignor與RoundRobinAssignor的區別在於對於一個topic的分區的分配,是否會受到其它topic分區分配的影響。

RangeAssignor

RangeAssignor對於每一個topic,都是從consumer0開始分配。好比,topic0有3個分區,訂閱它的有兩個consumer。那麼consumer0會分到t0p0和t0p1, 而consumer1會分到t0p2.

若是它兩個consumer也都訂閱了另外一個有三個分區的topic1, 那麼consumer0還會分到t1p0和t1p1,而consumer1會分到t1p2。具體的算法RangeAssignor的JavaDoc有描述。

可見RangeAssignor有某些狀況下是不公平的,在上邊的例子中,若是這兩個consumer訂閱了更多有三個分區的topic,那麼consumer0分配的partition數量會一直是consumer1的兩倍。

RoundRobinAssignor

RoundRobinAssignor會首先把這個group訂閱的全部TopicPartition排序,排序是先按topic排序,同一個topic的分區按partition id排序。具體的算法RoundRobinAssignor的JavaDoc有描述。好比,假若有兩個各有三個分區的topic,它們的TopicPartition排序後爲t0p0 t0p1 t0p2 t1p0 t1p1 t1p2。

分配時會把這個排序後的TopicPartition列表依次分配給訂閱它們的consumer。好比c0和c1都訂閱了這兩個topic, 那麼分配結果是

t0p0 t0p1  t0p2 t1p0 t1p1 t1p2
c0 c1 c0 c1 c0 c1

這樣c0分到了: t0p0, t0p2, t1p2.    c1分到了: t0p1, t1p0, t1p2

若是有三個consumer,

c0訂閱了t0, t1, t3.

c1訂閱了t0, t2, t4。

c2訂閱了t0, t2, t4。

t0有兩個分區,而其它topic都只有一個分區。

那麼排序後的TopicPartition以及分配的結果爲

t0p0 t0p1 t1p0 t2p0 t3p0 t4p0
c0 c1 c0 c1 c0 c1

可見c3乾脆就分不到分區了。因此RoundRobinAssignor也不能保證絕對公平。不過這只是比較極端的例子。

onJoinComplete

    /**
     * Invoked when a group member has successfully joined a group.
     * @param generation The generation that was joined
     * @param memberId The identifier for the local member in the group
     * @param protocol The protocol selected by the coordinator
     * @param memberAssignment The assignment propagated from the group leader
     */
    protected abstract void onJoinComplete(int generation,
                                           String memberId,
                                           String protocol,
                                           ByteBuffer memberAssignment);

ConsumerCoordinator對它的實現是:

    @Override
    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
        // if we were the assignor, then we need to make sure that there have been no metadata updates
        // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            subscriptions.needReassignment();
            return;
        }

        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

        // set the flag to refresh last committed offsets
        subscriptions.needRefreshCommits();

        // update partition assignment
        subscriptions.assignFromSubscribed(assignment.partitions());

        // give the assignor a chance to update internal state based on the received assignment
        assignor.onAssignment(assignment);

        // reschedule the auto commit starting from now
        if (autoCommitEnabled)
            autoCommitTask.reschedule();

        // execute the user's callback after rebalance
        ConsumerRebalanceListener listener = subscriptions.listener();
        log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
        try {
            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
            listener.onPartitionsAssigned(assigned);
        } catch (WakeupException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} for group {} failed on partition assignment",
                    listener.getClass().getName(), groupId, e);
        }
    }

 首先,對於leader來講,它要檢查一下進行分配時的metadata跟當前的metadata是否一致,不一致的話,就標記下須要從新協調一次assign.

若是不存在上邊的狀況,就作如下幾個事情:

  • 設置「須要刷新last committed offset"的標誌
  • 更新這個conumser所採集的TopicPartition集合
  • 調用Assignor的onAssignment方法,設Assignor來處理一下本身的內部狀態
  • 從新調度autoCommit任務。這個任務用於週期性地 commit offset
  • 調用ConsumerRebalanceListener。這個Listener是用戶傳給KafkaConsumer的。

這裏須要注意的是,全部KafkaConsumer的操做都是在一個線程完成的,並且大部分都是在poll這個方法調用中完成。因此上邊代碼中的

subscriptions.needReassignment()和subscriptions.needRefreshCommits()

 這些方法,都是改變了subscription對象的狀態,並無直正執行reassign和refresh commit操做。KafkaConsumer在執行poll方法時,會檢查這subscription對象的狀態,而後執行所須要的操做。因此,代碼裏這兩句

        // set the flag to refresh last committed offsets
        subscriptions.needRefreshCommits();

        // update partition assignment
        subscriptions.assignFromSubscribed(assignment.partitions());

當freshCommit執行時,第二句assignFromSubscribed已經執行完了,因此是獲取分配給這個consumer的全部partition的last committed offset.

Kafka Client-side Assignment Proposal

Kafka Cooridnator的具體行爲,能夠參照這篇wiki。

相關文章
相關標籤/搜索