A look at QualityOfServiceMode in the Artemis JMSBridge

This article takes a look at the QualityOfServiceMode used by the Artemis JMSBridge.

QualityOfServiceMode

activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/QualityOfServiceMode.java

public enum QualityOfServiceMode {
   AT_MOST_ONCE(0), DUPLICATES_OK(1), ONCE_AND_ONLY_ONCE(2);

   private final int value;

   QualityOfServiceMode(final int value) {
      this.value = value;
   }

   public int intValue() {
      return value;
   }

   public static QualityOfServiceMode valueOf(final int value) {
      if (value == AT_MOST_ONCE.value) {
         return AT_MOST_ONCE;
      }
      if (value == DUPLICATES_OK.value) {
         return DUPLICATES_OK;
      }
      if (value == ONCE_AND_ONLY_ONCE.value) {
         return ONCE_AND_ONLY_ONCE;
      }
      throw new IllegalArgumentException("invalid QualityOfServiceMode value: " + value);
   }

}
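  • QualityOfServiceMode defines three enum values: AT_MOST_ONCE, DUPLICATES_OK and ONCE_AND_ONLY_ONCE. Each one carries an int value (0, 1, 2), and the static valueOf(int) method maps an int back to its constant, throwing IllegalArgumentException for any other value.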
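The int lookup sits next to the valueOf(String) method that the compiler generates for every enum, so both overloads are available. The sketch below illustrates this with a local copy of the enum, nested in a hypothetical QosLookupDemo class so that it compiles without the Artemis jar. The lookup is written as a loop here for brevity; it is meant to behave the same as the if chain in the excerpt.

```java
class QosLookupDemo {

   // Local copy of the enum from the excerpt (assumption: kept only so the demo is self-contained).
   enum QualityOfServiceMode {
      AT_MOST_ONCE(0), DUPLICATES_OK(1), ONCE_AND_ONLY_ONCE(2);

      private final int value;

      QualityOfServiceMode(final int value) {
         this.value = value;
      }

      public int intValue() {
         return value;
      }

      // Same contract as the original: return the matching constant or throw.
      public static QualityOfServiceMode valueOf(final int value) {
         for (QualityOfServiceMode mode : values()) {
            if (mode.value == value) {
               return mode;
            }
         }
         throw new IllegalArgumentException("invalid QualityOfServiceMode value: " + value);
      }
   }

   public static void main(String[] args) {
      // int overload declared by the enum itself
      System.out.println(QualityOfServiceMode.valueOf(2));
      // String overload generated by the compiler for every enum
      System.out.println(QualityOfServiceMode.valueOf("DUPLICATES_OK").intValue());
      // Any int outside 0..2 is rejected
      try {
         QualityOfServiceMode.valueOf(3);
      } catch (IllegalArgumentException e) {
         System.out.println(e.getMessage());
      }
   }
}
```

A round trip through intValue() and valueOf(int) returns the original constant, which is convenient when the mode has to be stored or passed around as a plain number.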

sendBatchNonTransacted

activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java

public final class JMSBridgeImpl implements JMSBridge {

   //......

   private void sendBatchNonTransacted() {
      try {
         if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || (qualityOfServiceMode == QualityOfServiceMode.AT_MOST_ONCE && maxBatchSize > 1)) {
            // We client ack before sending

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");
            }

            messages.getLast().acknowledge();

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");
            }
         }

         boolean exHappened;

         do {
            exHappened = false;
            try {
               sendMessages();
            } catch (TransactionRolledbackException e) {
               ActiveMQJMSBridgeLogger.LOGGER.transactionRolledBack(e);
               exHappened = true;
            }
         }
         while (exHappened);

         if (maxBatchSize > 1) {
            // The sending session is transacted - we need to commit it

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Committing target session");
            }

            targetSession.commit();

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Committed target session");
            }
         }

         if (qualityOfServiceMode == QualityOfServiceMode.DUPLICATES_OK) {
            // We client ack after sending

            // Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
            // For a slightly less strong delivery guarantee

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");
            }

            messages.getLast().acknowledge();

            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
               ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");
            }
         }
      } catch (Exception e) {
         if (!stopping) {
            ActiveMQJMSBridgeLogger.LOGGER.bridgeAckError(e, bridgeName);
         }

         // We don't call failure otherwise failover would be broken with ActiveMQ
         // We let the ExceptionListener to deal with failures

         if (connectedSource) {
            try {
               sourceSession.recover();
            } catch (Throwable ignored) {
            }
         }

      } finally {
         // Clear the messages
         messages.clear();

      }
   }

   //......
}
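  • JMSBridgeImpl's sendBatchNonTransacted method first calls messages.getLast().acknowledge() when qualityOfServiceMode is ONCE_AND_ONLY_ONCE, or when it is AT_MOST_ONCE and maxBatchSize is greater than 1. It then calls sendMessages inside a do-while loop that repeats only while a TransactionRolledbackException is thrown, and commits targetSession if maxBatchSize is greater than 1. Finally, when qualityOfServiceMode is DUPLICATES_OK, it calls messages.getLast().acknowledge() after the send.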
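The ordering described above can be condensed into three small predicates. The following is a minimal sketch, not Artemis code: the class AckOrderingSketch, the enum Qos and the step labels are hypothetical names introduced for illustration, and the conditions are copied from the if statements in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

class AckOrderingSketch {

   // Stand-in for QualityOfServiceMode (assumption: a local enum so the sketch is self-contained).
   enum Qos { AT_MOST_ONCE, DUPLICATES_OK, ONCE_AND_ONLY_ONCE }

   // Mirrors the first if in sendBatchNonTransacted: ack the source before sending.
   static boolean acksBeforeSend(Qos mode, int maxBatchSize) {
      return mode == Qos.ONCE_AND_ONLY_ONCE || (mode == Qos.AT_MOST_ONCE && maxBatchSize > 1);
   }

   // Mirrors the maxBatchSize check: the target session is committed for batches.
   static boolean commitsTargetSession(int maxBatchSize) {
      return maxBatchSize > 1;
   }

   // Mirrors the last if: ack the source after sending.
   static boolean acksAfterSend(Qos mode) {
      return mode == Qos.DUPLICATES_OK;
   }

   // Lists the steps in the order the method would perform them.
   static List<String> steps(Qos mode, int maxBatchSize) {
      List<String> steps = new ArrayList<>();
      if (acksBeforeSend(mode, maxBatchSize)) {
         steps.add("ack source");
      }
      steps.add("send");
      if (commitsTargetSession(maxBatchSize)) {
         steps.add("commit target");
      }
      if (acksAfterSend(mode)) {
         steps.add("ack source");
      }
      return steps;
   }

   public static void main(String[] args) {
      for (Qos mode : Qos.values()) {
         for (int batch : new int[] {1, 10}) {
            System.out.println(mode + ", maxBatchSize=" + batch + " -> " + steps(mode, batch));
         }
      }
   }
}
```

Note that for AT_MOST_ONCE with maxBatchSize equal to 1 neither acknowledge call in this method runs; how the source message is acknowledged in that case is not shown in the excerpt.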

Summary

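QualityOfServiceMode defines three enum values: AT_MOST_ONCE, DUPLICATES_OK and ONCE_AND_ONLY_ONCE. In JMSBridgeImpl's sendBatchNonTransacted method, when qualityOfServiceMode is ONCE_AND_ONLY_ONCE, or AT_MOST_ONCE with maxBatchSize greater than 1, the source message is acknowledged before sendMessages runs. If a failure occurs between the ack and sendMessages, the message may be lost; ONCE_AND_ONLY_ONCE relies on a local transaction or JTA for its guarantee, so on this non-transacted path it behaves the same as AT_MOST_ONCE. When qualityOfServiceMode is DUPLICATES_OK, the ack happens after sendMessages. If a failure occurs between sendMessages and the ack, the source never receives the ack and redelivers the message, so the bridge forwards it again and the target may see duplicates.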
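The loss and duplication cases can be reproduced with a toy model. This is a minimal sketch under simplifying assumptions, not a JMS client: the class AckFailureSimulation and its methods are hypothetical, the source is modelled as a queue that keeps a message until it is acknowledged, and a "crash" simply aborts the attempt between the two steps before a clean second attempt runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class AckFailureSimulation {

   // Runs one forwarding attempt that may crash, then a clean retry, and returns what reached the target.
   static List<String> run(boolean ackBeforeSend, boolean crashBetweenSteps) {
      Deque<String> unacked = new ArrayDeque<>();
      unacked.add("m1");
      List<String> target = new ArrayList<>();

      attempt(unacked, target, ackBeforeSend, crashBetweenSteps);
      // After a failure, the source redelivers whatever was never acknowledged.
      attempt(unacked, target, ackBeforeSend, false);
      return target;
   }

   private static void attempt(Deque<String> unacked, List<String> target,
                               boolean ackBeforeSend, boolean crash) {
      String message = unacked.peekFirst();
      if (message == null) {
         return; // nothing left to deliver
      }
      if (ackBeforeSend) {
         unacked.removeFirst();   // ack first (AT_MOST_ONCE style)
         if (crash) {
            return;               // failure after ack, before send: message is gone
         }
         target.add(message);
      } else {
         target.add(message);     // send first (DUPLICATES_OK style)
         if (crash) {
            return;               // failure after send, before ack: message stays unacked
         }
         unacked.removeFirst();
      }
   }

   public static void main(String[] args) {
      System.out.println("ack before send, crash: " + run(true, true));   // prints []
      System.out.println("ack after send, crash:  " + run(false, true));  // prints [m1, m1]
      System.out.println("ack before send, ok:    " + run(true, false));  // prints [m1]
      System.out.println("ack after send, ok:     " + run(false, false)); // prints [m1]
   }
}
```

Without a failure both orderings deliver the message exactly once; the two modes differ only in which side of the failure they trade away.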
