ActiveMq中Session的事務與消息過時

ActiveMQ有支持兩種事務,html

  • JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)
  • XA Transactions - where the XASession acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.

在支持事務的session中,producer發送message時在message中帶有transaction ID。broker收到message後判斷是否有transaction ID,若是有就把message保存在transaction store中,等待commit或者rollback消息。因此ActiveMq的事務是針對broker而不是producer的,無論session是否commit,broker都會收到message。java

若是producer發送模式選擇了persistent,那麼message過時後會進入死亡隊列。在message進入死亡隊列以前,ActiveMQ會刪除message中的transaction ID,這樣過時的message就不在事務中了,不會保存在transaction store中,會直接進入死亡隊列。具體刪除transaction ID的地方是在apache

org.apache.activemq.util.BrokerSupport的doResend,將transaction ID保存在了originalTransactionID中,刪除了transaction IDapi

public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {         
        Message message = copy ? originalMessage.copy() : originalMessage;  
        message.setOriginalDestination(message.getDestination());  
        <strong>message.setOriginalTransactionId(message.getTransactionId());</strong>  
        message.setDestination(deadLetterDestination);  
        <strong>message.setTransactionId(null);</strong>  
        message.setMemoryUsage(null);  
        message.setRedeliveryCounter(0);  
        boolean originalFlowControl = context.isProducerFlowControl();  
        try {  
            context.setProducerFlowControl(false);  
            ProducerInfo info = new ProducerInfo();  
            ProducerState state = new ProducerState(info);  
            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();  
            producerExchange.setProducerState(state);  
            producerExchange.setMutable(true);  
            producerExchange.setConnectionContext(context);  
            context.getBroker().send(producerExchange, message);  
        } finally {  
            context.setProducerFlowControl(originalFlowControl);  
        }
相關文章
相關標籤/搜索