【java併發與多線程系列】可暫停線程執行器

場景

在一些活動場景中,咱們常常須要處理一些營銷活動,通常都有中途暫停,中途重啓的需求。好比一個派券需求,我須要給1千萬的用戶派券,因爲某些緣由,須要暫停這個活動一會,而後再從新接着派發。redis

原理

重寫 beforeExecute線程

實現

@Configuration
@EnableScheduling
@EnableAsync
public class MultiThreadExecutorMonitor {

   private static final Logger LOG = LoggerFactory.getLogger(MultiThreadExecutorMonitor.class);

   private static PausableThreadPoolExecutor es = new PausableThreadPoolExecutor(
           50,
           100,
           0L,
           TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(100), // 生產的業務 能夠經過 redis Queue 來處理
           new ThreadPoolExecutor.DiscardPolicy()  // 直接掉棄
   );

   private static ExecutorService peopler = new ThreadPoolExecutor(
           50,
           100,
           0L,
           TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(100)  // 定量的無界阻塞隊列
   );

   private static AtomicInteger counter = new AtomicInteger(0);

   private static AtomicBoolean isPase = new AtomicBoolean(true);

   /**
    * 監控線程執行器
    *
    * @throws InterruptedException
    */
   @Async
   @Scheduled(fixedDelay = 3000)
   public void cronJob() throws InterruptedException {
       threadMonitor();
       LOG.info("監控結束 {}", counter.get());
   }

   /**
    *  嘗試任務
    *
    * @throws InterruptedException
    */
   @Async
   @Scheduled(fixedDelay = 9000)
   public void cronTryJob() throws Exception {
       runThread();
       LOG.info(" 嘗試任務 {}", counter.get());
   }


   /**
    * 模擬人爲的暫停,重啓
    *
    * @throws InterruptedException
    */
   @Async
   @Scheduled(fixedDelay = 15000)
   public void cronPaseOrResumeJob() throws InterruptedException {
         controllerThreadExecitor();
         LOG.info("操做控制線程執行器 {}", counter.get());
   }

   public static void main(String[] args) throws Exception {
       AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(
               MultiThreadExecutorMonitor.class);
       try {
           isPase.getAndSet(false);
           runThread();
           Thread.currentThread().join();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } finally {
           context.close();
       }
   }

   public static void runThread() throws Exception {
       while (!isPase.get()) {
               // ToDo: 斷點重試工做 ...
               es.execute(() -> {
//                  LOG.info("執行:{}", counter.incrementAndGet());
                   counter.incrementAndGet();
                   try {
                       Thread.sleep(1000);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               });
       }
   }

   public static void controllerThreadExecitor() {
       if(isPase.get()) {
           LOG.info("=============================== 重啓線程池 :{}", isPase.get());
           isPase.getAndSet(false);
           es.resume();
       } else  {
           LOG.info("=============================== 暫停線程池 :{}", isPase.get());
           isPase.getAndSet(true);
           es.pause();
       }

   }

    public  static void  threadMonitor() {
        ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es);
        int queueSize = tpe.getQueue().size();
        LOG.info("當前排隊線程數:{}" ,  queueSize);
        int activeCount = tpe.getActiveCount();
        LOG.info("當前活動線程數:{}" ,  activeCount);
        long completedTaskCount = tpe.getCompletedTaskCount();
        LOG.info("執行完成線程數:{}" ,  completedTaskCount);
        boolean isShutdown = tpe.isShutdown();
        LOG.info("是否暫停現場執行:{}" ,  isShutdown);
        boolean  isTerminated = tpe.isTerminated();
        LOG.info("isTerminated:{}" ,  isTerminated);
        boolean isTerminating = tpe.isTerminating();
        LOG.info("isTerminating:{}" ,  isTerminating);
        long taskCount = tpe.getTaskCount();
        LOG.info("總線程數:{}" ,  taskCount);
    }
}
相關文章
相關標籤/搜索