[case45] A look at storm-kafka-client's ProcessingGuarantee

This article takes a look at storm-kafka-client's ProcessingGuarantee.

ProcessingGuarantee

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutConfig.java

/**
     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
     * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
     */
    @InterfaceStability.Unstable
    public enum ProcessingGuarantee {
        /**
         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
         * interval.
         */
        AT_LEAST_ONCE,
        /**
         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
         */
        AT_MOST_ONCE,
        /**
         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
         * spout to control when commits occur. Commits asynchronously on the defined interval.
         */
        NO_GUARANTEE,
    }
  • One difference between storm-kafka-client and the older storm-kafka is the introduction of ProcessingGuarantee, which makes the whole code base much clearer (a configuration sketch follows this list)
  • ProcessingGuarantee.AT_LEAST_ONCE is the mode that uses Storm's acks; similar to the Kafka client's auto commit, it commits periodically at the configured interval
  • ProcessingGuarantee.AT_MOST_ONCE ignores acks entirely; offsets are committed synchronously at poll time (the interval setting is ignored), so a message is processed at most once
  • ProcessingGuarantee.NO_GUARANTEE also ignores acks; like ProcessingGuarantee.AT_LEAST_ONCE it commits periodically at the configured interval, the difference being that the commit is asynchronous
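
For reference, a minimal configuration sketch. The broker address, topic and group id are made up, and the builder calls are assumed to match the 1.2.2 KafkaSpoutConfig.Builder API:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutConfigExample {
    public static void main(String[] args) {
        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")                         // hypothetical broker and topic
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")  // hypothetical group id
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            .setOffsetCommitPeriodMs(10_000)  // commit interval used by AT_LEAST_ONCE and NO_GUARANTEE
            .setMaxUncommittedOffsets(250)    // poll back-pressure limit, see getPollablePartitionsInfo below
            .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
        // attach bolts and submit the topology as usual
    }
}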

KafkaSpout.open

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

public class KafkaSpout<K, V> extends BaseRichSpout {

    //Initial delay for the commit and subscription refresh timers
    public static final long TIMER_DELAY_MS = 500;

    // timer == null only if the processing guarantee is at-most-once
    private transient Timer commitTimer;

    // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
    // or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
    private transient Map<TopicPartition, OffsetManager> offsetManagers;

    // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
    private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;

    //......

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;

        // Spout internals
        this.collector = collector;

        // Offset management
        firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();

        // Retries management
        retryService = kafkaSpoutConfig.getRetryService();

        tupleListener = kafkaSpoutConfig.getTupleListener();

        if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
            // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
            commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);

        offsetManagers = new HashMap<>();
        emitted = new HashSet<>();
        waitingToEmit = new HashMap<>();
        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());

        tupleListener.open(conf, context);
        if (canRegisterMetrics()) {
            registerMetric();
        }

        LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
    }

    //......

}
  • In open, as long as the processing guarantee is not ProcessingGuarantee.AT_MOST_ONCE, the commitTimer is initialized with a period of kafkaSpoutConfig.getOffsetsCommitPeriodMs() (30000 ms by default); the refreshSubscriptionTimer is always created, with a period of kafkaSpoutConfig.getPartitionRefreshPeriodMs() (2000 ms by default)

Timer.isExpiredResetOnTrue

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/internal/Timer.java

public class Timer {
    private final long delay;
    private final long period;
    private final TimeUnit timeUnit;
    private final long periodNanos;
    private long start;

    //......

    /**
     * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
     * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
     * (re-initiated) and a new cycle will start.
     *
     * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
     * otherwise.
     */
    public boolean isExpiredResetOnTrue() {
        final boolean expired = Time.nanoTime() - start >= periodNanos;
        if (expired) {
            start = Time.nanoTime();
        }
        return expired;
    }
}
  • Timer's key method is isExpiredResetOnTrue, which checks whether the "scheduled time" has arrived and resets the timer when it has; it is called from nextTuple (a sketch of the omitted constructor follows)
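
The constructor is elided in the excerpt above. A minimal sketch of how it presumably wires up the fields, based on the field declarations and on how the spout creates its timers (an assumption, not the verbatim source):

    public Timer(long delay, long period, TimeUnit timeUnit) {
        this.delay = delay;
        this.period = period;
        this.timeUnit = timeUnit;
        this.periodNanos = timeUnit.toNanos(period);
        // under this assumption, the first expiration can only happen after the initial delay plus one period
        this.start = Time.nanoTime() + timeUnit.toNanos(delay);
    }

    // Typical usage, mirroring KafkaSpout: at most one true result per period, reset on each true result.
    // Timer commitTimer = new Timer(500, 30_000, TimeUnit.MILLISECONDS);
    // if (commitTimer.isExpiredResetOnTrue()) { /* commit offsets */ }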

KafkaSpout.nextTuple

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

// ======== Next Tuple =======
    @Override
    public void nextTuple() {
        try {
            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                kafkaSpoutConfig.getSubscription().refreshAssignment();
            }

            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
                if (isAtLeastOnceProcessing()) {
                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                    kafkaConsumer.commitAsync(offsetsToCommit, null);
                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                }
            }

            PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
            if (pollablePartitionsInfo.shouldPoll()) {
                try {
                    setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
                } catch (RetriableException e) {
                    LOG.error("Failed to poll from kafka.", e);
                }
            }

            emitIfWaitingNotEmitted();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }
  • nextTuple first checks whether the subscription assignment needs to be refreshed, then uses commitTimer.isExpiredResetOnTrue() to decide whether it is time to commit offsets
  • If the ProcessingGuarantee is NO_GUARANTEE, createFetchedOffsetsMetadata builds the per-partition offsets and metadata to commit, and kafkaConsumer.commitAsync commits them asynchronously
  • If the ProcessingGuarantee is AT_LEAST_ONCE, commitOffsetsForAckedTuples is called to commit the acked offsets
  • Once the commit handling is done, getPollablePartitionsInfo returns a PollablePartitionsInfo; if shouldPoll is true, pollKafkaBroker fetches records and setWaitingToEmit stores them in waitingToEmit
  • Finally emitIfWaitingNotEmitted is called: if there are records waiting it emits or retries one tuple, otherwise it returns without emitting anything for this call

createFetchedOffsetsMetadata

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (TopicPartition tp : assignedPartitions) {
            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
        }
        return offsetsToCommit;
    }
  • Based on kafkaConsumer.assignment(), this uses kafkaConsumer.position(tp) to read the offset of the next record that will be fetched for each partition, and commitMetadataManager.getCommitMetadata() to supply the CommitMetadata JSON string as the commit metadata

commitOffsetsForAckedTuples

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
        // Find offsets that are ready to be committed for every assigned topic partition
        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
        for (Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
            if (assignedPartitions.contains(entry.getKey())) {
                assignedOffsetManagers.put(entry.getKey(), entry.getValue());
            }
        }

        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
            if (nextCommitOffset != null) {
                nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
            }
        }

        // Commit offsets that are ready to be committed for every topic partition
        if (!nextCommitOffsets.isEmpty()) {
            kafkaConsumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
            // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
            // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
            for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
                //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
                final TopicPartition tp = tpOffset.getKey();
                long position = kafkaConsumer.position(tp);
                long committedOffset = tpOffset.getValue().offset();
                if (position < committedOffset) {
                    /*
                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
                     * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
                     * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
                     */
                    LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                        position, committedOffset);
                    kafkaConsumer.seek(tp, committedOffset);
                    List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
                    if (waitingToEmitForTp != null) {
                        //Discard the pending records that are already committed
                        List<ConsumerRecord<K, V>> filteredRecords = new ArrayList<>();
                        for (ConsumerRecord<K, V> record : waitingToEmitForTp) {
                            if (record.offset() >= committedOffset) {
                                filteredRecords.add(record);
                            }
                        }
                        waitingToEmit.put(tp, filteredRecords);
                    }
                }

                final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
                offsetManager.commit(tpOffset.getValue());
                LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
            }
        } else {
            LOG.trace("No offsets to commit. {}", this);
        }
    }
  • This first collects from offsetManagers the partitions with acked offsets waiting to be committed (offsetManagers is always empty for the at-most-once and no-guarantee modes, and this method is only invoked for at-least-once), restricted to the currently assigned partitions
  • It then uses OffsetManager.findNextCommitOffset, together with the CommitMetadata, to determine the next committable offset for each partition in this batch (see the simplified sketch below)
  • kafkaConsumer.commitSync then commits those offsets synchronously, after which the local OffsetManager state for the committed partitions is updated
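
findNextCommitOffset itself is not shown here. Conceptually it walks the acked offsets in order and returns the end of the longest contiguous run starting at the committed offset; the following is a simplified, illustrative sketch of that idea with made-up numbers, not the actual OffsetManager code:

import java.util.Arrays;
import java.util.TreeSet;

public class NextCommitOffsetSketch {
    public static void main(String[] args) {
        // committedOffset is the offset Kafka already has committed, i.e. the next offset expected to be consumed
        long committedOffset = 100;
        // offsets that have been acked in Storm but not yet committed to Kafka (note the gap at 103)
        TreeSet<Long> ackedOffsets = new TreeSet<>(Arrays.asList(100L, 101L, 102L, 104L));

        long nextCommitOffset = committedOffset;
        while (ackedOffsets.contains(nextCommitOffset)) {
            nextCommitOffset++; // extend the contiguous run of acked offsets
        }
        // nextCommitOffset == 103: offsets 100-102 are contiguously acked, 103 is still pending, so 104 must wait.
        // Since a Kafka committed offset means "next record to consume", committing
        // new OffsetAndMetadata(103, metadata) marks 100-102 as processed.
        System.out.println("next commit offset = " + nextCommitOffset);
    }
}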

getPollablePartitionsInfo

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

private PollablePartitionsInfo getPollablePartitionsInfo() {
        if (isWaitingToEmit()) {
            LOG.debug("Not polling. Tuples waiting to be emitted.");
            return new PollablePartitionsInfo(Collections.<TopicPartition>emptySet(), Collections.<TopicPartition, Long>emptyMap());
        }

        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        if (!isAtLeastOnceProcessing()) {
            return new PollablePartitionsInfo(assignment, Collections.<TopicPartition, Long>emptyMap());
        }

        Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
        Set<TopicPartition> pollablePartitions = new HashSet<>();
        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
        for (TopicPartition tp : assignment) {
            OffsetManager offsetManager = offsetManagers.get(tp);
            int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
            if (numUncommittedOffsets < maxUncommittedOffsets) {
                //Allow poll if the partition is not at the maxUncommittedOffsets limit
                pollablePartitions.add(tp);
            } else {
                long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
                Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
                if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
                    //Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
                    pollablePartitions.add(tp);
                } else {
                    LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
                        numUncommittedOffsets, maxUncommittedOffsets);
                }
            }
        }
        return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
    }
  • For guarantees other than ProcessingGuarantee.AT_LEAST_ONCE this simply returns every partition in kafkaConsumer.assignment() as pollable
  • For ProcessingGuarantee.AT_LEAST_ONCE it also fetches retryService.earliestRetriableOffsets() so that the offsets of failed tuples are factored in
  • The maxUncommittedOffsets parameter acts as a poll limit: a partition stays pollable while numUncommittedOffsets < maxUncommittedOffsets; once the limit is reached it is pollable only if there is an earliestRetriableOffset and it is no later than offsetAtLimit, i.e. the maxUncommittedOffsets-th uncommitted offset after the committed offset (see the numeric sketch below)
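
An illustrative restatement of that gate with made-up numbers (not real API calls, just the decision logic):

public class PollGateSketch {
    public static void main(String[] args) {
        int maxUncommittedOffsets = 10;       // kafkaSpoutConfig.getMaxUncommittedOffsets()
        int numUncommittedOffsets = 12;       // offsets emitted for this partition but not yet committed
        long offsetAtLimit = 110;             // the 10th uncommitted offset after the committed offset (here 100)
        Long earliestRetriableOffset = 105L;  // earliest failed offset scheduled for retry, null if none

        boolean pollable = numUncommittedOffsets < maxUncommittedOffsets
                || (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit);
        // pollable == true: although the partition is over the limit (12 >= 10), the failed offset 105
        // lies within the first 10 uncommitted offsets, so the partition may still be polled to re-fetch it.
        System.out.println("pollable = " + pollable);
    }
}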

pollKafkaBroker

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

// ======== poll =========
    private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
        doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
        Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
        Iterator<TopicPartition> pausedIter = pausedPartitions.iterator();
        while (pausedIter.hasNext()) {
            if (pollablePartitionsInfo.pollablePartitions.contains(pausedIter.next())) {
                pausedIter.remove();
            }
        }
        try {
            kafkaConsumer.pause(pausedPartitions);
            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
            ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
            final int numPolledRecords = consumerRecords.count();
            LOG.debug("Polled [{}] records from Kafka",
                numPolledRecords);
            if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                //Commit polled records immediately to ensure delivery is at-most-once.
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                kafkaConsumer.commitSync(offsetsToCommit);
                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
            }
            return consumerRecords;
        } finally {
            kafkaConsumer.resume(pausedPartitions);
        }
    }

    private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
        for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
            //Seek directly to the earliest retriable message for each retriable topic partition
            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
        }
    }

    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
        ConsumerRecords<K, V> consumerRecords) {
        for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
            if (!records.isEmpty()) {
                ConsumerRecord<K, V> record = records.get(0);
                long seekOffset = entry.getValue();
                long earliestReceivedOffset = record.offset();
                if (seekOffset < earliestReceivedOffset) {
                    //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
                    //Ack up to the first offset received if the record is not already acked or currently in the topology
                    for (long i = seekOffset; i < earliestReceivedOffset; i++) {
                        KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null));
                        if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
                            LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
                            retryService.remove(msgId);
                            emitted.add(msgId);
                            ack(msgId);
                        }
                    }
                }
            }
        }
    }
  • If PollablePartitionsInfo.pollablePartitions is not empty, pollKafkaBroker is called to fetch records
  • It first calls doSeekRetriableTopicPartitions, which seeks each partition that has retriable records to the earliest offset that needs to be retried
  • When fetching, the partitions that do not satisfy the maxUncommittedOffsets and related conditions are paused first, then poll is executed; after the poll, if the ProcessingGuarantee is AT_MOST_ONCE the offsets are committed synchronously with kafkaConsumer.commitSync; the polled records are returned (and later stored into waitingToEmit), and finally the previously paused partitions are resumed (this avoids fetching records from partitions that do not meet the poll conditions)
  • Note that pollablePartitionsInfo comes from getPollablePartitionsInfo(), which filters kafkaConsumer.assignment() using the OffsetManager state and parameters such as maxUncommittedOffsets, so pollablePartitionsInfo.pollablePartitions is a subset of kafkaConsumer.assignment(); pausedPartitions is kafkaConsumer.assignment() minus pollablePartitionsInfo.pollablePartitions, i.e. exactly the partitions filtered out in getPollablePartitionsInfo(); those partitions are paused before the poll and resumed afterwards, so this poll never fetches from them
  • After the poll there is one more step: ackRetriableOffsetsIfCompactedAway acks retriable offsets whose records have been compacted away

emitIfWaitingNotEmitted

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

private void emitIfWaitingNotEmitted() {
        Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
        outerLoop:
        while (waitingToEmitIter.hasNext()) {
            List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
            while (!waitingToEmitForTp.isEmpty()) {
                final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
                if (emittedTuple) {
                    break outerLoop;
                }
            }
            waitingToEmitIter.remove();
        }
    }
  • emitIfWaitingNotEmitted walks waitingToEmit: if a partition has pending records it emits (or retries) one tuple via emitOrRetryTuple and stops for this nextTuple call; partitions whose pending list is exhausted are removed from waitingToEmit, so with nothing to emit the method simply returns

emitOrRetryTuple

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

/**
     * Creates a tuple from the kafka record and emits it if it was never emitted or it is ready to be retried.
     *
     * @param record to be emitted
     * @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
     */
    private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);

        if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
        } else {
            final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
            if (isAtLeastOnceProcessing()
                && committedOffset != null 
                && committedOffset.offset() > record.offset()
                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                // Ensures that after a topology with this id is started, the consumer fetch
                // position never falls behind the committed offset (STORM-2844)
                throw new IllegalStateException("Attempting to emit a message that has already been committed."
                    + " This should never occur when using the at-least-once processing guarantee.");
            }

            final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
            if (isEmitTuple(tuple)) {
                final boolean isScheduled = retryService.isScheduled(msgId);
                // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
                if (!isScheduled || retryService.isReady(msgId)) {
                    final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;

                    if (!isAtLeastOnceProcessing()) {
                        if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
                            collector.emit(stream, tuple, msgId);
                            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                        } else {
                            collector.emit(stream, tuple);
                            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                        }
                    } else {
                        emitted.add(msgId);
                        offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                        if (isScheduled) {  // Was scheduled for retry and re-emitted, so remove from schedule.
                            retryService.remove(msgId);
                        }
                        collector.emit(stream, tuple, msgId);
                        tupleListener.onEmit(tuple, msgId);
                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                    }
                    return true;
                }
            } else {
                /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
                * to allow its offset to be commited to Kafka*/
                LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                if (isAtLeastOnceProcessing()) {
                    msgId.setNullTuple(true);
                    offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                    ack(msgId);
                }
            }
        }
        return false;
    }
  • emitOrRetryTuple is the core of nextTuple; it contains both the emit handling and the retry handling
  • Because failed messages are re-fetched via seek, the record must be de-duplicated against offsetManagers (acked, waiting for commit) and emitted (emitted, waiting for ack or fail); only if neither contains it is the tuple emitted or retried
  • Before emitting, retryService.isScheduled(msgId) tells whether this is a scheduled retry of a failed tuple; the tuple is emitted only if it is not scheduled for retry, or it is scheduled and ready (retryService.isReady); the tuple itself comes from kafkaSpoutConfig.getTranslator().apply(record), and if it is a KafkaTuple it is emitted to the stream it was routed to (see the translator sketch after this list)
  • For ProcessingGuarantee.AT_LEAST_ONCE the spout also maintains emitted and offsetManagers, emits with the msgId and calls back tupleListener.onEmit(tuple, msgId); for the other guarantees it merely calls collector.emit (adding the msgId only when tuple tracking is enforced)
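
As an aside, here is a sketch of a custom RecordTranslator that routes tuples to a named stream via KafkaTuple; it assumes the 1.2.2 RecordTranslator/KafkaTuple API, and the stream name and field names are made up:

import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.KafkaTuple;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.tuple.Fields;

public class TopicMetadataTranslator implements RecordTranslator<String, String> {

    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        // Returning a KafkaTuple lets emitOrRetryTuple pick the stream via KafkaTuple.getStream();
        // a plain List<Object> would be emitted to Utils.DEFAULT_STREAM_ID instead
        return new KafkaTuple(record.topic(), record.partition(), record.offset(), record.key(), record.value())
            .routedTo("metadata_stream");
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("topic", "partition", "offset", "key", "value");
    }

    @Override
    public List<String> streams() {
        // KafkaSpout.declareOutputFields declares one output stream per entry returned here
        return Arrays.asList("metadata_stream");
    }
}

Such a translator would be registered on the builder, presumably via KafkaSpoutConfig.Builder.setRecordTranslator.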

KafkaSpout.ack

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

// ======== Ack =======
    @Override
    public void ack(Object messageId) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }

        // Only need to keep track of acked tuples if commits to Kafka are controlled by
        // tuple acks, which happens only for at-least-once processing semantics
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;

        if (msgId.isNullTuple()) {
            //a null tuple should be added to the ack list since by definition is a direct ack
            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
            tupleListener.onAck(msgId);
            return;
        }

        if (!emitted.contains(msgId)) {
            LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
                        + "came from a topic-partition that this consumer group instance is no longer tracking "
                        + "due to rebalance/partition reassignment. No action taken.", msgId);
        } else {
            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
                        + " This should never occur barring errors in the RetryService implementation or the spout code.");
            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            emitted.remove(msgId);
        }
        tupleListener.onAck(msgId);
    }
  • In ack, if the processing guarantee is not ProcessingGuarantee.AT_LEAST_ONCE, the method returns immediately
  • Otherwise the acked msgId is added to the offsetManagers map, where it waits to be committed in nextTuple, and is removed from emitted
  • There is also a de-duplication check against emitted: an ack for a msgId that was never emitted is ignored, which usually results from rebalance/partition reassignment

KafkaSpout.fail

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

// ======== Fail =======
    @Override
    public void fail(Object messageId) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }
        // Only need to keep track of failed tuples if commits to Kafka are controlled by
        // tuple acks, which happens only for at-least-once processing semantics
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        if (!emitted.contains(msgId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking."
                + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
            return;
        }
        Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
            + " This should never occur barring errors in the RetryService implementation or the spout code.");

        msgId.incrementNumFails();

        if (!retryService.schedule(msgId)) {
            LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
            // this tuple should be removed from emitted only inside the ack() method. This is to ensure
            // that the OffsetManager for that TopicPartition is updated and allows commit progression
            tupleListener.onMaxRetryReached(msgId);
            ack(msgId);
        } else {
            tupleListener.onRetry(msgId);
            emitted.remove(msgId);
        }
    }
  • fail likewise returns immediately if the processing guarantee is not ProcessingGuarantee.AT_LEAST_ONCE
  • It then checks whether the msgId is in emitted; if not it returns right away, which usually means the partition has been reassigned
  • fail then calls retryService.schedule(msgId): if scheduling fails (the retry limit was reached) it calls back tupleListener.onMaxRetryReached and acks the tuple; if it succeeds it calls back tupleListener.onRetry and removes the msgId from emitted

KafkaSpoutRetryExponentialBackoff.schedule

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java

private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();

    //This class assumes that there is at most one retry schedule per message id in this set at a time.
    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);

    /**
     * Comparator ordering by timestamp 
     */
    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
        @Override
        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
            
            if(result == 0) {
                //TreeSet uses compareTo instead of equals() for the Set contract
                //Ensure that we can save two retry schedules with the same timestamp
                result = entry1.hashCode() - entry2.hashCode();
            }
            return result;
        }
    }

    @Override
    public boolean schedule(KafkaSpoutMessageId msgId) {
        if (msgId.numFails() > maxRetries) {
            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
            return false;
        } else {
            //Remove existing schedule for the message id
            remove(msgId);
            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
            retrySchedules.add(retrySchedule);
            toRetryMsgs.add(msgId);
            LOG.debug("Scheduled. {}", retrySchedule);
            LOG.trace("Current state {}", retrySchedules);
            return true;
        }
    }

    @Override
    public Map<TopicPartition, Long> earliestRetriableOffsets() {
        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
        final long currentTimeNanos = Time.nanoTime();
        for (RetrySchedule retrySchedule : retrySchedules) {
            if (retrySchedule.retry(currentTimeNanos)) {
                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
                final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
                if(currentLowestOffset != null) {
                    tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
                } else {
                    tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
                }
            } else {
                break;  // Stop searching as soon as passed current time
            }
        }
        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
        return tpToEarliestRetriableOffset;
    }

    @Override
    public boolean isReady(KafkaSpoutMessageId msgId) {
        boolean retry = false;
        if (isScheduled(msgId)) {
            final long currentTimeNanos = Time.nanoTime();
            for (RetrySchedule retrySchedule : retrySchedules) {
                if (retrySchedule.retry(currentTimeNanos)) {
                    if (retrySchedule.msgId.equals(msgId)) {
                        retry = true;
                        LOG.debug("Found entry to retry {}", retrySchedule);
                        break; //Stop searching if the message is known to be ready for retry
                    }
                } else {
                    LOG.debug("Entry to retry not found {}", retrySchedule);
                    break;  // Stop searching as soon as passed current time
                }
            }
        }
        return retry;
    }
  • schedule first checks whether the failure count exceeds maxRetries; if so it returns false, meaning the message will no longer be scheduled, and KafkaSpout.fail then calls back tupleListener.onMaxRetryReached and acks the message so it is not processed again
  • If maxRetries is not exceeded, a RetrySchedule is created and added to retrySchedules; retrySchedules is a TreeSet ordered by RetryEntryTimeStampComparator, i.e. by nextRetryTimeNanos, falling back to hashCode when the timestamps are equal
  • Both earliestRetriableOffsets and isReady work off the retrySchedules set (a sketch of configuring this retry service follows)
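
For reference, a sketch of configuring the retry service on the builder; the constructor arguments and TimeInterval factories are assumed to match the 1.2.2 KafkaSpoutRetryExponentialBackoff API, and the concrete values are made up:

import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

public class RetryConfigSketch {
    public static void main(String[] args) {
        // initialDelay, delayPeriod (base of the exponential backoff), maxRetries, maxDelay
        KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
            TimeInterval.microSeconds(500),   // initial delay before the first retry
            TimeInterval.milliSeconds(2),     // backoff period, grows with the fail count
            5,                                // maxRetries checked in schedule(); beyond it the tuple is acked
            TimeInterval.seconds(10));        // upper bound for the retry delay

        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")   // hypothetical broker and topic
            .setRetry(retryService)
            .build();
        // pass spoutConfig to new KafkaSpout<>(spoutConfig) as usual
    }
}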

Summary

  • storm-kafka-client targets Kafka 0.10 and later; it introduces the ProcessingGuarantee enum, which has three values:

    • ProcessingGuarantee.AT_LEAST_ONCE is the mode that uses Storm's acks; similar to the Kafka client's auto commit it commits periodically at the configured interval, and it maintains emitted (emitted but not yet acked), offsetManagers (acked but not yet committed) and the retrySchedules of failed tuples that need to be retried
    • ProcessingGuarantee.AT_MOST_ONCE ignores acks and commits synchronously right after polling (the interval setting is ignored), so a message is processed at most once
    • ProcessingGuarantee.NO_GUARANTEE also ignores acks; like ProcessingGuarantee.AT_LEAST_ONCE it commits periodically at the configured interval (both rely on commitTimer), the difference being that it commits asynchronously
  • ProcessingGuarantee.AT_LEAST_ONCE ties into Storm's ack mechanism: the spout's ack method maintains emitted (emitted but not yet acked), and the fail method hands the msgId to retryService for retry (something ProcessingGuarantee.NO_GUARANTEE does not do); like ProcessingGuarantee.NO_GUARANTEE it relies on commitTimer to commit offsets on the interval, but it uses commitSync, i.e. a synchronous commit, and only commits offsets that have been acked, whereas ProcessingGuarantee.NO_GUARANTEE commits asynchronously and commits offsets based on the consumer's poll position, regardless of whether the tuples were acked in Storm
  • ProcessingGuarantee.AT_MOST_ONCE commits inside pollKafkaBroker, right after kafkaConsumer.poll, via a synchronous kafkaConsumer.commitSync; it does not depend on commitTimer, i.e. it does not commit offsets on an interval
  • ProcessingGuarantee.NO_GUARANTEE commits asynchronously with kafkaConsumer.commitAsync when nextTuple decides a commit is due; like ProcessingGuarantee.AT_LEAST_ONCE it relies on commitTimer and commits on the interval, but it does so asynchronously, whereas ProcessingGuarantee.AT_LEAST_ONCE commits synchronously
  • nextTuple() calls pollKafkaBroker, which uses kafkaConsumer.poll to fetch records, puts them into waitingToEmit, and then emitIfWaitingNotEmitted either emits via emitOrRetryTuple or returns. pollKafkaBroker first seeks each partition to the smallest failed offset that needs retrying and re-fetches from there via kafkaConsumer.poll; for KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE this is where kafkaConsumer.commitSync synchronously commits the offsets. Because the polled records may include retries, emitOrRetryTuple de-duplicates against offsetManagers (acked, waiting for commit) and emitted (emitted, waiting for ack) before calling collector.emit. For ProcessingGuarantee.AT_LEAST_ONCE it not only emits but also maintains offsetManagers, emitted and the retry state, and then calls back tupleListener.onEmit; for the other guarantees it only emits.
