ConsumerNetworkClient
。ConsumerCoordinator
初始化拉取器Fetcher
java
subscribe()、assign()
會將訂閱信息記錄到SubscriptionState
,屢次訂閱會覆蓋舊數據。若是元數據緩存Metadata
不包含訂閱的主題,則設置needUpdate=true
,標識須要更新元數據。緩存
poll()
方法指定超時時間timeoutMs
,在這個時間範圍內不斷輪詢。拉取超時後,返回空記錄。網絡
private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) { long elapsedTime = 0L; do { final long metadataEnd; // 更新分配元數據,協調器、心跳、消費位置 if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { return ConsumerRecords.empty(); } // 拉取消息 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); if (!records.isEmpty()) { // 消息不爲空時,當即發起下一輪的拉取消息,避免阻塞等待響應處理。 // 注意,在消息返回以前,不能觸發喚醒或其餘錯誤。 if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); } // 回調執行消費者攔截器後返回給消費者 return this.interceptors.onConsume(new ConsumerRecords<>(records)); } final long fetchEnd = time.milliseconds(); elapsedTime += fetchEnd - metadataEnd; } while (elapsedTime < timeoutMs); // 輪詢拉取,知道超過輸入的超時時間 return ConsumerRecords.empty(); }
PartitionRecords
存在緩存記錄,則優先會從分區記錄緩存隊列completedFetches
中拉取一部分記錄,直接返回。LinkedHashMap
。ConcurrentHashMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent
中,同時添加處理響應的監聽器。unsent
,使用NetworkClient
發送請求,這裏和生產者使用相同的方法,處理流程類似。發送完後即清空unsent
。ConcurrentLinkedQueue<CompletedFetch> completedFetches
。completedFetches
中拉取一部分記錄返回給消費者。private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { final long startMs = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // 從緩存隊列拉取 if (!records.isEmpty()) { // 緩存中有數據則直接返回 return records; } // 1.將拉取請求構形成節點和請求的映射關係,並緩存在 unsent // 2.添加響應處理監聽器,處理髮送拉取請求後,從服務端返回的消息,並緩存在隊列中 fetcher.sendFetches(); // 用 NetworkClient 向服務端發送拉取請求 client.poll(pollTimeout, startMs, () -> return !fetcher.hasCompletedFetches()); return fetcher.fetchedRecords(); // 再次從緩存隊列拉取 } // 從緩存拉取隊列拉取消息 public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; while (recordsRemaining > 0) { // 在超時時間內不斷輪詢 if (nextInLineRecords == null || nextInLineRecords.isFetched) { // 分區記錄爲空,或者已拉取 CompletedFetch completedFetch = completedFetches.peek(); // 從緩存隊列拉取消息 nextInLineRecords = parseCompletedFetch(completedFetch); // 將消息解析成分區消息記錄 PartitionRecords completedFetches.poll(); // 對緩存隊列移除 } else { List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); // 從分區記錄拉取消息 TopicPartition partition = nextInLineRecords.partition; if (!records.isEmpty()) { // 拉取到消息,方法 Map,以返回給消費者 fetched.put(partition, records); } } return fetched; }