定時任務使用多線程注意事項

在定時任務中爲了加快處理速度,通常都會使用多線程處理業務。須要注意一下事項:java

1. 定時任務是否容許上一個定時任務未結束,下一個定時任務能夠啓動,經過Scheduled中的配置在決定。服務器

2. 主線程已經關閉,線程池中的線程還在運行問題。線程池的關閉方法問題多線程

3. 定時任務有大量數據,致使服務沒法中止問題。線程

4. 如何獲取線程的處理結果code

 

以下代碼是示例,stop狀態的使用和線程池shutdown的處理邏輯須要依據本身的業務來處理。orm

 

@PreDestroy
    public void destory(){
        stop = true;
    }

    //線程中止狀態, 經過註解檢測到服務器中止時修改stop狀態
    boolean stop = false;

    //服務器啓動後延遲1分鐘執行,定時任務結束後延遲1分鐘執行下一次
    @Scheduled(initialDelay = 60*1000L, fixedDelay = 60*1000L)
    public void scheduling(){
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            dataList.add("data_"+i);
        }



        int threadSize = 10;
        ExecutorService esPool = Executors.newFixedThreadPool(threadSize);

        //接收線程的完成時間 或者其餘返回結果
        CompletionService<String> ecs = new ExecutorCompletionService<String>(esPool);

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(dataList);
        logger.info("===============start {}==================", System.currentTimeMillis());
        //啓動線程時修改退出線程狀態
        stop = false;
        for (int i = 0; i < threadSize; i++) {
            ecs.submit(()->{
                long count = 0;
                //線程處理加try catch 防止拋出異常中斷線程,可能會致使線程池中全部的線程都中斷,無可用線程
                try{
                    // !queue.isEmpty()比queue.size()>0效率高不少 .size() 是要遍歷一遍集合的
                    while (!stop && !queue.isEmpty()){
                        String data = queue.poll();

                        //500 能夠在60秒內完成處理,正常退出
                        //改爲 1000 若是不使用下面的收集結果代碼,60秒內沒法處理完,會強制shutdown 拋出異常
                        Thread.sleep(1000L);
                        logger.info("data {} ok.",data);
                        count++;
                    }
                }catch (Exception e){
                    logger.error("",e);
                }

                //這裏範圍線程處理的結果
                return System.currentTimeMillis()+"_"+count;
            });
        }

        //獲取線程的返回結果 會阻塞主線程,直到線程池中全部的線程返回結果
        /*try {
            for (int i = 0; i < threadSize; i++) {
                String threadTime = ecs.take().get();
                logger.info("thread run ok time:{}"+threadTime);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }catch (ExecutionException e) {
            e.printStackTrace();
        }*/

        //關閉線程池
        try {
            esPool.shutdown();
            logger.info("esPoll shutdown:{}", DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT));
            //線程池阻塞,到指定的時間退出,若是全部線程執行完成返回true 不然返回false
            boolean await = esPool.awaitTermination(60*1000L,TimeUnit.MILLISECONDS);
            logger.info("esPool.awaitTermination 1:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT));
            if(!await) {
                stop = true;
                await = esPool.awaitTermination(10*1000L,TimeUnit.MILLISECONDS);
                logger.info("esPool.awaitTermination 2:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT));
            }

            if(!await){
                logger.info("wait 60s not stop, shutdownNow");
                // 超時的時候向線程池中全部的線程發出中斷(interrupted)。
                // 讓線程池中的全部線程當即中斷。 會拋出異常
                esPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            //awaitTermination方法被中斷的時候也停止線程池中所有的線程的執行。
            esPool.shutdownNow();
            logger.error("awaitTermination",e);
        }

        logger.info("===============end {}==================", System.currentTimeMillis());

    }
相關文章
相關標籤/搜索