多線程事務控制

spring沒法對多線程進行事務控制,緣由是:java

多線程底層鏈接數據庫的時候,是使用的線程變量(TheadLocal),因此,開多少線程理論上就會創建多少個鏈接,每一個線程有本身的鏈接,事務確定不是同一個了。
解決辦法:我強制手動把每一個線程的事務狀態放到一個同步集合裏面。而後若是有單個異常,循環回滾每一個線程。spring

假如service中的一個方法由如下邏輯構成:數據庫

1.前面的是調用多線程前的操做安全

2.調用多線程的操做多線程

假設其中任何一個與數據庫的更新操做發生了異常,想要總體回滾怎麼辦?那麼就要用到如下的方式了:app

List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>()); // 在每組邏輯操做以前加入如下代碼 // 使用這種方式將事務狀態都放在同一個事務裏面 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啓新事務,這樣會比較安全些。 TransactionStatus status = transactionManager.getTransaction(def); // 得到事務狀態

詳細DEMO:ide

TestServiceImpl:spa

package com.test.impl;


import com.test.entity.User2;
import com.test.entity.User3;
import com.test.mapper.User2Mapper;
import com.test.mapper.User3Mapper;
import com.test.service.TestBService;
import com.test.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Created by liuyachao on 2018/9/3.
 */
@Slf4j
@Service
public class TestServiceImpl implements TestService {
    @Autowired
    private User2Mapper user2Mapper;

    @Autowired
    private User3Mapper user3Mapper;

    @Autowired
    private TestBService testBService;
    @Autowired
    private PlatformTransactionManager transactionManager;

    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());

    int count = 112;

    static int countTest = 0;

    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public int saveUser2(User2 user2) {
        Integer result = 0;
        try{
            result = user2Mapper.insertSelective(user2);
            //int i = 1/0;
            if(user2.getId() == 114){
                int i = 1/0;
            }
        }catch (Exception e){
            log.error("插入異常",e);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return result;
        }
        return result;
    }

    @Override
    public User3 getUser3List(User3 user3) {
        User3 result =user3Mapper.selectByPrimaryKey(user3.getId());
        return result;
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public void threadMethod(){
        User2 user1 = new User2();
        user1.setId(111);
        user1.setPassword("1");
        user1.setUsername("1");
        try{
            // 使用這種方式將事務狀態都放在同一個事務裏面
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啓新事務,這樣會比較安全些。
            TransactionStatus status = transactionManager.getTransaction(def); // 得到事務狀態
 transactionStatuses.add(status);             testBService.saveUser2(user1);
        }catch (Exception e){
            e.printStackTrace();
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }

        System.out.println("main insert is over");
        try{
            for(int a=0 ;a<3;a++){
                ThreadOperation threadOperation= new ThreadOperation();
                Thread innerThread = new Thread(threadOperation);
                /*innerThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread t, Throwable e){
                        *//*throw new RuntimeException();
                        log.error("###內部線程發生異常");
                        e.printStackTrace();*//*
                        // 這邊回滾很差使,須要用邏輯刪除處理增長的數據
                        TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                    }
                });*/
                innerThread.start();
            }
        }catch (Exception e){
            log.error("###線程異常");
            e.printStackTrace();
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }
    }

    public class ThreadOperation implements Runnable {
        @Override
        public void run() {
            try{
                // 使用這種方式將事務狀態都放在同一個事務裏面
                DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啓新事務,這樣會比較安全些。
                TransactionStatus status = transactionManager.getTransaction(def); // 得到事務狀態
 transactionStatuses.add(status);                 User2 user2 = new User2();
                user2.setId(count++);
                user2.setPassword("10");
                user2.setUsername("10");
                /**
                 * 1.這裏若是用其餘類的saveUser2方法,在這個線程內事務生效,其餘線程不受影響
                 * 2.若是是用本類的方法,這個線程內的事務不生效,其餘線程也不受影響
                 */
                testBService.saveUser2(user2); // testBService.
                System.out.println("thread insert is over");
            }catch (Exception e){
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                //throw new RuntimeException();
                // 事務回滾無論用
                /*TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                throw new RuntimeException();*/
                /*for (TransactionStatus transactionStatus:transactionStatuses) {
                    transactionStatus.setRollbackOnly();
                }*/
            }
        }
    }

    /**
     * 多線程爭奪全局資源
     * @param args
     */
    public static void main(String[] args){
        for(int a=0 ;a<100;a++){
            ThreadOperation2 threadOperation2 = new ThreadOperation2();
            Thread innerThread = new Thread(threadOperation2);
            innerThread.start();
        }
        System.out.println(countTest);
    }

    public static class ThreadOperation2 implements Runnable {
        @Override
        public void run() {
            countTest++;
        }
    }

}

TestService:線程

package com.test.service;

import com.test.entity.User2;
import com.test.entity.User3;

/**
 * Created by liuyachao on 2018/9/3.
 */
public interface TestService {
    int saveUser2(User2 user2);

    User3 getUser3List(User3 user3);

    void threadMethod();
}

TestBService:code

package com.test.service;

import com.test.entity.User2;
import com.test.entity.User3;

/**
 * Created by liuyachao on 2018/9/3.
 */
public interface TestBService {
    int saveUser2 (User2 user2);

    User3 getUser3List(User3 user3);
}

TestBServiceImpl:

package com.test.impl;


import com.test.entity.User2;
import com.test.entity.User3;
import com.test.mapper.User2Mapper;
import com.test.mapper.User3Mapper;
import com.test.service.TestBService;
import com.test.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;

/**
 * Created by liuyachao on 2018/9/3.
 */
@Slf4j
@Service
public class TestBServiceImpl implements TestBService {
    @Autowired
    private User2Mapper user2Mapper;

    @Autowired
    private User3Mapper user3Mapper;

    int count = 112;

    static int countTest = 0;

    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public int saveUser2(User2 user2){
        Integer result = 0;
        /*try{*/
            result = user2Mapper.insertSelective(user2);
            if(user2.getId() == 114){
                int i = 1/0;
            }
        /*}catch (Exception e){
            log.error("插入異常",e);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return result;
        }*/
        return result;
    }

    @Override
    public User3 getUser3List(User3 user3) {
        User3 result =user3Mapper.selectByPrimaryKey(user3.getId());
        return result;
    }

}

User2:

package com.test.entity;

import lombok.Data;

import java.io.Serializable;

@Data
public class User2 implements Serializable{
    private static final long serialVersionUID = 9085886691811169694L;
    private Integer id;

    private String username;

    private String password;
}


具體的mapper等方法本身能夠作一個屬於本身的demo來驗證事務是否總體回滾:

此demo操做均爲新增數據的操做,調用多線程前、調用多線程均爲新增數據。

在多線程中的testBService.saveUser2(user2); 中saveUser2方法中模擬一個異常如:int i = 1/0;,來驗證當其中一個線程知足條件下發生異常的時候,事務總體回滾,數據庫中並無新增數據

相關文章
相關標籤/搜索