Kafka 源碼解析之 Producer Metadata 更新機制(二)

在上一篇文章中,已經介紹了 Producer 的發送模型,Producer dosend() 方法中的第一步,就是獲取相關的 topic 的 metadata,但在上篇中並無深刻展開,由於這部分的內容比較多,因此本文單獨一篇文章進行介紹,本文主要來說述如下三個問題:node

  1. metadata 內容是什麼;
  2. Producer 更新 metadata 的流程;
  3. Producer 在什麼狀況下會去更新 metadata;

Metadata 內容

Metadata 信息的內容能夠經過源碼看明白:git

// 這個類被 client 線程和後臺 sender 所共享,它只保存了全部 topic 的部分數據,當咱們請求一個它上面沒有的 topic meta 時,它會經過發送 metadata update 來更新 meta 信息,
// 若是 topic meta 過時策略是容許的,那麼任何 topic 過時的話都會被從集合中移除,
// 可是 consumer 是不容許 topic 過時的由於它明確地知道它須要管理哪些 topic
public final class Metadata {
    private static final Logger log = LoggerFactory.getLogger(Metadata.class);

    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;

    private final long refreshBackoffMs; // metadata 更新失敗時,爲避免頻繁更新 meta,最小的間隔時間,默認 100ms
    private final long metadataExpireMs; // metadata 的過時時間, 默認 60,000ms
    private int version; // 每更新成功1次,version自增1,主要是用於判斷 metadata 是否更新
    private long lastRefreshMs; // 最近一次更新時的時間(包含更新失敗的狀況)
    private long lastSuccessfulRefreshMs; // 最近一次成功更新的時間(若是每次都成功的話,與前面的值相等, 不然,lastSuccessulRefreshMs < lastRefreshMs)
    private Cluster cluster; // 集羣中一些 topic 的信息
    private boolean needUpdate; // 是都須要更新 metadata
    /* Topics with expiry time */
    private final Map<String, Long> topics; // topic 與其過時時間的對應關係
    private final List<Listener> listeners; // 事件監控者
    private final ClusterResourceListeners clusterResourceListeners; //當接收到 metadata 更新時, ClusterResourceListeners的列表
    private boolean needMetadataForAllTopics; // 是否強制更新全部的 metadata
    private final boolean topicExpiryEnabled; // 默認爲 true, Producer 會定時移除過時的 topic,consumer 則不會移除

關於 topic 的詳細信息(leader 所在節點、replica 所在節點、isr 列表)都是在 Cluster 實例中保存的。github

// 並非一個全集,metadata的主要組成部分
public final class Cluster {

    // 從命名直接就看出了各個變量的用途
    private final boolean isBootstrapConfigured;
    private final List<Node> nodes; // node 列表
    private final Set<String> unauthorizedTopics; // 未認證的 topic 列表
    private final Set<String> internalTopics; // 內置的 topic 列表
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // partition 的詳細信息
    private final Map<String, List<PartitionInfo>> partitionsByTopic; // topic 與 partition 的對應關係
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; //  可用(leader 不爲 null)的 topic 與 partition 的對應關係
    private final Map<Integer, List<PartitionInfo>> partitionsByNode; // node 與 partition 的對應關係
    private final Map<Integer, Node> nodesById; // node 與 id 的對應關係
    private final ClusterResource clusterResource;

// org.apache.kafka.common.PartitionInfo
// topic-partition: 包含 topic、partition、leader、replicas、isr
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;

Cluster 實例主要是保存:apache

  1. broker.id 與 node 的對應關係;
  2. topic 與 partition (PartitionInfo)的對應關係;
  3. node 與 partition (PartitionInfo)的對應關係。

Producer 的 Metadata 更新流程

Producer 在調用 dosend() 方法時,第一步就是經過 waitOnMetadata 方法獲取該 topic 的 metadata 信息.api

// 等待 metadata 的更新
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    metadata.add(topic);// 在 metadata 中添加 topic 後,若是 metadata 中沒有這個 topic 的 meta,那麼 metadata 的更新標誌設置爲了 true
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);// 若是 topic 已經存在 meta 中,則返回該 topic 的 partition 數,不然返回 null

    // 當前 metadata 中若是已經有這個 topic 的 meta 的話,就直接返回
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;

    // 發送 metadata 請求,直到獲取了這個 topic 的 metadata 或者請求超時
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        int version = metadata.requestUpdate();// 返回當前版本號,初始值爲0,每次更新時會自增,並將 needUpdate 設置爲 true
        sender.wakeup();// 喚起 sender,發送 metadata 請求
        try {
            metadata.awaitUpdate(version, remainingWaitMs);// 等待 metadata 的更新
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 超時
        if (cluster.unauthorizedTopics().contains(topic))// 認證失敗,對當前 topic 沒有 Write 權限
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null);// 不停循環,直到 partitionsCount 不爲 null(即直到 metadata 中已經包含了這個 topic 的相關信息)

    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));

    return new ClusterAndWaitTime(cluster, elapsed);

若是 metadata 中不存在這個 topic 的 metadata,那麼就請求更新 metadata,若是 metadata 沒有更新的話,方法就一直處在 do ... while 的循環之中,在循環之中,主要作如下操做:markdown

  1. metadata.requestUpdate() 將 metadata 的 needUpdate 變量設置爲 true(強制更新),並返回當前的版本號(version),經過版本號來判斷 metadata 是否完成更新;數據結構

  2. sender.wakeup() 喚醒 sender 線程,sender 線程又會去喚醒 NetworkClient 線程,NetworkClient 線程進行一些實際的操做(後面詳細介紹);app

  3. metadata.awaitUpdate(version, remainingWaitMs) 等待 metadata 的更新。oop

    // 更新 metadata 信息(根據當前 version 值來判斷) public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) {// 不斷循環,直到 metadata 更新成功,version 自增 if (remainingWaitMs != 0) wait(remainingWaitMs);// 阻塞線程,等待 metadata 的更新 long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs)// timeout throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); remainingWaitMs = maxWaitMs - elapsed; } }

Metadata.awaitUpdate() 方法中,線程會阻塞在 while 循環中,直到 metadata 更新成功或者 timeout。

從前面能夠看出,此時 Producer 線程會阻塞在兩個 while 循環中,直到 metadata 信息更新,那麼 metadata 是如何更新的呢?若是有印象的話,前面應該已經介紹過了,主要是經過 sender.wakeup() 來喚醒 sender 線程,間接喚醒 NetworkClient 線程,NetworkClient 線程來負責發送 Metadata 請求,並處理 Server 端的響應。

Kafka 源碼分析之 Producer 發送模型(一) 中介紹 Producer 發送模型時,在第五步 sender 線程會調用 NetworkClient.poll() 方法進行實際的操做,其源碼以下:

public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);// 判斷是否須要更新 meta,若是須要就更新(請求更新 metadata 的地方)
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);// 經過 selector 中獲取 Server 端的 response
        handleCompletedReceives(responses, updatedNow);// 在返回的 handler 中,會處理 metadata 的更新
        handleDisconnections(responses, updatedNow);
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            try {
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
        return responses;


  • metadataUpdater.maybeUpdate(now):判斷是否須要更新 Metadata,若是須要更新的話,先與 Broker 創建鏈接,而後發送更新 metadata 的請求;
  • 處理 Server 端的一些響應,這裏主要討論的是 handleCompletedReceives(responses, updatedNow) 方法,它會處理 Server 端返回的 Metadata 結果。

先看一下 metadataUpdater.maybeUpdate() 的具體實現:

public long maybeUpdate(long now) {
        // should we update our metadata?
        // metadata 是否應該更新
        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);// metadata 下次更新的時間(須要判斷是強制更新仍是 metadata 過時更新,前者是立馬更新,後者是計算 metadata 的過時時間)
        // 若是一條 metadata 的 fetch 請求還未從 server 收到恢復,那麼時間設置爲 waitForMetadataFetch(默認30s)
        long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;

        long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
        if (metadataTimeout > 0) {// 時間未到時,直接返回下次應該更新的時間
            return metadataTimeout;

        Node node = leastLoadedNode(now);// 選擇一個鏈接數最小的節點
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            return reconnectBackoffMs;

        return maybeUpdate(now, node); // 能夠發送 metadata 請求的話,就發送 metadata 請求

     * Add a metadata request to the list of sends if we can make one
    // 判斷是否能夠發送請求,能夠的話將 metadata 請求加入到發送列表中
    private long maybeUpdate(long now, Node node) {
        String nodeConnectionId = node.idString();

        if (canSendRequest(nodeConnectionId)) {// 通道已經 ready 而且支持發送更多的請求
            this.metadataFetchInProgress = true; // 準備開始發送數據,將 metadataFetchInProgress 置爲 true
            MetadataRequest.Builder metadataRequest; // 建立 metadata 請求
            if (metadata.needMetadataForAllTopics())// 強制更新全部 topic 的 metadata(雖然默認不會更新全部 topic 的 metadata 信息,可是每一個 Broker 會保存全部 topic 的 meta 信息)
                metadataRequest = MetadataRequest.Builder.allTopics();
            else // 只更新 metadata 中的 topics 列表(列表中的 topics 由 metadata.add() 獲得)
                metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));

            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
            sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);/ 發送 metadata 請求
            return requestTimeoutMs;

        // If there's any connection establishment underway, wait until it completes. This prevents
        // the client from unnecessarily connecting to additional nodes while a previous connection
        // attempt has not been completed.
        if (isAnyNodeConnecting()) {// 若是 client 正在與任何一個 node 的鏈接狀態是 connecting,那麼就進行等待
            // Strictly the timeout we should return here is "connect timeout", but as we don't
            // have such application level configuration, using reconnect backoff instead.
            return reconnectBackoffMs;

        if (connectionStates.canConnect(nodeConnectionId, now)) {// 若是沒有鏈接這個 node,那就初始化鏈接
            // we don't have a connection to this node right now, make one
            log.debug("Initialize connection to node {} for sending metadata request", node.id());
            initiateConnect(node, now);// 初始化鏈接
            return reconnectBackoffMs;
        return Long.MAX_VALUE;

 // 發送 Metadata 請求   
 private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
                                         String nodeConnectionId, long now) {
    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);// 建立 metadata 請求
    doSend(clientRequest, true, now);

因此,每次 Producer 請求更新 metadata 時,會有如下幾種狀況:

  1. 若是 node 能夠發送請求,則直接發送請求;
  2. 若是該 node 正在創建鏈接,則直接返回;
  3. 若是該 node 還沒創建鏈接,則向 broker 初始化連接。

而 KafkaProducer 線程以前是一直阻塞在兩個 while 循環中,直到 metadata 更新

  1. sender 線程第一次調用 poll() 方法時,初始化與 node 的鏈接;
  2. sender 線程第二次調用 poll() 方法時,發送 Metadata 請求;
  3. sender 線程第三次調用 poll() 方法時,獲取 metadataResponse,並更新 metadata。

通過上述 sender 線程三次調用 poll()方法,所請求的 metadata 信息纔會獲得更新,此時 Producer 線程也不會再阻塞,開始發送消息。

NetworkClient 接收到 Server 端對 Metadata 請求的響應後,更新 Metadata 信息。

// 處理任何已經完成的接收響應
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            InFlightRequest req = inFlightRequests.completeNext(source);
            AbstractResponse body = parseResponse(receive.payload(), req.header);
            log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
            if (req.isInternalRequest && body instanceof MetadataResponse)// 若是是 meta 響應
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); // 若是是其餘響應
                responses.add(req.completed(body, now));

        // 處理 Server 端對 Metadata 請求處理後的 response
        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
            this.metadataFetchInProgress = false;
            Cluster cluster = response.cluster();
            // check if any topics metadata failed to get updated
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty())
                log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);

            // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
            // created which means we will get errors and no nodes until it exists
            if (cluster.nodes().size() > 0) {
                this.metadata.update(cluster, now);// 更新 meta 信息
            } else {// 若是 metadata 中 node 信息無效,則不更新 metadata 信息
                log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());

Producer Metadata 的更新策略

Metadata 會在下面兩種狀況下進行更新

  1. KafkaProducer 第一次發送消息時強制更新,其餘時間週期性更新,它會經過 Metadata 的 lastRefreshMs, lastSuccessfulRefreshMs 這2個字段來實現;
  2. 強制更新: 調用 Metadata.requestUpdate()needUpdate 置成了 true 來強制更新。

在 NetworkClient 的 poll() 方法調用時,就會去檢查這兩種更新機制,只要達到其中一種,就行觸發更新操做。

Metadata 的強制更新會在如下幾種狀況下進行:

  1. initConnect 方法調用時,初始化鏈接;
  2. poll() 方法中對 handleDisconnections() 方法調用來處理鏈接斷開的狀況,這時會觸發強制更新;
  3. poll() 方法中對 handleTimedOutRequests() 來處理請求超時時;
  4. 發送消息時,若是沒法找到 partition 的 leader;
  5. 處理 Producer 響應(handleProduceResponse),若是返回關於 Metadata 過時的異常,好比:沒有 topic-partition 的相關 meta 或者 client 沒有權限獲取其 metadata。



