Spring事務在多線程下保證原子性

背景

Springboot項目,有個需求,須要提供接口,接口調用方每一次調用時,都會上報大量的數據,接口須要知足如下要求:spring

  • 數據保存要保證數據原子性:要麼所有保存成功,要麼所有不保存。
  • 保證接口性能。

實踐發現,即便使用批量保存,接口耗時也高達一秒多,因此須要開啓多線程來保存。如今的問題是,若是保證在開啓多線程保存的狀況下,保證數據的原子性。數據庫

思路

  • 開啓多線程,每一個線程都是使用獨立的DB鏈接。不然因爲數據庫是串行阻塞操做,最終仍是會變成排隊操做數據庫。
  • 依賴spring事務異常回滾機制。
  • 有個統一的標識來標識「是否有線程操做失敗」。
  • 線程若是出現異常:先捕獲異常,將標識設置爲失敗,而後繼續拋出異常。
  • 線程若是沒有異常,在執行的最後,判斷標識是失敗,也就是「有其餘線程有執行失敗」,就自定義拋出異常來回滾。
  • 經過鎖來保證:全部的線程都操做完以後,一塊兒判斷標識是否成功;確保不會出現「還有線程的業務未執行完成,其餘線程就已經結束工做」。

流程圖

失敗流程以下:
58241153.png多線程

代碼

@Slf4j
@Component
public class AtomicConcurrentTransactionalExecutor {
    @Autowired
    private TransactionalWorker transactionalWorker;

    /**
     * @param threadWwaitTerminationTimeout
     * @param runnables
     */
    public boolean execute(int threadWwaitTerminationTimeout, Runnable... runnables) {
        int threadSize = runnables.length;
        CyclicBarrier workerCyclicBarrier = new CyclicBarrier(threadSize);
        AtomicInteger successCounter = new AtomicInteger(threadSize);
        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
        for (Runnable runnable : runnables) {
            executorService.submit(() -> {
                try {
                    transactionalWorker.run(workerCyclicBarrier, successCounter, runnable);
                } catch (Exception e) {
                    log.error("TransactionalWorker current thread execute error before runnable.run!", e);
                }
            });
        }
        ThreadUtils.shutdown(executorService, threadWwaitTerminationTimeout, TimeUnit.SECONDS);
        return successCounter.get() == 0;
    }

    /**
     * @param threadWwaitTerminationTimeout
     * @param threadPollSize
     * @param runnable
     * @return boolean
     * @author minchin
     * @date 2020/2/12 12:33 下午
     */
    public boolean execute(int threadWwaitTerminationTimeout, int threadPollSize, Runnable runnable) {
        Runnable[] runnables = IntStream.range(0, threadPollSize)
                .mapToObj(i -> runnable)
                .toArray(Runnable[]::new);
        return execute(threadWwaitTerminationTimeout, runnables);
    }
}


@Component
@Slf4j
public class TransactionalWorker {

    /**
     * @param workerCyclicBarrier
     * @param successCounter
     * @param runnable
     */
    @Transactional(rollbackFor = Exception.class)
    public void run(CyclicBarrier workerCyclicBarrier, AtomicInteger successCounter, Runnable runnable) {
        boolean isSuccess = false;
        try {
            runnable.run();
            successCounter.decrementAndGet();
            isSuccess = true;
        } catch (Exception e) {
            log.error("TransactionalWorker current thread execute error!", e);
            isSuccess = false;
            throw e;
        } finally {
            try {
                // 若是是數據庫操做慢致使長時間阻塞,並不會被線程池中斷(Interrupt),也就是會等到數據庫操做完成以後,進入到這一步,而後直接報超時異常
                workerCyclicBarrier.await();
            } catch (Exception e) {
                // 等待其餘線程時超時
                log.error("TransactionalWorker current thread execute CyclicBarrier.await error!", e);
                if (isSuccess) {
                    // 要回滾計數,不然:假設所有線程都操做成功,但恰好超時,主線程shutdown線程池後,計數爲0,會返回成功
                    successCounter.incrementAndGet();
                }
            }
        }
        if (successCounter.get() != 0) {
            log.error("TransactionalWorker other thread execute error, create SystemException to rollback!");
            throw new SystemException("TransactionalWorker other thread execute error, create SystemException to rollback!");
        }
    }
}


@Slf4j
public class ThreadUtils {

    private ThreadUtils() {
    }

    /**
     * @param pool
     * @param awaitTerminationTimeout
     * @param timeUnit
     * @return 若是出現異常,則返回false
     */
    public static boolean shutdown(ExecutorService pool, int awaitTerminationTimeout, TimeUnit timeUnit) {
        try {
            pool.shutdown();
            boolean done = false;
            try {
                done = awaitTerminationTimeout > 0 && pool.awaitTermination(awaitTerminationTimeout, timeUnit);
            } catch (InterruptedException e) {
                log.error("thread pool awaitTermination error!", e);
            }
            if (!done) {
                pool.shutdownNow();
                if(awaitTerminationTimeout > 0) {
                    pool.awaitTermination(awaitTerminationTimeout, timeUnit);
                }
            }
        } catch (Exception e) {
            log.error("thread pool shutdown error!", e);
            return false;
        }
        return true;
    }
}

使用例子

  • 一樣的業務,拆分多個線程來
return atomicConcurrentTransactionalExecutor.execute(10, 2,
  // 業務
  () -> testService.test1()
}
  • 不一樣的業務,每一個線程操做不一樣的業務
return atomicConcurrentTransactionalExecutor.execute(10,
  // 業務1
  () -> testService.test1(),
  // 業務2
  () -> testService.test2(),
}
注意在使用時,同一個類內,調用內部方法,Spring事務不生效的問題。 任務超時的便捷測試還須要嚴格再測試!
相關文章
相關標籤/搜索