*注:別人那複製來的mysql
Spring提供了一個JmsTransactionManager用於對JMS ConnectionFactory作事務管理。這將容許JMS應用利用Spring的事務管理特性。JmsTransactionManager在執行本地資源事務管理時將從指定的ConnectionFactory綁定一個ConnectionFactory/Session這樣的配對到線程中。JmsTemplate會自動檢測這樣的事務資源,並對它們進行相應操做。spring
在Java EE環境中,ConnectionFactory會池化Connection和Session,這樣這些資源將會在整個事務中被有效地重複利用。在一個獨立的環境中,使用Spring的SingleConnectionFactory時全部的事務將公用一個Connection,可是每一個事務將保留本身獨立的Session。sql
JmsTemplate能夠利用JtaTransactionManager和可以進行分佈式的 JMS ConnectionFactory處理分佈式事務。數據庫
在Spring整合JMS的應用中,若是咱們要進行本地的事務管理的話很是簡單,只須要在定義對應的消息監聽容器時指定其sessionTransacted屬性爲true,如:服務器
- <bean id="jmsContainer"
- class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="queueDestination" />
- <property name="messageListener" ref="consumerMessageListener" />
- <property name="sessionTransacted" value="true"/>
- </bean>
該屬性值默認爲false,這樣JMS在進行消息監聽的時候就會進行事務控制,當在接收消息時監聽器執行失敗時JMS就會對接收到的消息進行回滾,對於SessionAwareMessageListener在接收到消息後發送一個返回消息時也處於同一事務下,可是對於其餘操做如數據庫訪問等將不屬於該事務控制。session
這裏咱們能夠來作一個這樣的測試:咱們如上配置監聽在queueDestination的消息監聽容器的sessionTransacted屬性爲true,而後把咱們前面提到的消息監聽器ConsumerMessageListener改爲這樣:app
- public class ConsumerMessageListener implements MessageListener {
-
- public void onMessage(Message message) {
- //這裏咱們知道生產者發送的就是一個純文本消息,因此這裏能夠直接進行強制轉換,或者直接把onMessage方法的參數改爲Message的子類TextMessage
- TextMessage textMsg = (TextMessage) message;
- System.out.println("接收到一個純文本消息。");
- try {
- System.out.println("消息內容是:" + textMsg.getText());
- if (1 == 1) {
- throw new RuntimeException("Error");
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
-
- }
咱們能夠看到在上述代碼中咱們的ConsumerMessageListener在進行消息接收的時候拋出了一個RuntimeException,根據咱們上面說的,由於咱們已經在對應的監聽容器上定義了其sessionTransacted屬性爲true,因此當這裏拋出異常的時候JMS將對接收到的消息進行回滾,即下次進行消息接收的時候該消息仍然可以被接收到。爲了驗證這一點,咱們先執行一遍測試代碼,往queueDestination發送一個文本消息,這個時候ConsumerMessageListener在進行接收的時候將會拋出一個RuntimeException,已經接收到的純文本消息將進行回滾;接着咱們去掉上面代碼中拋出異常的語句,即ConsumerMessageListener可以正常的進行消息接收,這個時候咱們再運行一次測試代碼,往ConsumerMessageListener監聽的queueDestination發送一條消息。若是以前在接手時拋出了異常的那條消息已經回滾了的話,那麼這個時候將可以接收到兩條消息,控制檯將輸出接收到的兩條消息的內容。具體結果有興趣的朋友能夠本身驗證一下。分佈式
若是想接收消息和數據庫訪問處於同一事務中,那麼咱們就能夠配置一個外部的事務管理同時配置一個支持外部事務管理的消息監聽容器(如DefaultMessageListenerContainer)。要配置這樣一個參與分佈式事務管理的消息監聽容器,咱們能夠配置一個JtaTransactionManager,固然底層的JMS ConnectionFactory須要可以支持分佈式事務管理,並正確地註冊咱們的JtaTransactionManager。這樣消息監聽器進行消息接收和對應的數據庫訪問就會處於同一數據庫控制下,當消息接收失敗或數據庫訪問失敗都會進行事務回滾操做。工具
- <bean id="jmsContainer"
- class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="queueDestination" />
- <property name="messageListener" ref="consumerMessageListener" />
- <property name="transactionManager" ref="jtaTransactionManager"/>
- </bean>
-
- <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
當給消息監聽容器指定了transactionManager時,消息監聽容器將忽略sessionTransacted的值。 測試
關於使用JtaTransactionManager來管理上述分佈式事務,咱們這裏也能夠來作一個試驗。
首先:往Spring配置文件applicationContext.xml中添加以下配置:
- <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
- <property name="dataSource" ref="dataSource"/>
- </bean>
-
- <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/>
- <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
-
- <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
咱們能夠看到,在這裏咱們引入了一個jndi數據源,定義了一個JtaTransactionManager,定義了Spring基於註解的聲明式事務管理,定義了一個Spring提供的進行Jdbc操做的工具類jdbcTemplate。
接下來把咱們的ConsumerMessageListener改成以下形式:
- public class ConsumerMessageListener implements MessageListener {
-
- @Autowired
- private TestDao testDao;
-
- private int count = 0;
-
- public void onMessage(Message message) {
- //這裏咱們知道生產者發送的就是一個純文本消息,因此這裏能夠直接進行強制轉換,或者直接把onMessage方法的參數改爲Message的子類TextMessage
- TextMessage textMsg = (TextMessage) message;
- System.out.println(new Date().toLocaleString() + "接收到一個純文本消息。");
- try {
- String text = textMsg.getText();
- System.out.println("消息內容是:" + text);
- System.out.println("當前count的值是:" + count);
- testDao.insert(text + count);
- if (count == 0) {
- count ++;
- throw new RuntimeException("Error! 出錯啦!");
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
-
- }
咱們能夠看到,在ConsumerMessageListener中咱們定義了一個實例變量count,其初始值爲0;在onMessage裏面,咱們能夠看到咱們把接收到的消息內容做爲參數調用了testDao的insert方法;當count值爲0,也就是進行第一次消息接收的時候會將count的值加1,同時拋出一個運行時異常。那麼咱們這裏要測試的就是進行第一次接收的時候testDao已經把相關內容插入數據庫了,接着在onMessage裏面拋出了一個異常同時count加1,咱們預期的結果應該是此時數據庫進行回滾,同時JMS也回滾,這樣JMS將繼續嘗試接收該消息,此時一樣會調用testDao的insert方法將內容插入數據庫,再接着count已經不爲0了,因此此時將再也不拋出異常,JMS成功進行消息的接收,testDao也成功的將消息內容插入到了數據庫。要證實這個預期咱們除了看數據庫中插入的數據外,還能夠看控制檯的輸出,正常狀況控制檯將輸出兩次消息接收的內容,且第一次時count爲0,第二次count爲1。
TestDao是一個接口,其TestDaoImpl對insert的方法實現以下:
- @Transactional(readOnly=false)
- public void insert(final String name) {
-
- jdbcTemplate.update("insert into test(name) values(?)", name);
-
- }
這裏咱們使用支持JtaTransactionManager的Weblogic來進行測試,由於是Web容器,因此咱們這裏定義了一個Controller來進行消息的發送,具體代碼以下:
- @Controller
- @RequestMapping("test")
- public class TestController {
-
- @Autowired
- @Qualifier("queueDestination")
- private Destination destination;
-
- @Autowired
- private ProducerService producerService;
-
- @RequestMapping("first")
- public String first() {
- producerService.sendMessage(destination, "你好,如今是:" + new Date().toLocaleString());
- return "/test/first";
- }
-
- }
接下來就是啓用Weblogic服務器,進入其控制檯,定義一個名叫「jdbc/mysql」的JNDI數據源,而後把該項目部署到Weblogic服務器上並進行啓動。接下來咱們就能夠訪問/test/first.do訪問到上述first方法。以後控制檯會輸出以下信息:
咱們能夠看到當count爲0時接收了一次,並隨後拋出了異常,以後count爲1又接收了一次,這說明在count爲0時拋出異常後咱們的JMS進行回滾了,那麼咱們的數據庫是否有進行回滾呢?接着咱們來看數據庫中的內容:
咱們能夠看到數據庫表中只有一條記錄,並且最後一位表示count的值的爲1,這說明在JMS進行消息接收拋出異常時咱們的數據庫也回滾了。關於使用JtaTransactionManager進行分佈式事務管理的問題就說到這裏了,有興趣的朋友能夠本身試驗一下。