This article takes a look at the ProcessingGuarantee of storm-kafka-client.
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
    /**
     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
     * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
     */
    @InterfaceStability.Unstable
    public enum ProcessingGuarantee {
        /**
         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
         * interval.
         */
        AT_LEAST_ONCE,
        /**
         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
         */
        AT_MOST_ONCE,
        /**
         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
         * spout to control when commits occur. Commits asynchronously on the defined interval.
         */
        NO_GUARANTEE,
    }
The three values differ in when an offset becomes eligible to be committed and whether the commit is synchronous. In particular, AT_MOST_ONCE commits every offset synchronously right after it is polled and before the tuple is emitted downstream (the commit-interval configuration is ignored), so a message is processed at most once.
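For reference, the guarantee is selected on the KafkaSpoutConfig builder. Below is a minimal sketch assuming the 1.2.x builder API (broker address and topic name are placeholders; exact builder method names may vary slightly between versions):

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

    public class GuaranteeConfigSketch {
        public static void main(String[] args) {
            // AT_LEAST_ONCE is the default: acked offsets are committed synchronously on the commit interval.
            KafkaSpoutConfig<String, String> atLeastOnce =
                KafkaSpoutConfig.builder("localhost:9092", "demo-topic")          // placeholder broker/topic
                    .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
                    .setOffsetCommitPeriodMs(10_000)                              // interval used by the commitTimer
                    .build();

            // AT_MOST_ONCE commits synchronously right after poll(); the interval is ignored.
            KafkaSpoutConfig<String, String> atMostOnce =
                KafkaSpoutConfig.builder("localhost:9092", "demo-topic")
                    .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                    .build();

            // NO_GUARANTEE commits the polled positions asynchronously on the interval, regardless of acks.
            KafkaSpoutConfig<String, String> noGuarantee =
                KafkaSpoutConfig.builder("localhost:9092", "demo-topic")
                    .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
                    .setOffsetCommitPeriodMs(10_000)
                    .build();
        }
    }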
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    public class KafkaSpout<K, V> extends BaseRichSpout {

        //Initial delay for the commit and subscription refresh timers
        public static final long TIMER_DELAY_MS = 500;

        // timer == null only if the processing guarantee is at-most-once
        private transient Timer commitTimer;

        // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
        // or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
        private transient Map<TopicPartition, OffsetManager> offsetManagers;

        // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
        private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;

        //......

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.context = context;

            // Spout internals
            this.collector = collector;

            // Offset management
            firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();

            // Retries management
            retryService = kafkaSpoutConfig.getRetryService();

            tupleListener = kafkaSpoutConfig.getTupleListener();

            if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
                commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
            }
            refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);

            offsetManagers = new HashMap<>();
            emitted = new HashSet<>();
            waitingToEmit = new HashMap<>();
            commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());

            tupleListener.open(conf, context);
            if (canRegisterMetrics()) {
                registerMetric();
            }

            LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
        }

        //......
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/internal/Timer.java
    public class Timer {
        private final long delay;
        private final long period;
        private final TimeUnit timeUnit;
        private final long periodNanos;
        private long start;

        //......

        /**
         * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
         * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
         * (re-initiated) and a new cycle will start.
         *
         * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
         *     otherwise.
         */
        public boolean isExpiredResetOnTrue() {
            final boolean expired = Time.nanoTime() - start >= periodNanos;
            if (expired) {
                start = Time.nanoTime();
            }
            return expired;
        }
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    // ======== Next Tuple =======

    @Override
    public void nextTuple() {
        try {
            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                kafkaSpoutConfig.getSubscription().refreshAssignment();
            }

            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
                if (isAtLeastOnceProcessing()) {
                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                    kafkaConsumer.commitAsync(offsetsToCommit, null);
                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                }
            }

            PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
            if (pollablePartitionsInfo.shouldPoll()) {
                try {
                    setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
                } catch (RetriableException e) {
                    LOG.error("Failed to poll from kafka.", e);
                }
            }

            emitIfWaitingNotEmitted();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (TopicPartition tp : assignedPartitions) {
            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
        }
        return offsetsToCommit;
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
        // Find offsets that are ready to be committed for every assigned topic partition
        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
        for (Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
            if (assignedPartitions.contains(entry.getKey())) {
                assignedOffsetManagers.put(entry.getKey(), entry.getValue());
            }
        }

        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
            if (nextCommitOffset != null) {
                nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
            }
        }

        // Commit offsets that are ready to be committed for every topic partition
        if (!nextCommitOffsets.isEmpty()) {
            kafkaConsumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
            // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
            // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
            for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
                //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
                final TopicPartition tp = tpOffset.getKey();
                long position = kafkaConsumer.position(tp);
                long committedOffset = tpOffset.getValue().offset();
                if (position < committedOffset) {
                    /*
                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
                     * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
                     * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
                     */
                    LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                        position, committedOffset);
                    kafkaConsumer.seek(tp, committedOffset);
                    List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
                    if (waitingToEmitForTp != null) {
                        //Discard the pending records that are already committed
                        List<ConsumerRecord<K, V>> filteredRecords = new ArrayList<>();
                        for (ConsumerRecord<K, V> record : waitingToEmitForTp) {
                            if (record.offset() >= committedOffset) {
                                filteredRecords.add(record);
                            }
                        }
                        waitingToEmit.put(tp, filteredRecords);
                    }
                }

                final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
                offsetManager.commit(tpOffset.getValue());
                LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
            }
        } else {
            LOG.trace("No offsets to commit. {}", this);
        }
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    private PollablePartitionsInfo getPollablePartitionsInfo() {
        if (isWaitingToEmit()) {
            LOG.debug("Not polling. Tuples waiting to be emitted.");
            return new PollablePartitionsInfo(Collections.<TopicPartition>emptySet(), Collections.<TopicPartition, Long>emptyMap());
        }

        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        if (!isAtLeastOnceProcessing()) {
            return new PollablePartitionsInfo(assignment, Collections.<TopicPartition, Long>emptyMap());
        }

        Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
        Set<TopicPartition> pollablePartitions = new HashSet<>();
        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
        for (TopicPartition tp : assignment) {
            OffsetManager offsetManager = offsetManagers.get(tp);
            int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
            if (numUncommittedOffsets < maxUncommittedOffsets) {
                //Allow poll if the partition is not at the maxUncommittedOffsets limit
                pollablePartitions.add(tp);
            } else {
                long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
                Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
                if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
                    //Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
                    pollablePartitions.add(tp);
                } else {
                    LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ",
                        tp, numUncommittedOffsets, maxUncommittedOffsets);
                }
            }
        }
        return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    // ======== poll =========
    private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
        doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
        Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
        Iterator<TopicPartition> pausedIter = pausedPartitions.iterator();
        while (pausedIter.hasNext()) {
            if (pollablePartitionsInfo.pollablePartitions.contains(pausedIter.next())) {
                pausedIter.remove();
            }
        }
        try {
            kafkaConsumer.pause(pausedPartitions);
            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
            ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
            final int numPolledRecords = consumerRecords.count();
            LOG.debug("Polled [{}] records from Kafka",
                numPolledRecords);
            if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                //Commit polled records immediately to ensure delivery is at-most-once.
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                kafkaConsumer.commitSync(offsetsToCommit);
                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
            }
            return consumerRecords;
        } finally {
            kafkaConsumer.resume(pausedPartitions);
        }
    }

    private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
        for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
            //Seek directly to the earliest retriable message for each retriable topic partition
            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
        }
    }

    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
        ConsumerRecords<K, V> consumerRecords) {
        for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
            if (!records.isEmpty()) {
                ConsumerRecord<K, V> record = records.get(0);
                long seekOffset = entry.getValue();
                long earliestReceivedOffset = record.offset();
                if (seekOffset < earliestReceivedOffset) {
                    //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
                    //Ack up to the first offset received if the record is not already acked or currently in the topology
                    for (long i = seekOffset; i < earliestReceivedOffset; i++) {
                        KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null));
                        if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
                            LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
                            retryService.remove(msgId);
                            emitted.add(msgId);
                            ack(msgId);
                        }
                    }
                }
            }
        }
    }
In nextTuple, after the commit handling, getPollablePartitionsInfo determines which partitions may be polled; if any are pollable, pollKafkaBroker fetches records from Kafka and the result is finally stored into waitingToEmit (via setWaitingToEmit). pollKafkaBroker first seeks the retriable partitions back to their earliest retriable offsets, then pauses every assigned partition that is not pollable, polls, acks retriable offsets that appear to have been compacted away, commits synchronously right after the poll when the guarantee is AT_MOST_ONCE, and finally resumes the previously paused partitions (pausing them avoids fetching records from partitions that do not currently meet the conditions for polling and committing).
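The pause/poll/resume pattern used here can be reproduced with the plain kafka-clients consumer API; the following is a simplified sketch (not the spout's actual code; generics and the poll timeout are arbitrary):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PausePollResumeSketch {
        // Poll only the given pollable partitions: pause everything else, poll, then resume.
        static ConsumerRecords<String, String> pollOnly(KafkaConsumer<String, String> consumer,
                                                        Set<TopicPartition> pollable,
                                                        long pollTimeoutMs) {
            Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
            paused.removeAll(pollable);          // everything that is not pollable gets paused
            try {
                consumer.pause(paused);          // paused partitions return no records from poll()
                return consumer.poll(pollTimeoutMs);
            } finally {
                consumer.resume(paused);         // make them fetchable again for the next cycle
            }
        }
    }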
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    private void emitIfWaitingNotEmitted() {
        Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
        outerLoop:
        while (waitingToEmitIter.hasNext()) {
            List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
            while (!waitingToEmitForTp.isEmpty()) {
                final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
                if (emittedTuple) {
                    break outerLoop;
                }
            }
            waitingToEmitIter.remove();
        }
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    /**
     * Creates a tuple from the kafka record and emits it if it was never emitted or it is ready to be retried.
     *
     * @param record to be emitted
     * @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
     */
    private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);

        if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
        } else {
            final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
            if (isAtLeastOnceProcessing()
                && committedOffset != null
                && committedOffset.offset() > record.offset()
                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                // Ensures that after a topology with this id is started, the consumer fetch
                // position never falls behind the committed offset (STORM-2844)
                throw new IllegalStateException("Attempting to emit a message that has already been committed."
                    + " This should never occur when using the at-least-once processing guarantee.");
            }

            final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
            if (isEmitTuple(tuple)) {
                final boolean isScheduled = retryService.isScheduled(msgId);
                // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
                if (!isScheduled || retryService.isReady(msgId)) {
                    final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;

                    if (!isAtLeastOnceProcessing()) {
                        if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
                            collector.emit(stream, tuple, msgId);
                            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                        } else {
                            collector.emit(stream, tuple);
                            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                        }
                    } else {
                        emitted.add(msgId);
                        offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                        if (isScheduled) {  // Was scheduled for retry and re-emitted, so remove from schedule.
                            retryService.remove(msgId);
                        }
                        collector.emit(stream, tuple, msgId);
                        tupleListener.onEmit(tuple, msgId);
                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                    }
                    return true;
                }
            } else {
                /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
                 * to allow its offset to be commited to Kafka*/
                LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                if (isAtLeastOnceProcessing()) {
                    msgId.setNullTuple(true);
                    offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                    ack(msgId);
                }
            }
        }
        return false;
    }
emitOrRetryTuple deduplicates each record against offsetManagers (messages already acked and waiting to be committed) and emitted (messages already emitted and waiting to be acked or failed); only when neither of the two contains the message id does it emit or retry the tuple.
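Stripped of the Storm types, that check is just membership in two in-memory sets; a toy illustration using plain offsets instead of KafkaSpoutMessageId:

    import java.util.HashSet;
    import java.util.Set;

    public class DedupSketch {
        // Offsets already acked and waiting to be committed (analogous to the OffsetManager contents).
        private final Set<Long> ackedAwaitingCommit = new HashSet<>();
        // Offsets emitted into the topology and still awaiting ack/fail (analogous to the emitted set).
        private final Set<Long> emittedAwaitingAck = new HashSet<>();

        // A re-polled record is only (re-)emitted if it is tracked by neither set.
        boolean shouldEmit(long offset) {
            return !ackedAwaitingCommit.contains(offset) && !emittedAwaitingAck.contains(offset);
        }
    }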
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    // ======== Ack =======

    @Override
    public void ack(Object messageId) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }

        // Only need to keep track of acked tuples if commits to Kafka are controlled by
        // tuple acks, which happens only for at-least-once processing semantics
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        if (msgId.isNullTuple()) {
            //a null tuple should be added to the ack list since by definition is a direct ack
            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
            tupleListener.onAck(msgId);
            return;
        }

        if (!emitted.contains(msgId)) {
            LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
                + "came from a topic-partition that this consumer group instance is no longer tracking "
                + "due to rebalance/partition reassignment. No action taken.", msgId);
        } else {
            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
                + " This should never occur barring errors in the RetryService implementation or the spout code.");
            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            emitted.remove(msgId);
        }
        tupleListener.onAck(msgId);
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
    // ======== Fail =======

    @Override
    public void fail(Object messageId) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }
        // Only need to keep track of failed tuples if commits to Kafka are controlled by
        // tuple acks, which happens only for at-least-once processing semantics
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        if (!emitted.contains(msgId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking."
                + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
            return;
        }
        Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
            + " This should never occur barring errors in the RetryService implementation or the spout code.");

        msgId.incrementNumFails();

        if (!retryService.schedule(msgId)) {
            LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
            // this tuple should be removed from emitted only inside the ack() method. This is to ensure
            // that the OffsetManager for that TopicPartition is updated and allows commit progression
            tupleListener.onMaxRetryReached(msgId);
            ack(msgId);
        } else {
            tupleListener.onRetry(msgId);
            emitted.remove(msgId);
        }
    }
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();

    //This class assumes that there is at most one retry schedule per message id in this set at a time.
    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);

    /**
     * Comparator ordering by timestamp
     */
    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
        @Override
        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());

            if(result == 0) {
                //TreeSet uses compareTo instead of equals() for the Set contract
                //Ensure that we can save two retry schedules with the same timestamp
                result = entry1.hashCode() - entry2.hashCode();
            }
            return result;
        }
    }

    @Override
    public boolean schedule(KafkaSpoutMessageId msgId) {
        if (msgId.numFails() > maxRetries) {
            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
            return false;
        } else {
            //Remove existing schedule for the message id
            remove(msgId);
            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
            retrySchedules.add(retrySchedule);
            toRetryMsgs.add(msgId);
            LOG.debug("Scheduled. {}", retrySchedule);
            LOG.trace("Current state {}", retrySchedules);
            return true;
        }
    }

    @Override
    public Map<TopicPartition, Long> earliestRetriableOffsets() {
        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
        final long currentTimeNanos = Time.nanoTime();
        for (RetrySchedule retrySchedule : retrySchedules) {
            if (retrySchedule.retry(currentTimeNanos)) {
                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
                final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
                if(currentLowestOffset != null) {
                    tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
                } else {
                    tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
                }
            } else {
                break;  // Stop searching as soon as passed current time
            }
        }
        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
        return tpToEarliestRetriableOffset;
    }

    @Override
    public boolean isReady(KafkaSpoutMessageId msgId) {
        boolean retry = false;
        if (isScheduled(msgId)) {
            final long currentTimeNanos = Time.nanoTime();
            for (RetrySchedule retrySchedule : retrySchedules) {
                if (retrySchedule.retry(currentTimeNanos)) {
                    if (retrySchedule.msgId.equals(msgId)) {
                        retry = true;
                        LOG.debug("Found entry to retry {}", retrySchedule);
                        break;  //Stop searching if the message is known to be ready for retry
                    }
                } else {
                    LOG.debug("Entry to retry not found {}", retrySchedule);
                    break;  // Stop searching as soon as passed current time
                }
            }
        }
        return retry;
    }
storm-kafka-client targets Kafka 0.10 and above. It introduces the ProcessingGuarantee enum with three values:

- ProcessingGuarantee.AT_LEAST_ONCE works together with Storm's ack mechanism; it maintains emitted (messages emitted but not yet acked), offsetManagers (messages acked but not yet committed) and the retrySchedules of failed messages that need to be retried.
- ProcessingGuarantee.AT_MOST_ONCE ignores Storm acks and commits synchronously right after polling (the interval configuration is ignored), so a message is processed at most once.
- ProcessingGuarantee.NO_GUARANTEE also ignores Storm acks; like AT_LEAST_ONCE it relies on the commitTimer to commit offsets on the configured interval, the difference being that its commit is asynchronous.

ProcessingGuarantee.AT_LEAST_ONCE integrates with Storm's ack mechanism: the spout's ack method maintains emitted (emitted but not yet acked), and the fail method hands the msgId to the retryService for retry (something ProcessingGuarantee.NO_GUARANTEE does not have). Like ProcessingGuarantee.NO_GUARANTEE it relies on the commitTimer to commit offsets on the interval, but it uses commitSync, i.e. a synchronous commit, and only commits offsets of messages that have already been acked; ProcessingGuarantee.NO_GUARANTEE commits asynchronously, and the offsets it commits are based on the consumer's poll position, regardless of whether the tuples have been acked by the Storm spout.

nextTuple calls pollKafkaBroker, which uses kafkaConsumer.poll to fetch records and puts them into waitingToEmit; emitIfWaitingNotEmitted then either emits or keeps waiting, and emitting goes through emitOrRetryTuple. Because pollKafkaBroker seeks back to the earliest retriable offsets, emitOrRetryTuple has to filter out records with smaller offsets: it deduplicates with offsetManagers (acked, waiting to be committed) and emitted (emitted, waiting to be acked) to decide whether collector.emit needs to be called. For ProcessingGuarantee.AT_LEAST_ONCE it not only emits but also maintains offsetManagers, emitted and the retry-related state, and then calls back tupleListener.onEmit; for the other guarantees it only emits.
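Putting it together, wiring the spout into a topology with at-least-once processing and an exponential-backoff retry policy might look roughly like this. This is only a sketch: broker, topic and component names are made up, and the builder/retry-service signatures follow the 1.2.x API shown above.

    import java.util.concurrent.TimeUnit;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
    import org.apache.storm.topology.TopologyBuilder;

    public class KafkaSpoutTopologySketch {
        public static void main(String[] args) {
            // Failed tuples are retried with exponential backoff, up to 5 times.
            KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.milliSeconds(500),   // initial delay
                TimeInterval.milliSeconds(200),   // delay period, grows per retry
                5,                                // max retries, after which the tuple is acked and skipped
                TimeInterval.seconds(10));        // cap on the delay

            KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("localhost:9092", "demo-topic")            // placeholder broker/topic
                    .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
                    .setOffsetCommitPeriodMs(TimeUnit.SECONDS.toMillis(10))         // acked offsets committed synchronously on this interval
                    .setMaxUncommittedOffsets(10_000)                               // poll back-pressure limit per partition
                    .setRetry(retryService)
                    .build();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
            // ... attach bolts that ack (or fail) each tuple so offsets can be committed ...
        }
    }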