This article looks at how the Kafka client implements auto commit.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>
kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/KafkaConsumer.java
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)
                    client.pollNoWakeup();
                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);
        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
This calls the pollOnce method.
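The control flow of poll can be reduced to a small stand-alone sketch. The class name PollLoopSketch, the generic record type, and the LongFunction used in place of pollOnce are assumptions made for illustration; this is not the Kafka API, and System.currentTimeMillis() stands in for the client's internal clock.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;

class PollLoopSketch {
    // Hypothetical stand-in for KafkaConsumer.poll: keeps calling pollOnce
    // until it returns records or the timeout budget is used up.
    static <T> List<T> poll(long timeoutMs, LongFunction<List<T>> pollOnce) {
        if (timeoutMs < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        long start = System.currentTimeMillis();
        long remaining = timeoutMs;
        do {
            // Each round receives only the time that is still left.
            List<T> records = pollOnce.apply(remaining);
            if (!records.isEmpty())
                return records;
            long elapsed = System.currentTimeMillis() - start;
            remaining = timeoutMs - elapsed;
        } while (remaining > 0);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        List<String> result = poll(0, remaining -> {
            calls[0]++;
            return Collections.<String>emptyList();
        });
        System.out.println("records=" + result + ", pollOnce calls=" + calls[0]);
    }
}
```

Because the loop is a do-while, one round of pollOnce runs even with a timeout of 0, so the coordinator work inside pollOnce gets a chance to run on every call to poll.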
/**
 * Do one round of polling. In addition to checking for new data, this does any needed offset commits
 * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
 * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
 * @return The fetched records (may be empty)
 */
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.poll(time.milliseconds());
    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());
    // if data is available already, return it immediately
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();
    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();
    return fetcher.fetchedRecords();
}
Note that the first thing this method does is call coordinator.poll(time.milliseconds());
kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
/**
 * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
 * has joined the group (if it is using group management). This also handles periodic offset commits
 * if they are enabled.
 *
 * @param now current time in milliseconds
 */
public void poll(long now) {
    invokeCompletedOffsetCommitCallbacks();
    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        ensureCoordinatorReady();
        now = time.milliseconds();
    }
    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();
        ensureActiveGroup();
        now = time.milliseconds();
    }
    pollHeartbeat(now);
    maybeAutoCommitOffsetsAsync(now);
}
Three methods deserve attention here: ensureActiveGroup, pollHeartbeat, and maybeAutoCommitOffsetsAsync.
protected synchronized void pollHeartbeat(long now) {
    if (heartbeatThread != null) {
        if (heartbeatThread.hasFailed()) {
            // set the heartbeat thread to null and raise an exception. If the user catches it,
            // the next call to ensureActiveGroup() will spawn a new heartbeat thread.
            RuntimeException cause = heartbeatThread.failureCause();
            heartbeatThread = null;
            throw cause;
        }
        heartbeat.poll(now);
    }
}
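This is where poll touches the heartbeat: it rethrows any failure recorded by the heartbeat thread and then calls heartbeat.poll(now).
Of course, relying on poll alone to drive heartbeats would be unreliable; that would repeat the single-threaded approach of earlier versions. The new version has a dedicated thread to take care of it.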
kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
private class HeartbeatThread extends KafkaThread {
    private boolean enabled = false;
    private boolean closed = false;
    private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
    private HeartbeatThread() {
        super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? "" : " | " + groupId), true);
    }
    //......
}
/**
 * Ensure that the group is active (i.e. joined and synced)
 */
public void ensureActiveGroup() {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    ensureCoordinatorReady();
    startHeartbeatThreadIfNeeded();
    joinGroupIfNeeded();
}
private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }
}
The ensureActiveGroup method makes sure the heartbeatThread has been started.
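The lazy, synchronized start used by startHeartbeatThreadIfNeeded can be illustrated in isolation. This is only a sketch of the pattern: the class LazyHeartbeatSketch, its threadsStarted counter, and the sleeping loop that stands in for real heartbeat work are all hypothetical and do not appear in the Kafka sources.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyHeartbeatSketch {
    private Thread heartbeatThread;               // null until first needed
    private volatile boolean closed = false;
    final AtomicInteger threadsStarted = new AtomicInteger();

    // Same shape as startHeartbeatThreadIfNeeded: create and start the
    // background thread only if none exists yet.
    synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread == null) {
            Thread t = new Thread(() -> {
                // Placeholder for the real heartbeat work.
                while (!closed) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, "heartbeat-sketch");
            t.setDaemon(true);                    // do not keep the JVM alive
            heartbeatThread = t;
            threadsStarted.incrementAndGet();
            t.start();
        }
    }

    synchronized Thread heartbeatThread() {
        return heartbeatThread;
    }

    void close() {
        closed = true;
        Thread t = heartbeatThread();
        if (t != null)
            t.interrupt();
    }

    public static void main(String[] args) {
        LazyHeartbeatSketch sketch = new LazyHeartbeatSketch();
        sketch.startHeartbeatThreadIfNeeded();
        sketch.startHeartbeatThreadIfNeeded();    // second call is a no-op
        System.out.println("threads started: " + sketch.threadsStarted.get());
        sketch.close();
    }
}
```

Calling the start method repeatedly is safe, which is why ensureActiveGroup can invoke it every time a rejoin is needed without checking first.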
private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}
private void doAutoCommitOffsetsAsync() {
    Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
    log.debug("Sending asynchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
    commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId, exception.getMessage());
                if (exception instanceof RetriableException)
                    nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
            } else {
                log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId);
            }
        }
    });
}
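Here the next auto-commit time is computed from the configured auto.commit.interval.ms: a commit is sent once now reaches nextAutoCommitDeadline, and the deadline then moves to now + autoCommitIntervalMs. If the coordinator is unknown, or a commit fails with a retriable exception, the deadline is set from retryBackoffMs instead.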
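To see what this scheduling means in practice, the deadline logic can be replayed with hand-picked timestamps. The sketch below mirrors only the branch structure of maybeAutoCommitOffsetsAsync; the class AutoCommitDeadlineSketch, the commitTimes list, and the assumption that the first deadline is the construction time plus the interval are illustrative and not taken from the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

class AutoCommitDeadlineSketch {
    private final long autoCommitIntervalMs;
    private final long retryBackoffMs;
    private long nextAutoCommitDeadline;
    boolean coordinatorUnknown = false;            // toggled by the caller
    final List<Long> commitTimes = new ArrayList<>();

    AutoCommitDeadlineSketch(long now, long autoCommitIntervalMs, long retryBackoffMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.retryBackoffMs = retryBackoffMs;
        // Assumption: the first deadline is one full interval after start.
        this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
    }

    // Called once per simulated poll, like coordinator.poll(now).
    void maybeAutoCommit(long now) {
        if (coordinatorUnknown) {
            // No coordinator to commit to: try again after the backoff.
            nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            // The next deadline is measured from this poll, not the old deadline.
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            commitTimes.add(now);                  // stands in for the async commit
        }
    }

    public static void main(String[] args) {
        AutoCommitDeadlineSketch sketch = new AutoCommitDeadlineSketch(0, 5000, 100);
        long[] pollTimes = {1000, 4000, 5200, 9000, 10300};
        for (long t : pollTimes)
            sketch.maybeAutoCommit(t);
        System.out.println(sketch.commitTimes);    // prints [5200, 10300]
    }
}
```

Two things follow from the replay. A commit can only happen inside a call to poll, so a consumer that polls rarely also commits rarely. And because each deadline is now + autoCommitIntervalMs, the gap between commits is at least the interval rather than exactly the interval.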