場景
在一些活動場景中,咱們常常須要處理一些營銷活動,通常都有中途暫停,中途重啓的需求。好比一個派券需求,我須要給1千萬的用戶派券,因爲某些緣由,須要暫停這個活動一會,而後再從新接着派發。redis
原理
重寫 beforeExecute線程
實現
@Configuration @EnableScheduling @EnableAsync public class MultiThreadExecutorMonitor { private static final Logger LOG = LoggerFactory.getLogger(MultiThreadExecutorMonitor.class); private static PausableThreadPoolExecutor es = new PausableThreadPoolExecutor( 50, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100), // 生產的業務 能夠經過 redis Queue 來處理 new ThreadPoolExecutor.DiscardPolicy() // 直接掉棄 ); private static ExecutorService peopler = new ThreadPoolExecutor( 50, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100) // 定量的無界阻塞隊列 ); private static AtomicInteger counter = new AtomicInteger(0); private static AtomicBoolean isPase = new AtomicBoolean(true); /** * 監控線程執行器 * * @throws InterruptedException */ @Async @Scheduled(fixedDelay = 3000) public void cronJob() throws InterruptedException { threadMonitor(); LOG.info("監控結束 {}", counter.get()); } /** * 嘗試任務 * * @throws InterruptedException */ @Async @Scheduled(fixedDelay = 9000) public void cronTryJob() throws Exception { runThread(); LOG.info(" 嘗試任務 {}", counter.get()); } /** * 模擬人爲的暫停,重啓 * * @throws InterruptedException */ @Async @Scheduled(fixedDelay = 15000) public void cronPaseOrResumeJob() throws InterruptedException { controllerThreadExecitor(); LOG.info("操做控制線程執行器 {}", counter.get()); } public static void main(String[] args) throws Exception { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( MultiThreadExecutorMonitor.class); try { isPase.getAndSet(false); runThread(); Thread.currentThread().join(); } catch (InterruptedException e) { e.printStackTrace(); } finally { context.close(); } } public static void runThread() throws Exception { while (!isPase.get()) { // ToDo: 斷點重試工做 ... es.execute(() -> { // LOG.info("執行:{}", counter.incrementAndGet()); counter.incrementAndGet(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } public static void controllerThreadExecitor() { if(isPase.get()) { LOG.info("=============================== 重啓線程池 :{}", isPase.get()); isPase.getAndSet(false); es.resume(); } else { LOG.info("=============================== 暫停線程池 :{}", isPase.get()); isPase.getAndSet(true); es.pause(); } } public static void threadMonitor() { ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es); int queueSize = tpe.getQueue().size(); LOG.info("當前排隊線程數:{}" , queueSize); int activeCount = tpe.getActiveCount(); LOG.info("當前活動線程數:{}" , activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); LOG.info("執行完成線程數:{}" , completedTaskCount); boolean isShutdown = tpe.isShutdown(); LOG.info("是否暫停現場執行:{}" , isShutdown); boolean isTerminated = tpe.isTerminated(); LOG.info("isTerminated:{}" , isTerminated); boolean isTerminating = tpe.isTerminating(); LOG.info("isTerminating:{}" , isTerminating); long taskCount = tpe.getTaskCount(); LOG.info("總線程數:{}" , taskCount); } }