How the consumer commits offsets

1 Data structures

The consumer's consumption state is stored in the SubscriptionState class. Its most important field is assignment, which maps each partition the consumer is consuming to that partition's state.

public class SubscriptionState {

    /* the pattern user has requested */
    private Pattern subscribedPattern;

    /* the list of topics the user has requested */
    private final Set<String> subscription;

    /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
    private final Set<String> groupSubscription;

    /* the list of partitions the user has requested */
    private final Set<TopicPartition> userAssignment;

    /* the list of partitions currently assigned */
    private final Map<TopicPartition, TopicPartitionState> assignment; // key field: the partitions this consumer consumes and the state of each

    // ...
}

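Next, look at TopicPartitionState, which records how far the consumer has read in a partition. Note that position is the offset of the next record to consume, not the offset of the last record already consumed. When fetching, the consumer uses position as the offset of the first record to fetch.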

private static class TopicPartitionState {
    private Long position; // offset of the next record to consume
    private OffsetAndMetadata committed;  // last committed offset
    private boolean paused;  // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy;  // strategy to apply when the position has to be reset

    // ...
}

public class OffsetAndMetadata implements Serializable {
    private final long offset;
    private final String metadata;
}
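
The "next offset" meaning of position is easy to get wrong by one, so here is a minimal sketch of it. PartitionCursor and its poll method are hypothetical stand-ins written for this illustration, not Kafka classes; the only behavior taken from the text above is that position names the first record of the next fetch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one partition's consume state; not the Kafka class.
final class PartitionCursor {
    private long position; // offset of the next record to fetch

    PartitionCursor(long startPosition) {
        this.position = startPosition;
    }

    // Returns the offsets of the records fetched (at most maxRecords, stopping
    // before logEndOffset) and advances position past the last one.
    List<Long> poll(long logEndOffset, int maxRecords) {
        List<Long> fetched = new ArrayList<>();
        while (position < logEndOffset && fetched.size() < maxRecords) {
            fetched.add(position);
            position++;
        }
        return fetched;
    }

    long position() {
        return position;
    }
}
```

Starting at position 5, poll(10, 3) returns offsets 5, 6 and 7 and leaves position at 8, one past the last record consumed. A consumer that resumes from 8 neither re-reads record 7 nor skips record 8.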

2 Committing offsets

Taking KafkaConsumer#commitSync as an example, let's see how the client commits offsets.

KafkaConsumer#commitSync

public void commitSync() {
        acquire();
        try {
            commitSync(subscriptions.allConsumed()); // call SubscriptionState#allConsumed to get the consumed positions, then commit them
        } finally {
            release();
        }
}

public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        acquire();
        try {
            coordinator.commitOffsetsSync(offsets);
        } finally {
            release();
        }
}

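2.1 Getting the consumed position

Now look at SubscriptionState#allConsumed to see where the consumed position comes from. As the code below shows, the offset that gets committed is TopicPartitionState#position.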

public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
        Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
            TopicPartitionState state = entry.getValue();
            if (state.hasValidPosition())
                allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position)); // key point: position is wrapped in an OffsetAndMetadata, so what gets committed is TopicPartitionState#position
        }
        return allConsumed;
    }
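
Two properties of allConsumed are worth isolating: partitions without a valid position are left out, and the result is a fresh map rather than a view of the live state. The sketch below models both with plain maps. PositionSnapshot is a hypothetical name, partitions are reduced to strings, and a null value is assumed to stand for "no valid position yet".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of SubscriptionState#allConsumed.
final class PositionSnapshot {
    // positions: partition name -> next offset to consume,
    // or null when the partition has no valid position yet (an assumption of this sketch).
    static Map<String, Long> allConsumed(Map<String, Long> positions) {
        Map<String, Long> snapshot = new HashMap<>();
        for (Map.Entry<String, Long> entry : positions.entrySet()) {
            if (entry.getValue() != null) // skip partitions without a valid position
                snapshot.put(entry.getKey(), entry.getValue());
        }
        return snapshot; // a copy: later position changes do not affect it
    }
}
```

Because the snapshot is a copy, the offsets that go into a commit are fixed at the moment allConsumed is called, even if the positions move afterwards.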

2.2 Sending over the network

Once the consumed offsets have been collected, ConsumerCoordinator#sendOffsetCommitRequest is what finally sends them to the coordinator.

private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (coordinatorUnknown()) // the coordinator must be known first
            return RequestFuture.coordinatorNotAvailable();

        if (offsets.isEmpty())
            return RequestFuture.voidSuccess();

        // create the offset commit request
        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                    offsetAndMetadata.offset(), offsetAndMetadata.metadata())); // request data: keyed by TopicPartition, carrying the offset and metadata
        }

        OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
                this.generation,
                this.memberId,
                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                offsetData); 

        log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
                .compose(new OffsetCommitResponseHandler(offsets)); // send to the coordinator
    }
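
The control flow of sendOffsetCommitRequest has two early exits before anything touches the network: an unknown coordinator fails the request, and an empty offset map succeeds immediately. A rough sketch of that shape, using CompletableFuture in place of Kafka's RequestFuture, is given below. OffsetCommitSketch, Request and Transport are hypothetical names invented for this illustration, and a null coordinator is assumed to mean "coordinator unknown".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the send path; none of these types are Kafka classes.
final class OffsetCommitSketch {
    // Simplified request payload: group id plus one offset per partition.
    static final class Request {
        final String groupId;
        final Map<String, Long> offsets;

        Request(String groupId, Map<String, Long> offsets) {
            this.groupId = groupId;
            this.offsets = offsets;
        }
    }

    // Stand-in for the network client.
    interface Transport {
        CompletableFuture<Void> send(String coordinator, Request request);
    }

    static CompletableFuture<Void> commit(String coordinator, String groupId,
                                          Map<String, Long> offsets, Transport transport) {
        if (coordinator == null) { // coordinator unknown: fail without sending
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("coordinator not available"));
            return failed;
        }
        if (offsets.isEmpty()) // nothing to commit: succeed without sending
            return CompletableFuture.completedFuture(null);
        // Copy the offsets so the request is independent of later changes.
        return transport.send(coordinator, new Request(groupId, new HashMap<>(offsets)));
    }
}
```

Only the third path reaches the transport, which mirrors how the original method returns early in the first two cases.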

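2.3 Handling the response

The last line of the code above shows that the response is handled by OffsetCommitResponseHandler. If the commit succeeds, TopicPartitionState#committed is updated to the offset that was sent, i.e. the value TopicPartitionState#position had when the request was built.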

private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {

        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public OffsetCommitResponse parse(ClientResponse response) {
            return new OffsetCommitResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            sensors.commitLatency.record(response.requestLatencyMs());
            Set<String> unauthorizedTopics = new HashSet<>();

            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp); // this.offsets is the argument passed to sendOffsetCommitRequest; this is the key point
                long offset = offsetAndMetadata.offset();

                Errors error = Errors.forCode(entry.getValue());
                if (error == Errors.NONE) {
                    if (subscriptions.isAssigned(tp))
                        subscriptions.committed(tp, offsetAndMetadata); // set TopicPartitionState#committed to the value TopicPartitionState#position had when the request was sent
                } 
                // ...
            }
        }
    }
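
The handler records the offsets captured when the request was built, not whatever the position happens to be when the response arrives. The two can differ, for example if the same handler is reached through an asynchronous commit while consumption continues. The sketch below is a simplified model of that behavior. CommitTracker and its methods are hypothetical, partitions are reduced to strings, and the per-partition error code is reduced to a boolean.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of position/committed bookkeeping; not the Kafka classes.
final class CommitTracker {
    private final Map<String, Long> position = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    void assign(String partition, long startOffset) {
        position.put(partition, startOffset);
    }

    void unassign(String partition) {
        position.remove(partition);
        committed.remove(partition);
    }

    // Consuming records moves position forward; unassigned partitions are ignored.
    void advance(String partition, long records) {
        position.computeIfPresent(partition, (p, offset) -> offset + records);
    }

    // Snapshot of the positions, taken when the commit request is built.
    Map<String, Long> buildRequest() {
        return new HashMap<>(position);
    }

    // succeeded: partition -> whether the coordinator reported no error for it.
    void onResponse(Map<String, Long> sent, Map<String, Boolean> succeeded) {
        for (Map.Entry<String, Boolean> entry : succeeded.entrySet()) {
            String partition = entry.getKey();
            // Only still-assigned partitions are updated, and with the SENT offset.
            if (entry.getValue() && position.containsKey(partition))
                committed.put(partition, sent.get(partition));
        }
    }

    Long committed(String partition) {
        return committed.get(partition);
    }

    Long position(String partition) {
        return position.get(partition);
    }
}
```

If a partition is assigned at offset 10, the request is built, and five more records are consumed before the response arrives, committed ends up at 10 while position is 15. A partition that was unassigned in the meantime is skipped, matching the isAssigned check in the handler.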

3 Summary

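  1. The offset of the next record to consume is TopicPartitionState#position.
  2. Committing an offset means sending TopicPartitionState#position to the coordinator.
  3. After a successful commit, TopicPartitionState#committed is updated to the TopicPartitionState#position that was sent.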