分佈式事務 Spring+JTA+Atomikos+Hibernate+JMS

請經過如下方式下載github源代碼: html

git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git

本地事務和分佈式事務的區別在於:本地事務只用於處理單一數據源事務(好比單個數據庫),分佈式事務能夠處理多種異構的數據源,好比某個業務操做中同時包含了JDBC和JMS或者某個操做須要訪問多個不一樣的數據庫。 java

  Java經過JTA完成分佈式事務,JTA自己只是一種規範,不一樣的應用服務器都包含有本身的實現(好比JbossJTA),同時還存在獨立於應用服務器的單獨JTA實現,好比本篇中要講到的Atomikos。對於JTA的原理,這裏不細講,讀者能夠經過這篇文章瞭解相關知識。 git

  在本篇文章中,咱們將實現如下一個應用場景:你在網上購物,下了訂單以後,訂單數據將保存在系統的數據庫中,同時爲了安排物流,訂單信息將以消息(Message)的方式發送到物流部門以便送貨。 github

  以上操做同時設計到數據庫操做和JMS消息發送,爲了使整個操做成爲一個原子操做,咱們只能選擇分佈式事務。咱們首先設計一個service層,定義OrderService接口: spring

package davenkin;

public interface OrderService {

    public void makeOrder(Order order);
}

爲了簡單起見,咱們設計一個很是簡單的領域對象Order: sql

@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {

    @XmlElement(name = "Id",required = true)
    private long id;

    @XmlElement(name = "ItemName",required = true)
    private String itemName;

    @XmlElement(name = "Price",required = true)
    private double price;

    @XmlElement(name = "BuyerName",required = true)
    private String buyerName;

    @XmlElement(name = "MailAddress",required = true)
    private String mailAddress;

    public Order() {
    }

爲了採用JAXB對Order對象進行Marshal和Unmarshal,咱們在Order類中加入了JAXB相關的Annotation。 咱們將使用Hibernate來完成數據持久化,而後使用Spring提供的JmsTemplate將Order轉成xml後以TextMessage的形式發送到物流部門的ORDER.QUEUE中。 數據庫

(一)準備數據庫
  爲了方便,咱們將採用Spring提供的embedded數據庫,默認狀況下Spring採用HSQL做爲後臺數據庫,雖然在本例中咱們將採用HSQL的非XA的DataSource,可是經過Atomikos包裝以後依然能夠參與分佈式事務。
  SQL腳本包含在createDB.sql文件中:
apache

CREATE TABLE USER_ORDER(
ID INT NOT NULL,
ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,
PRICE DOUBLE NOT NULL,
BUYER_NAME CHAR (32) NOT NULL,
MAIL_ADDRESS VARCHAR(500) NOT NULL,
PRIMARY KEY(ID)
);
在Spring中配置DataSource以下:

<jdbc:embedded-database id="dataSource">
    <jdbc:script location="classpath:createDB.sql"/>
</jdbc:embedded-database>

(二)啓動ActiveMQ 服務器

  咱們將採用embedded的ActiveMQ,在測試以前啓動ActiveMQ提供的BrokerService,在測試執行完以後關閉BrokerService。 session

@BeforeClass
    public static void startEmbeddedActiveMq() throws Exception {
        broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }

    @AfterClass
    public static void stopEmbeddedActiveMq() throws Exception {
        broker.stop();
    }

(三)實現OrderService

  建立一個DefaultOrderService,該類實現了OrderService接口,並維護一個JmsTemplate和一個Hibernate的SessionFactory實例變量,分別用於Message的發送和數據庫處理。

package davenkin;

import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;

public class DefaultOrderService  implements OrderService{
    private JmsTemplate jmsTemplate;
    private SessionFactory sessionFactory;

    @Override
    @Transactional
    public void makeOrder(Order order) {
        Session session = sessionFactory.getCurrentSession();
        session.save(order);
        jmsTemplate.convertAndSend(order);

    }

    @Required
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Required
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }
}

(四)建立Order的Mapping配置文件

<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
        "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">

<hibernate-mapping>
    <class name="davenkin.Order" table="USER_ORDER">
        <id name="id" type="long">
            <column name="ID" />
            <generator class="increment" />
        </id>
        <property name="itemName" type="string">
            <column name="ITEM_NAME" />
        </property>
        <property name="price" type="double">
            <column name="PRICE"/>
        </property>
        <property name="buyerName" type="string">
            <column name="BUYER_NAME"/>
        </property>
        <property name="mailAddress" type="string">
            <column name="MAIL_ADDRESS"/>
        </property>
    </class>
</hibernate-mapping>

(五)配置Atomikos事務

  在Spring的IoC容器中,咱們須要配置由Atomikos提供的UserTransaction和TransactionManager,而後再配置Spring的JtaTransactionManager:

<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce">
        <constructor-arg>
            <props>
                <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
            </props>
        </constructor-arg>
    </bean>

    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService">
        <property name="forceShutdown" value="false" />
    </bean>

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
        <property name="transactionTimeout" value="300" />
    </bean>

    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">
        <property name="transactionManager" ref="atomikosTransactionManager" />
        <property name="userTransaction" ref="atomikosUserTransaction" />
    </bean>

    <tx:annotation-driven transaction-manager="jtaTransactionManager" />

(六)配置JMS

  對於JMS,爲了能使ActiveMQ加入到分佈式事務中,咱們須要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,而後再配置JmsTemplate,此外還須要配置MessageConvertor在Order對象和XML之間互轉。

<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
        <property name="uniqueResourceName" value="XAactiveMQ" />
        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
        <property name="poolSize" value="5"/>
    </bean>


    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="receiveTimeout" value="2000" />
        <property name="defaultDestination" ref="orderQueue"/>
        <property name="sessionTransacted" value="true" />
        <property name="messageConverter" ref="oxmMessageConverter"/>
    </bean>

    <bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="ORDER.QUEUE"/>
    </bean>


    <bean id="oxmMessageConverter"
          class="org.springframework.jms.support.converter.MarshallingMessageConverter">
        <property name="marshaller" ref="marshaller"/>
        <property name="unmarshaller" ref="marshaller"/>
    </bean>

    <oxm:jaxb2-marshaller id="marshaller">
        <oxm:class-to-be-bound name="davenkin.Order"/>
    </oxm:jaxb2-marshaller>

(七)測試

  在測試中,咱們首先經過(二)中的方法啓動ActiveMQ,再調用DefaultOrderService,最後對數據庫和QUEUE進行驗證:

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息