Springboot項目,有個需求,須要提供接口,接口調用方每一次調用時,都會上報大量的數據,接口須要知足如下要求:spring
實踐發現,即便使用批量保存,接口耗時也高達一秒多,因此須要開啓多線程來保存。如今的問題是,若是保證在開啓多線程保存的狀況下,保證數據的原子性。數據庫
標識
來標識「是否有線程操做失敗」。標識
設置爲失敗
,而後繼續拋出異常。標識
是失敗,也就是「有其餘線程有執行失敗」,就自定義拋出異常來回滾。標識
是否成功;確保不會出現「還有線程的業務未執行完成,其餘線程就已經結束工做」。失敗流程以下:
多線程
@Slf4j @Component public class AtomicConcurrentTransactionalExecutor { @Autowired private TransactionalWorker transactionalWorker; /** * @param threadWwaitTerminationTimeout * @param runnables */ public boolean execute(int threadWwaitTerminationTimeout, Runnable... runnables) { int threadSize = runnables.length; CyclicBarrier workerCyclicBarrier = new CyclicBarrier(threadSize); AtomicInteger successCounter = new AtomicInteger(threadSize); ExecutorService executorService = Executors.newFixedThreadPool(threadSize); for (Runnable runnable : runnables) { executorService.submit(() -> { try { transactionalWorker.run(workerCyclicBarrier, successCounter, runnable); } catch (Exception e) { log.error("TransactionalWorker current thread execute error before runnable.run!", e); } }); } ThreadUtils.shutdown(executorService, threadWwaitTerminationTimeout, TimeUnit.SECONDS); return successCounter.get() == 0; } /** * @param threadWwaitTerminationTimeout * @param threadPollSize * @param runnable * @return boolean * @author minchin * @date 2020/2/12 12:33 下午 */ public boolean execute(int threadWwaitTerminationTimeout, int threadPollSize, Runnable runnable) { Runnable[] runnables = IntStream.range(0, threadPollSize) .mapToObj(i -> runnable) .toArray(Runnable[]::new); return execute(threadWwaitTerminationTimeout, runnables); } } @Component @Slf4j public class TransactionalWorker { /** * @param workerCyclicBarrier * @param successCounter * @param runnable */ @Transactional(rollbackFor = Exception.class) public void run(CyclicBarrier workerCyclicBarrier, AtomicInteger successCounter, Runnable runnable) { boolean isSuccess = false; try { runnable.run(); successCounter.decrementAndGet(); isSuccess = true; } catch (Exception e) { log.error("TransactionalWorker current thread execute error!", e); isSuccess = false; throw e; } finally { try { // 若是是數據庫操做慢致使長時間阻塞,並不會被線程池中斷(Interrupt),也就是會等到數據庫操做完成以後,進入到這一步,而後直接報超時異常 workerCyclicBarrier.await(); } catch (Exception e) { // 等待其餘線程時超時 log.error("TransactionalWorker current thread execute CyclicBarrier.await error!", e); if (isSuccess) { // 要回滾計數,不然:假設所有線程都操做成功,但恰好超時,主線程shutdown線程池後,計數爲0,會返回成功 successCounter.incrementAndGet(); } } } if (successCounter.get() != 0) { log.error("TransactionalWorker other thread execute error, create SystemException to rollback!"); throw new SystemException("TransactionalWorker other thread execute error, create SystemException to rollback!"); } } } @Slf4j public class ThreadUtils { private ThreadUtils() { } /** * @param pool * @param awaitTerminationTimeout * @param timeUnit * @return 若是出現異常,則返回false */ public static boolean shutdown(ExecutorService pool, int awaitTerminationTimeout, TimeUnit timeUnit) { try { pool.shutdown(); boolean done = false; try { done = awaitTerminationTimeout > 0 && pool.awaitTermination(awaitTerminationTimeout, timeUnit); } catch (InterruptedException e) { log.error("thread pool awaitTermination error!", e); } if (!done) { pool.shutdownNow(); if(awaitTerminationTimeout > 0) { pool.awaitTermination(awaitTerminationTimeout, timeUnit); } } } catch (Exception e) { log.error("thread pool shutdown error!", e); return false; } return true; } }
return atomicConcurrentTransactionalExecutor.execute(10, 2, // 業務 () -> testService.test1() }
return atomicConcurrentTransactionalExecutor.execute(10, // 業務1 () -> testService.test1(), // 業務2 () -> testService.test2(), }
注意在使用時,同一個類內,調用內部方法,Spring事務不生效的問題。 任務超時的便捷測試還須要嚴格再測試!