Kafka2.0消費者客戶端源碼分析

1 KafkaConsumer 構造器

  1. 初始化參數配置。
  2. 初始化消費者網絡客戶端ConsumerNetworkClient
  3. 初始化消費者協調器ConsumerCoordinator
  4. 初始化拉取器Fetcherjava

    2 訂閱主題

  5. 調用訂閱方法subscribe()、assign()會將訂閱信息記錄到SubscriptionState,屢次訂閱會覆蓋舊數據。
  6. 若是元數據緩存Metadata不包含訂閱的主題,則設置needUpdate=true,標識須要更新元數據。緩存

    3 拉取消息

  7. poll()方法指定超時時間timeoutMs,在這個時間範圍內不斷輪詢。
  8. 更新分配給消費者的數據,包括消費者協調器偏移量心跳等。
  9. 根據超時時間拉取消息。
  10. 若是拉取的消息不爲空,當即出發下一輪的拉取,能夠避免因處理消息響應,而阻塞等待。
  11. 拉取的消息會先反序列化,再調用消費者攔截器,最後返回給消費者。
  12. 拉取超時後,返回空記錄。網絡

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
    long elapsedTime = 0L;
    do {
        final long metadataEnd;
        // 更新分配元數據,協調器、心跳、消費位置
        if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
            return ConsumerRecords.empty();
        }
        // 拉取消息
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

        if (!records.isEmpty()) {
            // 消息不爲空時,當即發起下一輪的拉取消息,避免阻塞等待響應處理。
            // 注意,在消息返回以前,不能觸發喚醒或其餘錯誤。
            if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                client.pollNoWakeup();
            }
            // 回調執行消費者攔截器後返回給消費者
            return this.interceptors.onConsume(new ConsumerRecords<>(records));
        }
        final long fetchEnd = time.milliseconds();
        elapsedTime += fetchEnd - metadataEnd;
    } while (elapsedTime < timeoutMs); // 輪詢拉取,知道超過輸入的超時時間

    return ConsumerRecords.empty();
}

3.1 拉取消息的詳細流程

  1. 若是分區記錄緩存PartitionRecords存在緩存記錄,則優先會從分區記錄緩存隊列completedFetches中拉取一部分記錄,直接返回。
  2. 不然,向服務端發送拉取請求,消費者並不會當即發送請求,而是先構造 Node 和請求的緩存LinkedHashMap
  3. 遍歷上述緩存,構形成能夠直接發送的請求,並緩存到ConcurrentHashMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent中,同時添加處理響應的監聽器。
  4. 遍歷unsent,使用NetworkClient發送請求,這裏和生產者使用相同的方法,處理流程類似。發送完後即清空unsent
  5. 當拉取到消息,會回調第3步中的監聽器,將消息緩存到隊列ConcurrentLinkedQueue<CompletedFetch> completedFetches
  6. 相似第1步,從分區記錄緩存隊列completedFetches中拉取一部分記錄返回給消費者。
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
    final long startMs = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // 從緩存隊列拉取
    if (!records.isEmpty()) { // 緩存中有數據則直接返回
        return records;
    }

    // 1.將拉取請求構形成節點和請求的映射關係,並緩存在 unsent
    // 2.添加響應處理監聽器,處理髮送拉取請求後,從服務端返回的消息,並緩存在隊列中
    fetcher.sendFetches();

    // 用 NetworkClient 向服務端發送拉取請求
    client.poll(pollTimeout, startMs, () -> return !fetcher.hasCompletedFetches());

    return fetcher.fetchedRecords(); // 再次從緩存隊列拉取
}
// 從緩存拉取隊列拉取消息
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    int recordsRemaining = maxPollRecords;

    while (recordsRemaining > 0) { // 在超時時間內不斷輪詢
        if (nextInLineRecords == null || nextInLineRecords.isFetched) { // 分區記錄爲空,或者已拉取
            CompletedFetch completedFetch = completedFetches.peek(); // 從緩存隊列拉取消息
            nextInLineRecords = parseCompletedFetch(completedFetch); // 將消息解析成分區消息記錄 PartitionRecords
            completedFetches.poll(); // 對緩存隊列移除
        } else {
            List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); // 從分區記錄拉取消息
            TopicPartition partition = nextInLineRecords.partition;
            if (!records.isEmpty()) { // 拉取到消息,方法 Map,以返回給消費者
                fetched.put(partition, records);
        }
    }
    return fetched;
}

4 總體流程

總體流程

相關文章
相關標籤/搜索