通常來講,若是爲JmsTemplate作了事務配置,那麼它將會與當前線程的數據庫事務掛鉤,而且僅在數據庫事務的afterCommit動做中提交。spring
可是,若是一個MessageListener在接收Jms消息的同時,也使用JmsTemplate發送了Jms消息;那麼它發送的Jms消息將與數據庫事務無關(即便爲JmsTemplate作了事務配置),而是與Listener接收消息保持在同一個事務中。數據庫
問題是一位同事發現的。apache
帳務系統的墊付功能存在REST和MessageListener兩個入口;兩個入口中調用的是同一套代碼和業務邏輯。可是,REST入口中發送的Jms消息會隨着數據庫事務回滾而回滾;MessageListener中卻不會回滾。相關流程圖說明以下。session
咱們指望的結果是:在還款操做中發送的Jms消息,隨還款操做的數據庫事務回滾而取消(紅色底色部分的操做);而墊付操做中發送的Jms消息,則應隨墊付操做的數據庫事務提交而提交(綠色底色部分的操做)。這一點在REST入口的相關日誌和數據中獲得了驗證。可是,從MessageListener入口調用此服務時,卻出現了問題:雖然還款服務的數據庫事務確實回滾了,可是其中的Jms消息卻成功發送了出來(參見紅色字體部分)。app
首先,REST入口的操做、結果是正確的。這說明,當數據庫事務回滾時,Jms消息確實沒有提交。那麼,能夠確定一點:必定是MessageListener後續處理中作了提交消息這個動做。ide
通過一系列的Debug和逐行執行、分析,我找到了這段代碼。測試
MessageListener接收到消息後,會進入org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute( Object invoker, Session session, MessageConsumer consumer)方法中。因爲沒有配置transactionManager,咱們會經過doReceiveAndExecute(invoker, session, consumer, null)來調用org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute( Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) 方法。字體
|
doReceiveAndExecute方法又會調用org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener( Session session, Message message)方法,以便於執行咱們編寫的業務代碼,並處理Jms相關的事務。若是業務代碼中沒有拋出異常,那麼就會進入org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary( Session session, Message message)方法中。
|
commitIfNecessary方法幾經展轉,最終會調用到org.apache.activemq.ActiveMQConnection.syncSendPacket( Command command) 方法。這個方法的做用,是將當前Connection中的數據同步到MQ服務端。也就是在這個方法執行完畢以後,不該當發送的消息被髮送了出去。
|
這就是消息被錯誤發送的緣由:MessageListener在接收消息的時候,獲取了一個Connection;後續發送消息時,用的是同一個Connection。所以,儘管中間的數據庫事務回滾了,但因爲這個Connection最終要提交(MessageListner中沒有拋出異常),用這個Connection發送的全部消息最終都被提交到了MQ上。
能夠經過如下配置,爲MessageListner注入JmsTransactionManager:
|
可是在測試後發現,這樣配置並無用。我分析,JmsTransactionManager並不能爲@Transactional(propagation = Propagation.REQUIRES_NEW)註解建立新的JmsConnection,於是,發送消息使用的仍然是接收消息時建立的connection。因爲MessageListener中並無拋出異常,JmsTransactionManager仍然會提交這個connection中的數據,並最終致使消息提交。
現有代碼中,咱們是在事務體內執行JmsTemplate.send()操做;在事務的AfterCommit操做中執行Session.commit()。
若是咱們將JmsTemplate.send()操做放到AfterCommit操做中,那麼就能夠確保只在數據庫事務提交後,纔會提交Jms消息了。
此方案驗證可行。驗證代碼以下:
|
暫未找到實現方式。
代碼流程中之因此會使用同一個Connection,是由於接收、發送消息時,都是從線程上下文中嘗試獲取JmsResourceHolder,並從其中獲取鏈接的。
那麼,簡單作法就是在接收到消息後,開啓一個子線程;複雜作法則是爲JmsTransactionManager編寫識別@Transactional(propagation = Propagation.REQUIRES_NEW)註解的功能。
開啓子線程的方案可行。驗證代碼以下:
|
已驗證,方案無效。
測試配置以下:
|
驗證可行。測試配置以下:
|
基本已經驗證完畢。
可行方案有三個,分別是方案二:手動將發送消息的操做放到數據庫事務的AfterCommit操做中、方案四:嘗試爲發送消息建立並使用新的Connection、方案六:爲jmsTemplate和MessageListener配置不一樣的ConnectionFactory。比較簡便的方式是方案六,其它方式都須要修改代碼。