Jms的MessageListener中的Jms事務

摘要

通常來講,若是爲JmsTemplate作了事務配置,那麼它將會與當前線程的數據庫事務掛鉤,而且僅在數據庫事務的afterCommit動做中提交。spring

可是,若是一個MessageListener在接收Jms消息的同時,也使用JmsTemplate發送了Jms消息;那麼它發送的Jms消息將與數據庫事務無關(即便爲JmsTemplate作了事務配置),而是與Listener接收消息保持在同一個事務中。數據庫


 

問題

問題是一位同事發現的。apache

帳務系統的墊付功能存在REST和MessageListener兩個入口;兩個入口中調用的是同一套代碼和業務邏輯。可是,REST入口中發送的Jms消息會隨着數據庫事務回滾而回滾;MessageListener中卻不會回滾。相關流程圖說明以下。session

咱們指望的結果是:在還款操做中發送的Jms消息,隨還款操做的數據庫事務回滾而取消(紅色底色部分的操做);而墊付操做中發送的Jms消息,則應隨墊付操做的數據庫事務提交而提交(綠色底色部分的操做)。這一點在REST入口的相關日誌和數據中獲得了驗證。可是,從MessageListener入口調用此服務時,卻出現了問題:雖然還款服務的數據庫事務確實回滾了,可是其中的Jms消息卻成功發送了出來(參見紅色字體部分)app

spacer.gifwKioL1iG-gjzuyXgAAEzpZarhtw511.png-wh_50


 

分析

首先,REST入口的操做、結果是正確的。這說明,當數據庫事務回滾時,Jms消息確實沒有提交。那麼,能夠確定一點:必定是MessageListener後續處理中作了提交消息這個動做。ide

通過一系列的Debug和逐行執行、分析,我找到了這段代碼。測試

 

MessageListener接收到消息後,會進入org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute( Object invoker,  Session session,  MessageConsumer consumer)方法中。因爲沒有配置transactionManager,咱們會經過doReceiveAndExecute(invoker, session, consumer, null)來調用org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute( Object invoker,  Session session,  MessageConsumer consumer,  TransactionStatus status) 方法。字體

/**this

 * Execute the listener for a message received from the given consumer,url

 * wrapping the entire operation in an external transaction if demanded.

 * @param session the JMS Session to work on

 * @param consumer the MessageConsumer to work on

 * @return whether a message has been received

 * @throws JMSException if thrown by JMS methods

 * @see #doReceiveAndExecute

 */

protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)

        throws JMSException {

    if (this.transactionManager != null) {

        // Execute receive within transaction.

        TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);

        boolean messageReceived;

        try {

            messageReceived = doReceiveAndExecute(invoker, session, consumer, status);

        }

        catch (JMSException ex) {

            rollbackOnException(status, ex);

            throw ex;

        }

        catch (RuntimeException ex) {

            rollbackOnException(status, ex);

            throw ex;

        }

        catch (Error err) {

            rollbackOnException(status, err);

            throw err;

        }

        this.transactionManager.commit(status);

        return messageReceived;

    }

    else {

        // Execute receive outside of transaction.

        return doReceiveAndExecute(invoker, session, consumer, null);

    }

}

 

doReceiveAndExecute方法又會調用org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener( Session session,  Message message)方法,以便於執行咱們編寫的業務代碼,並處理Jms相關的事務。若是業務代碼中沒有拋出異常,那麼就會進入org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary( Session session,  Message message)方法中。

/**

 * Execute the specified listener,

 * committing or rolling back the transaction afterwards (if necessary).

 * @param session the JMS Session to operate on

 * @param message the received JMS Message

 * @throws JMSException if thrown by JMS API methods

 * @see #invokeListener

 * @see #commitIfNecessary

 * @see #rollbackOnExceptionIfNecessary

 * @see #convertJmsAccessException

 */

protected void doExecuteListener(Session session, Message message) throws JMSException {

    if (!isAcceptMessagesWhileStopping() && !isRunning()) {

        if (logger.isWarnEnabled()) {

            logger.warn("Rejecting received message because of the listener container " +

                    "having been stopped in the meantime: " + message);

        }

        rollbackIfNecessary(session);

        throw new MessageRejectedWhileStoppingException();

    }

    try {

        invokeListener(session, message);

    }

    catch (JMSException ex) {

        rollbackOnExceptionIfNecessary(session, ex);

        throw ex;

    }

    catch (RuntimeException ex) {

        rollbackOnExceptionIfNecessary(session, ex);

        throw ex;

    }

    catch (Error err) {

        rollbackOnExceptionIfNecessary(session, err);

        throw err;

    }

    commitIfNecessary(session, message);

}

 

commitIfNecessary方法幾經展轉,最終會調用到org.apache.activemq.ActiveMQConnection.syncSendPacket( Command command) 方法。這個方法的做用,是將當前Connection中的數據同步到MQ服務端。也就是在這個方法執行完畢以後,不該當發送的消息被髮送了出去。

public Response syncSendPacket(Command command) throws JMSException {

    if (isClosed()) {

        throw new ConnectionClosedException();

    else {

        try {

            Response response = (Response)this.transport.request(command);

            if (response.isException()) {

                ExceptionResponse er = (ExceptionResponse)response;

                if (er.getException() instanceof JMSException) {

                    throw (JMSException)er.getException();

                else {

                    if (isClosed()||closing.get()) {

                        LOG.debug("Received an exception but connection is closing");

                    }

                    JMSException jmsEx = null;

                    try {

                        jmsEx = JMSExceptionSupport.create(er.getException());

                    catch(Throwable e) {

                        LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);

                    }

                    if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){

                        forceCloseOnSecurityException(er.getException());

                    }

                    if (jmsEx !=null) {

                        throw jmsEx;

                    }

                }

            }

            return response;

        catch (IOException e) {

            throw JMSExceptionSupport.create(e);

        }

    }

 

這就是消息被錯誤發送的緣由:MessageListener在接收消息的時候,獲取了一個Connection;後續發送消息時,用的是同一個Connection。所以,儘管中間的數據庫事務回滾了,但因爲這個Connection最終要提交(MessageListner中沒有拋出異常),用這個Connection發送的全部消息最終都被提交到了MQ上。


 

解決方案

方案一:使用JmsTransactionManager來管理Jms事務

能夠經過如下配置,爲MessageListner注入JmsTransactionManager:

<bean id="jmsTransactionManager"

    class="org.springframework.jms.connection.JmsTransactionManager">

    <property name="connectionFactory" ref="jmsConnectionFactory" />

</bean

 

 

<jms:listener-container destination-type="queue"

    transaction-manager="jmsTransactionManager" concurrency="4"

    acknowledge="transacted" connection-factory="jmsConnectionFactory">

    <jms:listener destination="queue.thread.autopay" ref="autoPayListener" />

</jms:listener-container>

可是在測試後發現,這樣配置並無用。我分析,JmsTransactionManager並不能爲@Transactional(propagation = Propagation.REQUIRES_NEW)註解建立新的JmsConnection,於是,發送消息使用的仍然是接收消息時建立的connection。因爲MessageListener中並無拋出異常,JmsTransactionManager仍然會提交這個connection中的數據,並最終致使消息提交。

方案二:手動將發送消息的操做放到數據庫事務的AfterCommit操做中

現有代碼中,咱們是在事務體內執行JmsTemplate.send()操做;在事務的AfterCommit操做中執行Session.commit()。

若是咱們將JmsTemplate.send()操做放到AfterCommit操做中,那麼就能夠確保只在數據庫事務提交後,纔會提交Jms消息了。

此方案驗證可行。驗證代碼以下:

public void send(Event event, List<TransferParam> transferParams) {

    TransactionSynchronizationManager

        .registerSynchronization(new TransactionSynchronizationAdapter() {

            @Override

            public void afterCommit() {

                System.out.println(this.getClass() + " - " + event + "--"

                    + transferParams);

                try {

                    event.getTychoOperType().ifPresent(

                        (value) -> {

                            TychoProductor4Account.this.doSend(event,

                                transferParams);

                        });

                catch (Exception e) {

                    System.out.println(e.getMessage());

                    TychoProductor4Account.LOGGER.error("發送數據到tycho異常:{}",

                        e);

                }

            }

        });

}

 

方案三:手動在數據庫事務的RollBack操做中回滾Jms消息

暫未找到實現方式。


方案四:嘗試爲發送消息建立並使用新的Connection

代碼流程中之因此會使用同一個Connection,是由於接收、發送消息時,都是從線程上下文中嘗試獲取JmsResourceHolder,並從其中獲取鏈接的。

那麼,簡單作法就是在接收到消息後,開啓一個子線程;複雜作法則是爲JmsTransactionManager編寫識別@Transactional(propagation = Propagation.REQUIRES_NEW)註解的功能。

開啓子線程的方案可行。驗證代碼以下:

Future<Event> actualResult = this.keplerRestExecutor.submit(() -> {

            Event4Reserve event4Reserve = new Event4Reserve();

            event4Reserve.setRecordId(recordId);

            event4Reserve.setUserId(ThreadConsts.SYSTEM_USER_ID);

            AutoPayListener4BaeEvent.LOGGER.debug("event4Reserve={}",

                event4Reserve);

            Event result = this.bizAccountEventService.handle(event4Reserve);

            AutoPayListener4BaeEvent.LOGGER.info("result={}", result);

            return result;

        });

        try {

            actualResult.get();

        catch (InterruptedException e) {

            AutoPayListener4BaeEvent.LOGGER.error("線程被中斷!", e);

            throw new RuntimeException("墊付線程中斷!", e);

        catch (ExecutionException e) {

            AutoPayListener4BaeEvent.LOGGER.error("執行過程出錯!", e);

            Throwable real = e.getCause();

            if (real instanceof RuntimeException) {

                throw (RuntimeException) real;

            else {

                throw new RuntimeException(real);

            }

        }

 

方案五:使用org.springframework.jms.connection.CachingConnectionFactory

已驗證,方案無效。

測試配置以下:

<bean id="jmsTransactionManager"

    class="org.springframework.jms.connection.JmsTransactionManager">

    <property name="connectionFactory" ref="jmsConnectionFactory" />

</bean>

<bean id="jmsConnectionFactory"

    class="org.springframework.jms.connection.CachingConnectionFactory">

    <property name="targetConnectionFactory" ref="targetActiveMqConnectionFactory" />

    <property name="sessionCacheSize" value="10" />

</bean>

<amq:connectionFactory id="targetActiveMqConnectionFactory"

    brokerURL="${jms.url.failover}">

    <amq:redeliveryPolicyMap>

        <amq:redeliveryPolicyMap>

            <amq:defaultEntry>

                <!-- 5次,每次30秒 -->

                <amq:redeliveryPolicy maximumRedeliveries="5"

                    initialRedeliveryDelay="30000" />

            </amq:defaultEntry>

            <amq:redeliveryPolicyEntries>

                <!-- 5次,每次10秒 -->

                <amq:redeliveryPolicy queue="queue.thread.autopay"

                    maximumRedeliveries="5" initialRedeliveryDelay="10000" />

            </amq:redeliveryPolicyEntries>

            <amq:redeliveryPolicyEntries>

                <!-- 銀聯實時劃扣超時限制 ,5次,每次90秒 -->

                <amq:redeliveryPolicy queue="queue.thread.instantUnionpay"

                    maximumRedeliveries="5" initialRedeliveryDelay="90000" />

            </amq:redeliveryPolicyEntries>

        </amq:redeliveryPolicyMap>

    </amq:redeliveryPolicyMap>

</amq:connectionFactory>

 

<jms:listener-container destination-type="queue"

    transaction-manager="jmsTransactionManager" concurrency="4"

    acknowledge="transacted" connection-factory="jmsConnectionFactory">

    <jms:listener destination="queue.thread.autopay" ref="autoPayListener" />

</jms:listener-container>

 

方案六:爲jmsTemplate和MessageListener配置不一樣的ConnectionFactory

驗證可行。測試配置以下:

<bean id="newJmsTemplate" class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="targetActiveMqConnectionFactory" />

    <property name="sessionTransacted" value="true" />

    <property name="explicitQosEnabled" value="${activemq.explicitQosEnabled}" />

    <property name="timeToLive" value="86400000" />

</bean>

<amq:connectionFactory id="targetActiveMqConnectionFactory"

    brokerURL="${jms.url.failover}">

</amq:connectionFactory>

 

<jms:listener-container destination-type="queue"

    concurrency="4" acknowledge="transacted" connection-factory="jmsConnectionFactory">

    <jms:listener destination="queue.thread.autopay" ref="autoPayListener" />

</jms:listener-container>

<amq:connectionFactory id="jmsConnectionFactory"

    brokerURL="${jms.url.failover}">

</amq:connectionFactory>

 


 

後續工做

基本已經驗證完畢。

 

小結

可行方案有三個,分別是方案二:手動將發送消息的操做放到數據庫事務的AfterCommit操做中方案四:嘗試爲發送消息建立並使用新的Connection方案六:爲jmsTemplate和MessageListener配置不一樣的ConnectionFactory。比較簡便的方式是方案六,其它方式都須要修改代碼。

相關文章
相關標籤/搜索