本文主要講述 Spring for Apache Kafka 的 consumer 在 spring.kafka.consumer.enable-auto-commit 為 false 的情況下,AckMode 的各個選項。
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/AbstractMessageListenerContainer.java$AckMode
/**
 * Enumerates the strategies the listener container can use to decide when
 * consumed offsets are committed.
 */
public enum AckMode {

    /**
     * Commit once the listener has finished processing each individual record.
     */
    RECORD,

    /**
     * Before the next poll, commit everything that has already been processed.
     */
    BATCH,

    /**
     * Commit pending updates once
     * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
     */
    TIME,

    /**
     * Commit pending updates once
     * {@link ContainerProperties#setAckCount(int) ackCount} has been exceeded.
     */
    COUNT,

    /**
     * Commit pending updates when either
     * {@link ContainerProperties#setAckCount(int) ackCount} has been exceeded
     * or {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
     */
    COUNT_TIME,

    /**
     * Acknowledgment is the user's responsibility, performed through an
     * {@link AcknowledgingMessageListener}.
     */
    MANUAL,

    /**
     * Acknowledgment is the user's responsibility, performed through an
     * {@link AcknowledgingMessageListener}; the consumer is woken so that the
     * commit is processed immediately.
     */
    MANUAL_IMMEDIATE

}
默認
(跟 auto commit interval 有什麼區別呢?)
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
/**
 * Body of the consumer thread: polls the Kafka consumer in a loop for as long
 * as {@code isRunning()} is true, dispatching received records to the listener
 * and, when auto-commit is disabled, processing pending offset commits at the
 * start of every iteration.
 *
 * <p>After the loop exits, the invoker is stopped and manual acks are
 * committed (only if {@code listenerInvokerFuture} is set), then the consumer
 * is unsubscribed and closed.
 */
@Override
public void run() {
    // Give a seek-aware listener a callback to this consumer before polling starts.
    if (this.theListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
    }
    // Reset the counters that processCommits() uses for COUNT / TIME based commits.
    this.count = 0;
    this.last = System.currentTimeMillis();
    if (isRunning() && this.definedPartitions != null) {
        initPartitionsIfNeeded();
        // we start the invoker here as there will be no rebalance calls to
        // trigger it, but only if the container is not set to autocommit
        // otherwise we will process records on a separate thread
        if (!this.autoCommit) {
            startInvoker();
        }
    }
    // Timestamps used only for idle-container event publication below.
    long lastReceive = System.currentTimeMillis();
    long lastAlertAt = lastReceive;
    while (isRunning()) {
        try {
            // With auto-commit off, the container commits offsets itself, once per loop pass.
            if (!this.autoCommit) {
                processCommits();
            }
            processSeeks();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Polling (paused=" + this.paused + ")...");
            }
            ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
            if (records != null && this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + records.count() + " records");
            }
            if (records != null && records.count() > 0) {
                // lastReceive is only tracked when idle events are configured.
                if (this.containerProperties.getIdleEventInterval() != null) {
                    lastReceive = System.currentTimeMillis();
                }
                // if the container is set to auto-commit, then execute in the
                // same thread
                // otherwise send to the buffering queue
                if (this.autoCommit) {
                    invokeListener(records);
                }
                else {
                    // NOTE(review): a true return from sendToListener appears to mean the
                    // records could not be handed off yet - confirm against its definition.
                    if (sendToListener(records)) {
                        if (this.assignedPartitions != null) {
                            // avoid group management rebalance due to a slow
                            // consumer
                            this.consumer.pause(this.assignedPartitions);
                            this.paused = true;
                            // Remember the batch so checkPause() can deal with it later.
                            this.unsent = records;
                        }
                    }
                }
            }
            else {
                // Nothing received: publish an idle event if both the time since the last
                // receive and the time since the last alert exceed the idle interval.
                if (this.containerProperties.getIdleEventInterval() != null) {
                    long now = System.currentTimeMillis();
                    if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                            && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        publishIdleContainerEvent(now - lastReceive);
                        lastAlertAt = now;
                        if (this.theListener instanceof ConsumerSeekAware) {
                            seekPartitions(getAssignedPartitions(), true);
                        }
                    }
                }
            }
            this.unsent = checkPause(this.unsent);
        }
        catch (WakeupException e) {
            // A wakeup interrupted the poll; still re-evaluate the unsent batch.
            this.unsent = checkPause(this.unsent);
        }
        catch (Exception e) {
            // Any other failure goes to the configured generic error handler, or is
            // logged when none is configured; the loop then continues.
            if (this.containerProperties.getGenericErrorHandler() != null) {
                this.containerProperties.getGenericErrorHandler().handle(e, null);
            }
            else {
                this.logger.error("Container exception", e);
            }
        }
    }
    // Shutdown: stop the listener invoker (if one was started) and commit manual acks.
    if (this.listenerInvokerFuture != null) {
        stopInvoker();
        commitManualAcks();
    }
    try {
        this.consumer.unsubscribe();
    }
    catch (WakeupException e) {
        // No-op. Continue process
    }
    this.consumer.close();
    if (this.logger.isInfoEnabled()) {
        this.logger.info("Consumer stopped");
    }
}
這裏 while 循環每次都會判斷是否為 auto commit,如果不是,則調用 processCommits。
/**
 * Drains the queued acknowledgments and, unless the container is in
 * manual-immediate mode, commits the accumulated offsets when the configured
 * {@link AckMode} says a commit is due.
 *
 * <ul>
 * <li>{@code isManualAck}, {@code isBatchAck} or {@code isRecordAck} set:
 * commit on every call.</li>
 * <li>{@link AckMode#COUNT}: commit once {@code count} has reached
 * {@code ackCount}.</li>
 * <li>{@link AckMode#TIME}: commit once more than {@code ackTime}
 * milliseconds have passed since the last time-based commit.</li>
 * <li>{@link AckMode#COUNT_TIME}: commit when either of the two conditions
 * above holds.</li>
 * </ul>
 */
private void processCommits() {
    // Sample the queue size BEFORE handleAcks() drains it. Counting afterwards
    // (as the code previously did) almost always adds zero, so the ackCount
    // threshold used by COUNT / COUNT_TIME was effectively never reached.
    this.count += this.acks.size();
    handleAcks();
    AckMode ackMode = this.containerProperties.getAckMode();
    if (!this.isManualImmediateAck) {
        if (!this.isManualAck) {
            updatePendingOffsets();
        }
        boolean countExceeded = this.count >= this.containerProperties.getAckCount();
        if (this.isManualAck || this.isBatchAck || this.isRecordAck
                || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
            if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
                this.logger.debug("Committing in AckMode.COUNT because count " + this.count
                        + " exceeds configured limit of " + this.containerProperties.getAckCount());
            }
            commitIfNecessary();
            this.count = 0;
        }
        else {
            // Time-based modes: compare against the timestamp of the last time-based commit.
            long now = System.currentTimeMillis();
            boolean elapsed = now - this.last > this.containerProperties.getAckTime();
            if (ackMode.equals(AckMode.TIME) && elapsed) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing in AckMode.TIME "
                            + "because time elapsed exceeds configured limit of "
                            + this.containerProperties.getAckTime());
                }
                commitIfNecessary();
                this.last = now;
            }
            else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                if (this.logger.isDebugEnabled()) {
                    if (elapsed) {
                        this.logger.debug("Committing in AckMode.COUNT_TIME "
                                + "because time elapsed exceeds configured limit of "
                                + this.containerProperties.getAckTime());
                    }
                    else {
                        // Trailing space added after "limit of" so the number is not
                        // glued to the preceding word in the log output.
                        this.logger.debug("Committing in AckMode.COUNT_TIME "
                                + "because count " + this.count + " exceeds configured limit of "
                                + this.containerProperties.getAckCount());
                    }
                }
                commitIfNecessary();
                this.last = now;
                this.count = 0;
            }
        }
    }
}
private void handleAcks() { ConsumerRecord<K, V> record = this.acks.poll(); while (record != null) { if (this.logger.isTraceEnabled()) { this.logger.trace("Ack: " + record); } processAck(record); record = this.acks.poll(); } } private void processAck(ConsumerRecord<K, V> record) { if (ListenerConsumer.this.isManualImmediateAck) { try { ackImmediate(record); } catch (WakeupException e) { // ignore - not polling } } else { addOffset(record); } }
這裏可以看到,如果不是 isManualImmediateAck,則每次都是累加到 offsets 的 map 中。
private void commitIfNecessary() { Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>(); for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) { for (Entry<Integer, Long> offset : entry.getValue().entrySet()) { commits.put(new TopicPartition(entry.getKey(), offset.getKey()), new OffsetAndMetadata(offset.getValue() + 1)); } } this.offsets.clear(); if (this.logger.isDebugEnabled()) { this.logger.debug("Commit list: " + commits); } if (!commits.isEmpty()) { if (this.logger.isDebugEnabled()) { this.logger.debug("Committing: " + commits); } try { if (this.containerProperties.isSyncCommits()) { this.consumer.commitSync(commits); } else { this.consumer.commitAsync(commits, this.commitCallback); } } catch (WakeupException e) { // ignore - not polling if (this.logger.isDebugEnabled()) { this.logger.debug("Woken up during commit"); } } } }
這裏會從 offsets 的 map 組裝出 commits,接著 clear 掉 offsets,然後再去提交(commitSync 或者 commitAsync)。
/**
 * Listener for topic {@code k010}: logs the received record and then
 * acknowledges it explicitly through the supplied {@link Acknowledgment}.
 *
 * @param cr the consumed record
 * @param ack handle used to acknowledge the record manually
 * @throws Exception declared by the signature; nothing is thrown explicitly here
 */
@KafkaListener(topics = "k010")
public void listen(ConsumerRecord<?, ?> cr, Acknowledgment ack) throws Exception {
    final String recordText = cr.toString();
    LOGGER.info(recordText);
    ack.acknowledge();
}
在方法參數中傳入 Acknowledgment,然後手動 ack。
前提是要先配置 AckMode:
// Select AckMode.MANUAL on the container properties obtained from `instance`.
// NOTE(review): `instance` is presumably the listener container (or its factory)
// being configured - confirm against the surrounding configuration code.
instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);