分佈式事務之Spring事務與JMS事務(二)

著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

Spring事務

Spring事務機制主要包括聲明式事務和編程式事務,聲明式事務讓咱們從複雜的事務處理中獲得解脫,編程式事務在實際開發中得不到普遍使用,僅供學習參考。spring

事務抽象

spring的事務管理提供了統一的API接口支持不一樣的資源,提供聲明式事務管企且方便與Spring框架集成。spring的事務管理器使用抽象的設計方式實現,如下爲spring中事務管理器的邏輯實現代碼 (精簡了一部分,突出核心邏輯)sql

## 事務狀態
public interface TransactionStatus extends SavepointManager{
    boolean isNewTransaction();
    boolean hasSavepoint();
    void setRollbackOnly();
    boolean isRollbackOnly();
    boolean isCompleted();
}
## 事務定義
public interface TransactionDefinition{
    int getPropagationBehavior();
    int getIsolationLevel();
    String getName();
    int getTimeout();
    boolean isReadOnly();
}
## 事務管理器
public interface PlatformTransactionManager{
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    void commit(TransactionStatus status) throws TransactionException;
    void rollback(TransactionStatus status) throws TransactionException;
}
複製代碼

事務的傳播行爲

Spring在TransactionDefinition接口中規定了 7 種類型的事務傳播行爲,它們規定了事務方法和事務方法發生嵌套調用時事務如何進行傳播:
事務傳播行爲類型:
數據庫

事務傳播行爲類型 說明
PROPAGATION_REQUIRED 若是當前沒有事務,就新建一個事務,若是已經存在一個事務中,加入到這個事務中。這是最多見的選擇。
PROPAGATION_SUPPORTS 支持當前事務,若是當前沒有事務,就以非事務方式執行。
PROPAGATION_MANDATORY 使用當前的事務,若是當前沒有事務,就拋出異常。
PROPAGATION_REQUIRES_NEW 新建事務,若是當前存在事務,把當前事務掛起。
PROPAGATION_NOT_SUPPORTED 以非事務方式執行操做,若是當前存在事務,就把當前事務掛起。
PROPAGATION_NEVER 以非事務方式執行,若是當前存在事務,則拋出異常。
PROPAGATION_NESTED 若是當前存在事務,則在嵌套事務內執行。若是當前沒有事務,則執行與PROPAGATION_REQUIRED相似的操做。

當使用PROPAGATION_NESTED時,底層的數據源必須基於JDBC 3.0,而且實現者須要支持保存點事務機制。編程

事務隔離級別

spring若是沒有指定事務隔離級別的話,則spring的事務隔離級別跟數據庫的隔離級別走,數據庫是什麼隔離級別,spring就是什麼隔離級別。
Spring事務的隔離級別:api

  1. ISOLATION_DEFAULT: 這是一個PlatfromTransactionManager默認的隔離級別,使用數據庫默認的事務隔離級別. 另外四個與JDBC的隔離級別相對應
  2. ISOLATION_READ_UNCOMMITTED: 這是事務最低的隔離級別,它充許令外一個事務能夠看到這個事務未提交的數據。 這種隔離級別會產生髒讀,不可重複讀和幻像讀。
  3. ISOLATION_READ_COMMITTED: 保證一個事務修改的數據提交後才能被另一個事務讀取。另一個事務不能讀取該事務未提交的數據
  4. ISOLATION_REPEATABLE_READ: 這種事務隔離級別能夠防止髒讀,不可重複讀。可是可能出現幻像讀。 它除了保證一個事務不能讀取另外一個事務未提交的數據外,還保證了避免下面的狀況產生(不可重複讀)。
  5. ISOLATION_SERIALIZABLE: 這是花費最高代價可是最可靠的事務隔離級別。事務被處理爲順序執行。 除了防止髒讀,不可重複讀外,還避免了幻像讀。

接下來看一下代碼方式與標籤方式的事務實現:bash

## 代碼方式實現
public OrderService{
    @Autowire 
    PlatformTransactionManager txManager;
    void buyTicket(BuyTicketDTO dto){
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = txManager.getTransaction(def);
        try{
            ## 執行業務代碼
            txManager.commit(status);
        }catch(Exception e){
            txManager.rollback(status);
        }
    }
}

## Transaction標籤實現方式
public OrderService{
    @Transactonal
    void buyTick(BuyTicketDTO dto){
        // get and begin transaction manager from context
        try{
            /**業務代碼*/
            // commit transaction
        }catch(Exception e){
            // rollback transaction
        }
    }
}
複製代碼

策略接口 PlatformTransactionManager事務管理器的常見實現有:
DataSourceTransactionManager、JpTransactionManager、JmsTransactionManager、JtaTransactionManager

JPA簡介及事務實現

JPA是Java的一個規範(Java持久性API)。 它用於在Java對象和關係數據庫之間保存數據。 JPA充當面向對象的領域模型和關係數據庫系統之間的橋樑。 因爲JPA只是一個規範,它自己不執行任何操做。 它須要一個實現。 所以,像Hibernate,TopLink和iBatis這樣的ORM工具實現了JPA數據持久性規範。
關於JPA事務實例的代碼:
domian實體對象服務器

@Entity(name = "customer")
public class Customer {
    ## id 自增加
    @Id
    @GeneratedValue
    private Long id;
    ## 惟一索引
    @Column(name = "user_name", unique = true)
    private String username;
    private String password;
    private String role;

    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public String getRole() {
        return role;
    }
    public void setRole(String role) {
        this.role = role;
    }
}
複製代碼

dao 接口app

// 繼成JpaRepository中的方法,其中已經包含基本的CRUD
public interface CustomerRepository extends JpaRepository<Customer, Long> {
    Customer findOneByUsername(String username);
}
複製代碼

service 業務操做,如下2種事務實現的效果是同樣的,意在告訴你們如何使用代碼的方式實現與註解聲明事務相同的效果。框架

@Service
public class CustomerServiceTxInAnnotation {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class);
    
    @Autowired
    private CustomerRepository customerRepository;
    ## 使用註解的方式聲明事務存在
    @Transactional
    public Customer create(Customer customer) {
        LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用戶已經存在");
        }
        customer.setUsername("Annotation:" + customer.getUsername());
        return customerRepository.save(customer);
    }
}
複製代碼
@Service
public class CustomerServiceTxInCode {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class);

    @Autowired
    private CustomerRepository customerRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;

    public Customer create(Customer customer) {
        LOG.info("CustomerService In Code create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用戶已經存在");
        }
        ## 使用代碼的方式來聲明事務存在
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
        ## 當使用ISOLATION_SERIALIZABLE級別時,若是外部沒事務存在,則自己建立事務,,因此submitError方法拋出異常能夠回滾
        //def.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
        ## 當使用PROPAGATION_REQUIRED級別時,若是外部沒事務存在,則自己也不存在事務,,因此submitError方法拋出異常依然能夠保存數據成功 
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            customer.setUsername("Code:" + customer.getUsername());
            customerRepository.save(customer);
            submitError();
            transactionManager.commit(status);
            return customer;
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
    private void submitError(){
        throw new RuntimeException("some Data error ")
    }
}
複製代碼

controller 控制層dom

@RestController
@RequestMapping("/api/customer")
public class CustomerResource {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class);

    @Autowired
    private CustomerServiceTxInAnnotation customerService;
    @Autowired
    private CustomerServiceTxInCode customerServiceInCode;
    @Autowired
    private CustomerRepository customerRepository;

    @PostMapping("/annotation")
    public Customer createInAnnotation(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername());
        return customerService.create(customer);
    }
    @PostMapping("/code")
    public Customer createInCode(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in code create customer:{}", customer.getUsername());
        return customerServiceInCode.create(customer);
    }
    @GetMapping("")
    public List<Customer> getAll() {
        return customerRepository.findAll();
    }
}
複製代碼

接下來看一下程序的執行結果及JPA事務的管理過程:

在整個事務管理過程當中使用的是Spring事務控制,而且由相關ORM框架實現JPA規範

JMS事務原理

Spring JMS Session

  • 經過Session進行事務管理操做
  • Session 是一個thread-bound(線程範圍內)
  • 事務上下文:一個線程一個Session

Spring JMS事務類型

  • Session管理的事務-原生事務
  • 外部管理的事務-JmsTransactionManager、JTA

Srping JMS事務機制過程

Session原生事務:

JmsTransactionManager事務:

##註解方式注入Spring Bean
@EnableJms
@Configuration
public class JmsConfig {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);

    @Bean
    public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) {
        LOG.debug("init jms template with converter.");
        JmsTemplate template = new JmsTemplate();
        ## JmsTemplate使用的connectionFactory跟JmsTransactionManager使用的必須是同一個,不能在這裏封裝成caching之類的。
        template.setConnectionFactory(connectionFactory); 
        return template;
    }

    ## 這個用於設置 @JmsListener使用的containerFactory
    @Bean
    public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory,
                                                     DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                     PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setTransactionManager(transactionManager);
        factory.setCacheLevelName("CACHE_CONNECTION");
        factory.setReceiveTimeout(10000L);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}

複製代碼
@Service
public class CustomerService {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);

    @Autowired
    JmsTemplate jmsTemplate;
    @Autowired
    private PlatformTransactionManager transactionManager;

    @PostConstruct
    public void init() {
        jmsTemplate.setReceiveTimeout(3000);
    }
    ## 原生事務
    @JmsListener(destination = "customer:msg:new", containerFactory = "msgFactory")
    public void handle(String msg) {
        LOG.debug("Get JMS message to from customer:{}", msg);
        String reply = "Replied - " + msg;
        jmsTemplate.convertAndSend("customer:msg:reply", reply);
        if (msg.contains("error")) {
            simulateError();
        }
    }
    ## JmsTransactionManager事務
    @JmsListener(destination = "customer:msg2:new", containerFactory = "msgFactory")
    public void handle2(String msg) {
        LOG.debug("Get JMS message2 to from customer:{}", msg);
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            String reply = "Replied-2 - " + msg;
            jmsTemplate.convertAndSend("customer:msg:reply", reply);
            if (!msg.contains("error")) {
                transactionManager.commit(status);
            } else {
                transactionManager.rollback(status);
            }
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }

    private void simulateError() {
        throw new RuntimeException("some Data error.");
    }
}
複製代碼

Spring 本地事務

緊密依賴於底層資源管理器(例如數據庫鏈接 ),事務處理侷限在當前事務資源內。此種事務處理方式不存在對應用服務器的依賴,於是部署靈活卻沒法支持多數據源的分佈式事務。

  • Spring容器管理事務的生命週期
  • 經過Spring事務接口調用
  • 業務代碼與具體事務的實現無關

在數據庫鏈接中使用本地事務示例以下:

public void transferAccount() { 
       Connection conn = null; 
       Statement stmt = null; 
       try{ 
           conn = getDataSource().getConnection(); 
           ## 將自動提交設置爲 false,
           ## 若設置爲 true 則數據庫將會把每一次數據更新認定爲一個事務並自動提交
           conn.setAutoCommit(false);
           stmt = conn.createStatement(); 
           ## 將 A 帳戶中的金額減小 500 
           stmt.execute("\ update t_account set amount = amount - 500 where account_id = 'A'");
           ## 將 B 帳戶中的金額增長 500 
           stmt.execute("\ update t_account set amount = amount + 500 where account_id = 'B'");
           ## 提交事務
           conn.commit();
           ## 事務提交:轉帳的兩步操做同時成功
       } catch(SQLException sqle){            
           try{ 
               ## 發生異常,回滾在本事務中的操作
              conn.rollback();
               ## 事務回滾:轉帳的兩步操做徹底撤銷
               stmt.close(); 
               conn.close(); 
           }catch(Exception ignore){ } 
           sqle.printStackTrace(); 
       } 
   }
複製代碼

本地事務機制過程圖:

Spring 外部(全局)事務

  • 外部事務管理器提供事務管理
  • 經過Spring事務接口,調用外部管理器
  • 使用JNDI等方式獲取外部事務管理器的實例
  • 外部事務管理器通常由應用服務器提供、如JBoss等

JNDI(Java Naming and Directory Interface,Java命名和目錄接口)是SUN公司提供的一種標準的Java命名系統接口,JNDI提供統一的客戶端API,經過不一樣的訪問提供者接口JNDI服務供應接口(SPI)的實現,由管理者將JNDI API映射爲特定的命名服務和目錄系統,使得Java應用程序能夠和這些命名服務和目錄服務之間進行交互。

相關文章
相關標籤/搜索