spring沒法對多線程進行事務控制,緣由是:java
多線程底層鏈接數據庫的時候,是使用的線程變量(TheadLocal),因此,開多少線程理論上就會創建多少個鏈接,每一個線程有本身的鏈接,事務確定不是同一個了。
解決辦法:我強制手動把每一個線程的事務狀態放到一個同步集合裏面。而後若是有單個異常,循環回滾每一個線程。spring
假如service中的一個方法由如下邏輯構成:數據庫
1.前面的是調用多線程前的操做安全
2.調用多線程的操做多線程
假設其中任何一個與數據庫的更新操做發生了異常,想要總體回滾怎麼辦?那麼就要用到如下的方式了:app
List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>()); // 在每組邏輯操做以前加入如下代碼 // 使用這種方式將事務狀態都放在同一個事務裏面 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啓新事務,這樣會比較安全些。 TransactionStatus status = transactionManager.getTransaction(def); // 得到事務狀態
詳細DEMO:ide
TestServiceImpl:spa
package com.test.impl; import com.test.entity.User2; import com.test.entity.User3; import com.test.mapper.User2Mapper; import com.test.mapper.User3Mapper; import com.test.service.TestBService; import com.test.service.TestService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.interceptor.TransactionAspectSupport; import org.springframework.transaction.support.DefaultTransactionDefinition; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * Created by liuyachao on 2018/9/3. */ @Slf4j @Service public class TestServiceImpl implements TestService { @Autowired private User2Mapper user2Mapper; @Autowired private User3Mapper user3Mapper; @Autowired private TestBService testBService; @Autowired private PlatformTransactionManager transactionManager; List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>()); int count = 112; static int countTest = 0; @Override @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class}) public int saveUser2(User2 user2) { Integer result = 0; try{ result = user2Mapper.insertSelective(user2); //int i = 1/0; if(user2.getId() == 114){ int i = 1/0; } }catch (Exception e){ log.error("插入異常",e); TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); return result; } return result; } @Override public User3 getUser3List(User3 user3) { User3 result =user3Mapper.selectByPrimaryKey(user3.getId()); return result; } @Override @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class}) public void threadMethod(){ User2 user1 = new User2(); user1.setId(111); user1.setPassword("1"); user1.setUsername("1"); try{ // 使用這種方式將事務狀態都放在同一個事務裏面 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啓新事務,這樣會比較安全些。 TransactionStatus status = transactionManager.getTransaction(def); // 得到事務狀態 transactionStatuses.add(status); testBService.saveUser2(user1); }catch (Exception e){ e.printStackTrace(); TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); } System.out.println("main insert is over"); try{ for(int a=0 ;a<3;a++){ ThreadOperation threadOperation= new ThreadOperation(); Thread innerThread = new Thread(threadOperation); /*innerThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e){ *//*throw new RuntimeException(); log.error("###內部線程發生異常"); e.printStackTrace();*//* // 這邊回滾很差使,須要用邏輯刪除處理增長的數據 TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); } });*/ innerThread.start(); } }catch (Exception e){ log.error("###線程異常"); e.printStackTrace(); TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); } } public class ThreadOperation implements Runnable { @Override public void run() { try{ // 使用這種方式將事務狀態都放在同一個事務裏面 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啓新事務,這樣會比較安全些。 TransactionStatus status = transactionManager.getTransaction(def); // 得到事務狀態 transactionStatuses.add(status); User2 user2 = new User2(); user2.setId(count++); user2.setPassword("10"); user2.setUsername("10"); /** * 1.這裏若是用其餘類的saveUser2方法,在這個線程內事務生效,其餘線程不受影響 * 2.若是是用本類的方法,這個線程內的事務不生效,其餘線程也不受影響 */ testBService.saveUser2(user2); // testBService. System.out.println("thread insert is over"); }catch (Exception e){ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); //throw new RuntimeException(); // 事務回滾無論用 /*TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); throw new RuntimeException();*/ /*for (TransactionStatus transactionStatus:transactionStatuses) { transactionStatus.setRollbackOnly(); }*/ } } } /** * 多線程爭奪全局資源 * @param args */ public static void main(String[] args){ for(int a=0 ;a<100;a++){ ThreadOperation2 threadOperation2 = new ThreadOperation2(); Thread innerThread = new Thread(threadOperation2); innerThread.start(); } System.out.println(countTest); } public static class ThreadOperation2 implements Runnable { @Override public void run() { countTest++; } } }
TestService:線程
package com.test.service; import com.test.entity.User2; import com.test.entity.User3; /** * Created by liuyachao on 2018/9/3. */ public interface TestService { int saveUser2(User2 user2); User3 getUser3List(User3 user3); void threadMethod(); }
TestBService:code
package com.test.service; import com.test.entity.User2; import com.test.entity.User3; /** * Created by liuyachao on 2018/9/3. */ public interface TestBService { int saveUser2 (User2 user2); User3 getUser3List(User3 user3); }
TestBServiceImpl:
package com.test.impl; import com.test.entity.User2; import com.test.entity.User3; import com.test.mapper.User2Mapper; import com.test.mapper.User3Mapper; import com.test.service.TestBService; import com.test.service.TestService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.interceptor.TransactionAspectSupport; /** * Created by liuyachao on 2018/9/3. */ @Slf4j @Service public class TestBServiceImpl implements TestBService { @Autowired private User2Mapper user2Mapper; @Autowired private User3Mapper user3Mapper; int count = 112; static int countTest = 0; @Override @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class}) public int saveUser2(User2 user2){ Integer result = 0; /*try{*/ result = user2Mapper.insertSelective(user2); if(user2.getId() == 114){ int i = 1/0; } /*}catch (Exception e){ log.error("插入異常",e); TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); return result; }*/ return result; } @Override public User3 getUser3List(User3 user3) { User3 result =user3Mapper.selectByPrimaryKey(user3.getId()); return result; } }
User2:
package com.test.entity; import lombok.Data; import java.io.Serializable; @Data public class User2 implements Serializable{ private static final long serialVersionUID = 9085886691811169694L; private Integer id; private String username; private String password; }
具體的mapper等方法本身能夠作一個屬於本身的demo來驗證事務是否總體回滾:
此demo操做均爲新增數據的操做,調用多線程前、調用多線程均爲新增數據。
在多線程中的testBService.saveUser2(user2); 中saveUser2方法中模擬一個異常如:int i = 1/0;,來驗證當其中一個線程知足條件下發生異常的時候,事務總體回滾,數據庫中並無新增數據