【進階之路】線程池拓展與CompletionService操做異步任務

你們好,我是練習java兩年半時間的南橘,小夥伴能夠一塊兒互相交流經驗哦。java

1、擴展ThreadPoolExecutor

一、擴展方法介紹

ThreadPoolExecutor是能夠擴展的,它內部提供了幾個能夠在子類中改寫的方法(紅框內)。JDK內的註解上說,這些方法能夠用以添加日誌,計時、監視或進行統計信息的收集。是否是感受很熟悉?有沒有一種spring aop中 @Around @Before @After三個註解的既視感?spring

咱們來對比一下dom

ThreadPoolExecutor spring aop
beforeExecute()(線程執行以前調用 @Before(在所攔截的方法執行以前執行
afterExecute() (線程執行以後調用 @After (在所攔截的方法執行以後執行
terminated() (線程池退出時候調用
@Around(能夠同時在所攔截的方法先後執行

其實他們的效果是同樣的,只是一個在線程池裏,一個在攔截器中。異步

對於ThreadPoolExecutor中的這些方法,有這樣的一些特色:性能

  • 一、不管任務時從run中正常返回,仍是拋出一個異常而返回,afterExecute都會被調用(可是若是任務在完成後帶有一個Error,那麼就不會調用afterExecute)測試

  • 二、同時,若是beforeExecute拋出一個RuntimeExecption,那麼任務將不會被執行,連帶afterExecute也不會被調用了優化

  • 三、在線程池完成關閉操做時會調用terminated,相似於try-catch中的finally操做同樣。terminated能夠用來釋放Executor在其生命週期裏分配的各類資源,此外也能夠用來執行發送通知、記錄日誌亦或是收集finalize統計信息等操做線程

二、擴展方法實現

咱們先構建一個自定義的線程池,它經過擴展方法來添加日誌記錄和統計信息的收集。爲了測量任務的運行時間,beforeExecute必須記錄開始時間並把它保存到一個afterExecute能夠訪問的地方,因而用ThreadLocal來存儲變量,用afterExecute來讀取,並經過terminated來輸出平均任務和日誌消息。日誌

public class WeedThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime =new ThreadLocal<>();
    private final Logger log =Logger.getLogger("WeedThreadPool");
    //統計執行次數
    private final AtomicLong numTasks =new AtomicLong();
    //統計總執行時間
    private final AtomicLong totalTime =new AtomicLong();
    /**
     * 這裏是實現線程池的構造方法,我隨便選了一個,你們能夠根據本身的需求找到合適的構造方法
     */
    public WeedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    //線程執行以前調用
    protected void  beforeExecute(Thread t,Runnable r){
        super.beforeExecute(t,r);
        System.out.println(String.format("Thread %s:start %s",t,r));
        //由於currentTimeMillis返回的是ms,而衆所周知ms是很難產生差別的,因此換成了nanoTime用ns來展現
        startTime.set(System.nanoTime());
    }
    //線程執行以後調用
    protected void afterExecute(Runnable r,Throwable t){
        try {
            Long endTime =System.nanoTime();
            Long taskTime =endTime-startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread(),r,taskTime));
        }finally {
            super.afterExecute(r,t);
        }
    }
    //線程池退出時候調用
   protected void terminated(){
        try{
            System.out.println(String.format("Terminated: avg time =%dns, ",totalTime.get()/numTasks.get()));
        }finally {
            super.terminated();
        }
   }

}

測試案例:code

public class WeedThreadTest {
     BlockingQueue<Runnable> taskQueue;
   final static WeedThreadPool weedThreadPool =new WeedThreadPool(3,10,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100));
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i++) {
            weedThreadPool.execute(WeedThreadTest::run);
        }
        Thread.sleep(2000L);
        weedThreadPool.shutdown();
    }

    private static void run() {
        System.out.println("thread id is: " + Thread.currentThread().getId());
        try {
            Thread.sleep(1024L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

三、使用場景

用到這些方法的地方其實和用到Spring AOP中一些場景比較類似,主要在記錄跟蹤、優化等方面可使用,如日誌記錄和統計信息的收集、測量任務的運行時間,以及一些任務完成以後發送通知、郵件、信息之類的。

2、CompletionService操做異步任務

一、異步方法的原理

若是咱們意外收穫了一大批待執行的任務(舉個例子,好比去調用各大旅遊軟件的出行機票信息),爲了提升任務的執行效率,咱們可使用線程池submit異步計算任務,經過調用Future接口實現類的get方法獲取結果。

雖然使用了線程池會提升執行效率,可是調用Future接口實現類的get方法是阻塞的,也就是和當前這個Future關聯的任務所有執行完成的時候,get方法才返回結果,若是當前任務沒有執行完成,而有其它Future關聯的任務已經完成了,就會白白浪費不少等待的時間。

因此,有沒有這樣一個方法,遍歷的時候誰先執行完成就先獲取哪一個結果?

沒錯,咱們的ExecutorCompletionService就能夠實現這樣的效果,它的內部有一個先進先出的阻塞隊列,用於保存已經執行完成的Future,經過調用它的take方法或poll方法能夠獲取到一個已經執行完成的Future,進而經過調用Future接口實現類的get方法獲取最終的結果

邏輯圖以下:

ExecutorCompletionService實現了CompletionService接口,在CompletionService接口中定義了以下這些方法:

  • Future submit(Callable task): 提交一個Callable類型任務,並返回該任務執行結果關聯的Future

  • Future submit(Runnable task,V result): 提交一個Runnable類型任務,並返回該任務執行結果關聯的Future

  • Future take(): 從內部阻塞隊列中獲取並移除第一個執行完成的任務,阻塞,直到有任務完成

  • Future poll(): 從內部阻塞隊列中獲取並移除第一個執行完成的任務,獲取不到則返回null,不阻塞

  • Future poll(long timeout, TimeUnit unit): 從內部阻塞隊列中獲取並移除第一個執行完成的任務,阻塞時間爲timeout,獲取不到則返回null

二、異步方法的實現

public class WeedExecutorServiceDemo {
    /**
     * 繼續用以前建好的線程池,只是調整一下池大小
     */
    BlockingQueue<Runnable> taskQueue;
    final static WeedThreadPool weedThreadPool = new WeedThreadPool(1, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
    public static Random r = new Random();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(weedThreadPool);
        for (int i = 0; i < 3; i++) {
            cs.submit(() -> {
                //獲取計算任務
                int init = 0;
                for (int j = 0; j < 100; j++) {
                    init += r.nextInt();
                }
                Thread.sleep(1000L);
                return Integer.valueOf(init);
            });
        }
        weedThreadPool.shutdown();
        /**
         * 經過take方法獲取,阻塞,直到有任務完成
         */
        for (int i = 0; i < 3; i++) {
            Future<Integer> future = cs.take();
            if (future != null) {
                System.out.println(future.get());
            }
        }
    }
}

調用結果以下

咱們也能夠經過poll方法來獲取

/**
         * 經過poll方法獲取
         */
        for (int i = 0; i < 3; i++) {
              System.out.println(cs.poll(1200L,TimeUnit.MILLISECONDS).get());

        }

結果天然是同樣的


若是把阻塞時間改小一些,目前的代碼就會出問題

/**
         * 經過poll方法獲取
         */
        for (int i = 0; i < 3; i++) {
            System.out.println(cs.poll(800L,TimeUnit.MILLISECONDS).get());

        }

一樣的,poll方法也能夠用來打斷超時執行的業務,好比在poll超時的狀況下,直接調用線程池的shutdownNow(),殘暴地關閉整個線程池。

for (int i = 0; i < 3; i++) {
            Future<Integer> poll = cs.poll(800L, TimeUnit.MILLISECONDS);
            if (poll==null){
                System.out.println("執行結束");
                weedThreadPool.shutdownNow();
            }
        }

三、使用場景

選擇怎麼樣的方法來異步執行任務,什麼樣的方式來接收任務,也是須要根據實際狀況來考慮的。

  • 1.、須要批量提交異步任務的時候建議你使用 CompletionService。CompletionService 將線程池 Executor 和阻塞隊列 BlockingQueue 的功能融合在了一塊兒,可以讓批量異步任務的管理更簡單。

  • 二、讓異步任務的執行結果有序化。先執行完的先進入阻塞隊列,利用這個特性,你能夠輕鬆實現後續處理的有序性,避免無謂的等待。

  • 三、線程池隔離。CompletionService支持建立知己的線程池,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險。

有須要的同窗能夠加個人公衆號,之後的最新的文章第一時間都在裏面,須要以前文章的思惟導圖也能夠給我留言

相關文章
相關標籤/搜索