This article takes a look at QualityOfServiceMode in the Artemis JMSBridge.
activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/QualityOfServiceMode.java
public enum QualityOfServiceMode {
   AT_MOST_ONCE(0), DUPLICATES_OK(1), ONCE_AND_ONLY_ONCE(2);

   private final int value;

   QualityOfServiceMode(final int value) {
      this.value = value;
   }

   public int intValue() {
      return value;
   }

   public static QualityOfServiceMode valueOf(final int value) {
      if (value == AT_MOST_ONCE.value) {
         return AT_MOST_ONCE;
      }
      if (value == DUPLICATES_OK.value) {
         return DUPLICATES_OK;
      }
      if (value == ONCE_AND_ONLY_ONCE.value) {
         return ONCE_AND_ONLY_ONCE;
      }
      throw new IllegalArgumentException("invalid QualityOfServiceMode value: " + value);
   }
}
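To make the int mapping concrete, here is a minimal sketch of the round trip between intValue() and valueOf(int). The wrapper class QosModeRoundTrip and the helper isRejected are hypothetical names introduced only for this illustration. The nested enum is a condensed local copy so that the example compiles without artemis-jms-server on the classpath; its valueOf(int) uses a loop instead of the three if statements, which should behave the same way.

```java
/** Hypothetical demo class; only the nested enum mirrors the Artemis source. */
class QosModeRoundTrip {

    /** Condensed local copy of the enum (assumption: same behavior as the original). */
    enum QualityOfServiceMode {
        AT_MOST_ONCE(0), DUPLICATES_OK(1), ONCE_AND_ONLY_ONCE(2);

        private final int value;

        QualityOfServiceMode(final int value) {
            this.value = value;
        }

        int intValue() {
            return value;
        }

        // Overloads the compiler-generated valueOf(String)
        static QualityOfServiceMode valueOf(final int value) {
            for (QualityOfServiceMode mode : values()) {
                if (mode.value == value) {
                    return mode;
                }
            }
            throw new IllegalArgumentException("invalid QualityOfServiceMode value: " + value);
        }
    }

    /** Returns true when valueOf(int) rejects the given code. */
    static boolean isRejected(int code) {
        try {
            QualityOfServiceMode.valueOf(code);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        for (QualityOfServiceMode mode : QualityOfServiceMode.values()) {
            // intValue() and valueOf(int) are inverses of each other
            QualityOfServiceMode back = QualityOfServiceMode.valueOf(mode.intValue());
            System.out.println(mode.intValue() + " -> " + back);
        }
        // The name-based lookup still works next to the int-based overload
        System.out.println(QualityOfServiceMode.valueOf("DUPLICATES_OK").intValue());
        // Any int outside 0..2 is rejected with IllegalArgumentException
        System.out.println("3 rejected: " + isRejected(3));
    }
}
```

Note that valueOf(int) is an overload, not a replacement: the name-based valueOf(String) that every Java enum gets remains available alongside it.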
activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
public final class JMSBridgeImpl implements JMSBridge {

   //......

   private void sendBatchNonTransacted() {
      try {
         if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || (qualityOfServiceMode == QualityOfServiceMode.AT_MOST_ONCE && maxBatchSize > 1)) {
            // We client ack before sending

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");
            }

            messages.getLast().acknowledge();

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");
            }
         }

         boolean exHappened;

         do {
            exHappened = false;
            try {
               sendMessages();
            } catch (TransactionRolledbackException e) {
               ActiveMQJMSBridgeLogger.LOGGER.transactionRolledBack(e);
               exHappened = true;
            }
         }
         while (exHappened);

         if (maxBatchSize > 1) {
            // The sending session is transacted - we need to commit it

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Committing target session");
            }

            targetSession.commit();

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Committed target session");
            }
         }

         if (qualityOfServiceMode == QualityOfServiceMode.DUPLICATES_OK) {
            // We client ack after sending

            // Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
            // For a slightly less strong delivery guarantee

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");
            }

            messages.getLast().acknowledge();

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");
            }
         }
      } catch (Exception e) {
         if (!stopping) {
            ActiveMQJMSBridgeLogger.LOGGER.bridgeAckError(e, bridgeName);
         }

         // We don't call failure otherwise failover would be broken with ActiveMQ
         // We let the ExceptionListener to deal with failures

         if (connectedSource) {
            try {
               sourceSession.recover();
            } catch (Throwable ignored) {
            }
         }
      } finally {
         // Clear the messages
         messages.clear();
      }
   }

   //......
}
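QualityOfServiceMode defines three enum values: AT_MOST_ONCE, DUPLICATES_OK and ONCE_AND_ONLY_ONCE. In the sendBatchNonTransacted method of JMSBridgeImpl, when qualityOfServiceMode is ONCE_AND_ONLY_ONCE, or AT_MOST_ONCE with maxBatchSize greater than 1, the source session is acked before sendMessages is called. If an exception occurs between the ack and sendMessages, the messages may be lost; ONCE_AND_ONLY_ONCE needs a local transaction or JTA to hold, so without a transaction it behaves the same as AT_MOST_ONCE. When qualityOfServiceMode is DUPLICATES_OK, the ack happens after sendMessages. If an exception occurs between sendMessages and the ack, the source never receives the ack, so after sourceSession.recover() the messages are delivered and forwarded again, which may produce duplicates.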
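The effect of the ack ordering can be reproduced with a small in-memory model. This is only a sketch under simplifying assumptions, not Artemis code: the class AckOrderingDemo, the method forward and its parameters are hypothetical, the "source" is a flag that records whether the batch was acked, the "target" is a plain list, and a single simulated failure is injected between the two steps of the first attempt. An unacked batch is retried, which stands in for the redelivery that follows sourceSession.recover().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the ack ordering; not Artemis code. */
class AckOrderingDemo {

    /**
     * Forwards a batch and returns what the target ends up holding.
     *
     * @param ackBeforeSend    true models AT_MOST_ONCE with batching, false models DUPLICATES_OK
     * @param failBetweenSteps inject one failure between the two steps of the first attempt
     */
    static List<String> forward(boolean ackBeforeSend, List<String> batch, boolean failBetweenSteps) {
        List<String> target = new ArrayList<>();
        boolean acked = false;
        boolean failurePending = failBetweenSteps;

        // An unacked batch stays on the source and is delivered again
        while (!acked) {
            try {
                if (ackBeforeSend) {
                    acked = true; // the source forgets the batch here
                    if (failurePending) {
                        failurePending = false;
                        throw new IllegalStateException("failure between ack and send");
                    }
                    target.addAll(batch);
                } else {
                    target.addAll(batch);
                    if (failurePending) {
                        failurePending = false;
                        throw new IllegalStateException("failure between send and ack");
                    }
                    acked = true;
                }
            } catch (IllegalStateException e) {
                // Stand-in for the catch block that recovers the source session
            }
        }
        return target;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("m1", "m2");
        // Ack first, then fail: the batch is gone from the source and never reached the target
        System.out.println("ack-then-send: " + forward(true, batch, true));  // prints ack-then-send: []
        // Send first, then fail: the batch is redelivered and sent a second time
        System.out.println("send-then-ack: " + forward(false, batch, true)); // prints send-then-ack: [m1, m2, m1, m2]
    }
}
```

Without the injected failure both orderings deliver the batch exactly once, so the modes differ only in what happens when something goes wrong between the two steps.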