消費者的消費狀態是保存在SubscriptionState類中的,而SubscriptionState有個重要的屬性那就是assignment保存了消費者消費的partition及其partition的狀態java
public class SubscriptionState { /* the pattern user has requested */ private Pattern subscribedPattern; /* the list of topics the user has requested */ private final Set<String> subscription; /* the list of topics the group has subscribed to (set only for the leader on join group completion) */ private final Set<String> groupSubscription; /* the list of partitions the user has requested */ private final Set<TopicPartition> userAssignment; /* the list of partitions currently assigned */ private final Map<TopicPartition, TopicPartitionState> assignment; // 關鍵, 保存了消費者消費的partition及其partition的狀態 // ...
看下TopicPartitionState。TopicPartitionState用於表示消費者消費到該partition哪一個位置了,須要注意的是position表示下一條須要消費的位置而不是已經消費的位置,拉取消息的時候就是根據position來肯定須要拉取的第一條消息的offset網絡
private static class TopicPartitionState { private Long position; // 下一條消費哪一個offset private OffsetAndMetadata committed; // 已經提交的position private boolean paused; // whether this partition has been paused by the user private OffsetResetStrategy resetStrategy; // 重置position的時候的策略 // ... } public class OffsetAndMetadata implements Serializable { private final long offset; private final String metadata; }
以KafkaConsumer#commitSync爲例來看下客戶端是如何提交offset的數據結構
KafkaConsumer#commitSyncide
public void commitSync() { acquire(); try { commitSync(subscriptions.allConsumed()); // 調用SubscriptionState#allConsumed來獲取已經消費的消息的位置,而後將其提交 } finally { release(); } } public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { acquire(); try { coordinator.commitOffsetsSync(offsets); } finally { release(); } }
來看下SubscriptionState#allConsumed,從哪獲取到消費到的位置。從下面的代碼能夠看出提交的offset就是TopicPartitionState#positionui
public Map<TopicPartition, OffsetAndMetadata> allConsumed() { Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>(); for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) { TopicPartitionState state = entry.getValue(); if (state.hasValidPosition()) allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));// 關鍵,原來是將TopicPartitionState中的position封裝成OffsetAndMetadata,即提交的是TopicPartitionState#position } return allConsumed; }
獲取到消費到的offset位置後,最終是經過ConsumerCoordinator#sendOffsetCommitRequest將offset發送到coordinator的this
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (coordinatorUnknown()) // 必須獲取coordinator return RequestFuture.coordinatorNotAvailable(); if (offsets.isEmpty()) return RequestFuture.voidSuccess(); // create the offset commit request Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size()); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { OffsetAndMetadata offsetAndMetadata = entry.getValue(); offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData( offsetAndMetadata.offset(), offsetAndMetadata.metadata())); // 以TopicPartition爲key, offsetAndMetadat組成request中的數據 } OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, this.generation, this.memberId, OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData); log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId); return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req) .compose(new OffsetCommitResponseHandler(offsets));// 發送到coordinator }
從上面代碼最後一行能夠看出處理response的邏輯在OffsetCommitResponseHandler中。若是提交成功,那麼會將TopicPartitionState#position更新到TopicPartitionState#commitcode
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> { private final Map<TopicPartition, OffsetAndMetadata> offsets; public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) { this.offsets = offsets; } @Override public OffsetCommitResponse parse(ClientResponse response) { return new OffsetCommitResponse(response.responseBody()); } @Override public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) { sensors.commitLatency.record(response.requestLatencyMs()); Set<String> unauthorizedTopics = new HashSet<>(); for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) { TopicPartition tp = entry.getKey(); OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp); // this.offsets即sendOffsetCommitRequest中的入參,這點很關鍵 long offset = offsetAndMetadata.offset(); Errors error = Errors.forCode(entry.getValue()); if (error == Errors.NONE) { if (subscriptions.isAssigned(tp)) subscriptions.committed(tp, offsetAndMetadata); // 更新TopicPartitionState#committed爲發送的時候的TopicPartitionState#position } // ... } } }