本文主要聊一下spring for kafka的retryjava
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java
主要有兩個實現類RetryingAcknowledgingMessageListenerAdapter以及RetryingMessageListenerAdapterspring
public class RetryingAcknowledgingMessageListenerAdapter<K, V> extends AbstractRetryingMessageListenerAdapter<K, V, AcknowledgingMessageListener<K, V>> implements AcknowledgingMessageListener<K, V> { private final AcknowledgingMessageListener<K, V> delegate; /** * Construct an instance with the provided template and delegate. The exception will * be thrown to the container after retries are exhausted. * @param messageListener the listener delegate. * @param retryTemplate the template. */ public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener, RetryTemplate retryTemplate) { this(messageListener, retryTemplate, null); } /** * Construct an instance with the provided template, callback and delegate. * @param messageListener the listener delegate. * @param retryTemplate the template. * @param recoveryCallback the recovery callback; if null, the exception will be * thrown to the container after retries are exhausted. */ public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener, RetryTemplate retryTemplate, RecoveryCallback<? extends Object> recoveryCallback) { super(messageListener, retryTemplate, recoveryCallback); Assert.notNull(messageListener, "'messageListener' cannot be null"); this.delegate = messageListener; } @SuppressWarnings("unchecked") @Override public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment) { getRetryTemplate().execute(new RetryCallback<Object, KafkaException>() { @Override public Void doWithRetry(RetryContext context) throws KafkaException { context.setAttribute(CONTEXT_RECORD, record); context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment); RetryingAcknowledgingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment); return null; } }, (RecoveryCallback<Object>) getRecoveryCallback()); } }
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.javaide
public class RetryingMessageListenerAdapter<K, V> extends AbstractRetryingMessageListenerAdapter<K, V, MessageListener<K, V>> implements MessageListener<K, V> { /** * Construct an instance with the provided template and delegate. The exception will * be thrown to the container after retries are exhausted. * @param messageListener the delegate listener. * @param retryTemplate the template. */ public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate) { this(messageListener, retryTemplate, null); } /** * Construct an instance with the provided template, callback and delegate. * @param messageListener the delegate listener. * @param retryTemplate the template. * @param recoveryCallback the recovery callback; if null, the exception will be * thrown to the container after retries are exhausted. */ public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate, RecoveryCallback<? extends Object> recoveryCallback) { super(messageListener, retryTemplate, recoveryCallback); Assert.notNull(messageListener, "'messageListener' cannot be null"); } @SuppressWarnings("unchecked") @Override public void onMessage(final ConsumerRecord<K, V> record) { getRetryTemplate().execute(new RetryCallback<Object, KafkaException>() { @Override public Void doWithRetry(RetryContext context) throws KafkaException { context.setAttribute(CONTEXT_RECORD, record); RetryingMessageListenerAdapter.this.delegate.onMessage(record); return null; } }, (RecoveryCallback<Object>) getRecoveryCallback()); } }
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.javathis
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { Object messageListener = createMessageListener(container, messageConverter); Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener"); if (this.retryTemplate != null) { if (messageListener instanceof AcknowledgingMessageListener) { messageListener = new RetryingAcknowledgingMessageListenerAdapter<>( (AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate, this.recoveryCallback); } else { messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener, this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback); } } if (this.recordFilterStrategy != null) { if (messageListener instanceof AcknowledgingMessageListener) { messageListener = new FilteringAcknowledgingMessageListenerAdapter<>( (AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy, this.ackDiscarded); } else if (messageListener instanceof MessageListener) { messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener, this.recordFilterStrategy); } else if (messageListener instanceof BatchAcknowledgingMessageListener) { messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>( (BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy, this.ackDiscarded); } else if (messageListener instanceof BatchMessageListener) { messageListener = new FilteringBatchMessageListenerAdapter<>( (BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy); } } container.setupMessageListener(messageListener); }
若是retryTemplate不爲null的話,會先判斷是否是AcknowledgingMessageListener的子類,若是是則建立RetryingAcknowledgingMessageListenerAdapter,若是不是則建立RetryingMessageListenerAdaptercode
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java接口
protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(MessageConverter messageConverter) { if (isBatchListener()) { BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>( this.bean, this.method); if (messageConverter instanceof BatchMessageConverter) { messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter); } return messageListener; } else { RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(this.bean, this.method); if (messageConverter instanceof RecordMessageConverter) { messageListener.setMessageConverter((RecordMessageConverter) messageConverter); } return messageListener; } }
其中RecordMessagingMessageListenerAdapter實現了AcknowledgingMessageListener接口get
public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V> implements MessageListener<K, V>, AcknowledgingMessageListener<K, V> { //...... }