多線程獲取結果還在使用Future輪詢嗎?CompletionService快來了解下吧。

背景

二胖上次寫完參數校驗(《二胖寫參數校驗的坎坷之路》)以後,領導一直不給他安排其餘開發任務,就一直讓他看看代碼熟悉業務。二胖天天上班除了偶爾跟坐在隔壁的前端小姐姐聊聊天,就是看看這些
枯燥無味的業務代碼,無聊的一匹。雖然二胖已經是久經職場的老油條了,可是看到同事們的週報都寫的滿滿的,而本身的週報,就一兩行,熟悉了什麼功能。內心仍是慌得一匹,畢竟公司不養閒人啊。因而乎二胖終於鼓起勇氣爲了向領導代表本身的上進心,主動向領導要開發任務。領導一看這小夥子這麼有上進心,因而就到任務看板裏面挑了一個業務邏輯比較簡單的任務分配給了二胖。二胖拿到這個任務屁顛屁顛的回到座位。任務比較簡單,就是經過爬蟲去爬取某些賣機票(某豬、某攜、某團等)的網站的一些機票,而後保存到數據庫。前端

同步入庫

二胖拿到任務,三下五除二就把任務完成了。java

public static void main(String[] args) throws InterruptedException {
        String mouZhuFlightPrice = getMouZhuFlightPrice();
        String mouXieFlightPrice = getMouXieFlightPrice();
        String mouTuanFlightPrice = getMouTuanFlightPrice();
        saveDb(mouZhuFlightPrice);
        saveDb(mouXieFlightPrice);
        saveDb(mouTuanFlightPrice);
    }

    /**
     * 模擬請求某豬網站 爬取機票信息
     *
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouZhuFlightPrice() throws InterruptedException {
        // 模擬請求某豬網站 爬取機票信息
        Thread.sleep(10000);
        return "獲取到某豬網站的機票信息了";
    }

    /**
     * 模擬請求某攜網站 爬取機票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouXieFlightPrice() throws InterruptedException {
        // 模擬請求某攜網站 爬取機票信息
        Thread.sleep(5000);
        return "獲取到某攜網站的機票信息了";
    }

    /**
     * 模擬請求團網站 爬取機票信息
     *
     * @return
     * @throws InterruptedException
     */
    public static String getMouTuanFlightPrice() throws InterruptedException {
        // 模擬請求某團網站 爬取機票信息
        Thread.sleep(3000);
        return "獲取到某團網站的機票信息了";
    }

    /**
     * 保存DB
     *
     * @param flightPriceList
     */
    public static void saveDb(String flightPriceList) {
            // 解析字符串 進行異步入庫
    }

此次二胖學乖了,任務完成了先去找下坐他對面的技術大拿(看他那髮際線就知道了)同事「二狗」讓二狗大拿幫忙指點一二,看看代碼是否還能有優化的地方。畢竟領導對代碼的性能、以及代碼的優雅是有要求的。領導屢次在部門的週會上提到讓咱們多看看「二狗」寫的代碼,學習下人家寫代碼的優雅、抽象、封裝等等。二狗大概的瞄了下二胖寫的代碼,提出了個小小的建議「這個代碼能夠採用多線程來優化下哦,你看某豬這個網站耗時是拿到結果須要10s,其餘的耗時都比它短,先有結果的咱們能夠先處理的,不須要等到你們都返回了再來處理的」。數據庫

輪循futureList獲取結果

幸虧二胖對多線程瞭解一點點,因而乎採用future的方式來實現。二胖使用一個List來保存每一個任務返回的Future,而後去輪詢這些Future,直到每一個Future都已完成。因爲須要先完成的任務須要先執行,且不但願出現由於排在前面的任務阻塞致使後面先完成的任務的結果沒有及時獲取的狀況,因此在調用get方式時,須要將超時時間設置爲0編程

public static void main(String[] args) {
        int taskSize = 3;
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());
        List<Future<String>> futureList = new ArrayList<>();
        futureList.add(mouZhuFlightPriceFuture);
        futureList.add(mouXieFlightPriceFuture);
        futureList.add(mouTuanFlightPriceFuture);
        // 輪詢,獲取完成任務的返回結果
        while (taskSize > 0) {
            for (Future<String> future : futureList) {
                String result = null;
                try {
                    result = future.get(0, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    taskSize--;
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    // 超時異常須要忽略,由於咱們設置了等待時間爲0,只要任務沒有完成,就會報該異常
                }
                // 任務已經完成
                if (result != null) {
                    System.out.println("result=" + result);
                    // 從future列表中刪除已經完成的任務
                    futureList.remove(future);
                    taskSize--;
                    // 此處必須break,不然會拋出併發修改異常。(也能夠經過將futureList聲明爲CopyOnWriteArrayList類型解決)
                    break; // 進行下一次while循環
                }
            }
        }
    }

上述代碼有兩個小細節須要注意下:api

  • 如採用ArrayList的話futureList刪除以後須要break進行下一次while循環,不然會產生咱們意想不到的ConcurrentModificationException異常。具體緣由可看下《ArrayList的刪除姿式你都掌握了嗎》這個文章,裏面有詳細的介紹。
  • 在捕獲了InterruptedExceptionExecutionException異常後記得 taskSize--不然就會發生死循環。若是生產發生了死循環你懂的,cpu被你打滿,程序假死等。你離被開除也不遠了。多線程

  • 上面輪詢future列表很是的複雜,並且還有不少異常須要處理,還有不少細節須要考慮,還有被開除的風險。因此這種方案也被pass了。

    自定義BlockingQueue實現

  • 上述方案被pass以後,二胖就在思考能夠借用哪一種數據來實現下先進先出的功能,貌似隊列能夠實現下這個功能。因此二胖又寫了一版採用隊列來實現的功能。
final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Future<String> mouZhuFlightPriceFuture = executor.submit(() -> getMouZhuFlightPrice());
        Future<String> mouXieFlightPriceFuture = executor.submit(() -> getMouXieFlightPrice());
        Future<String> mouTuanFlightPriceFuture = executor.submit(() -> getMouTuanFlightPrice());

        // 建立阻塞隊列
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        executor.execute(() -> run(mouZhuFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouXieFlightPriceFuture, blockingQueue));
        executor.execute(() -> run(mouTuanFlightPriceFuture, blockingQueue));
        // 異步保存全部機票價格
        for (int i = 0; i < 3; i++) {
            String result = blockingQueue.take();
            System.out.println(result);
            saveDb(result);
        }
    }

    private static void run(Future<String> flightPriceFuture, BlockingQueue<String> blockingQueue) {
        try {
            blockingQueue.put(flightPriceFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
  • 此次比上個版本好多了,代碼也簡潔多了。不過按理說這種需求應該是你們常常遇到的,應該不須要本身來實現把,JAVA這麼貼心的語言應該會有api能夠直接拿來用吧。

    CompletionService實現

  • 二胖如今畢竟也是對代碼的簡潔性有追求的人了。因而乎二胖去翻翻本身躺在書櫃裏吃灰的併發相關的書籍,看看是否有解決方案。
    在這裏插入圖片描述
    終於皇天不負有心人在二胖快要放棄的時候忽然發現了新大陸。
    《Java併發編程實戰》一書6.3.5CompletionService:ExecutorBlockingQueue,有這樣一段話:

若是向Executor提交了一組計算任務,而且但願在計算完成後得到結果,那麼能夠保留與每一個任務關聯的Future,而後反覆使用get方法,同時將參數timeout指定爲0,從而經過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務CompletionService。併發

final static ExecutorService executor = new ThreadPoolExecutor(6, 6,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletionService completionService = new ExecutorCompletionService(executor);
        completionService.submit(() -> getMouZhuFlightPrice());
        completionService.submit(() -> getMouXieFlightPrice());
        completionService.submit(() -> getMouTuanFlightPrice());
        for (int i = 0; i < 3; i++) {
            String result = (String)completionService.take().get();
            System.out.println(result);
            saveDb(result);
        }
    }

當咱們使用了CompletionService不用遍歷future列表,也不須要去自定義隊列了,代碼變得簡潔了。下面咱們就來分析下CompletionService實現的原理吧。異步

CompletionService 介紹

  • 咱們能夠先看下JDK源碼中CompletionServicejavadoc說明吧async

    /**
    * A service that decouples the production of new asynchronous tasks
    * from the consumption of the results of completed tasks.  Producers
    * {@code submit} tasks for execution. Consumers {@code take}
    * completed tasks and process their results in the order they
    * complete.

    大概意思是CompletionService實現了生產者提交任務和消費者獲取結果的解耦,生產者和消費者都不用關心任務的完成順序,由CompletionService來保證,消費者必定是按照任務完成的前後順序來獲取執行結果。ide

    成員變量

    既然須要按照任務的完成順序獲取結果,那內部應該也是經過隊列來實現的吧。打開源碼咱們能夠看到,裏面有三個成員變量

    public class ExecutorCompletionService<V> implements CompletionService<V> {
    // 執行task的線程池,建立CompletionService必須指定;
    private final Executor executor;
    //主要用於建立待執行task;
    private final AbstractExecutorService aes;
    //存儲已完成狀態的task,默認是基於鏈表結構的阻塞隊列LinkedBlockingQueue。       
    private final BlockingQueue<Future<V>> completionQueue;

    任務提交

    ExecutorCompletionService任務的提交和執行都是委託給Executor來完成。當提交某個任務時,該任務首先將被包裝爲一個QueueingFuture

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    任務完成後什麼時候進入隊列

    在這裏插入圖片描述從源碼能夠看出,QueueingFutureFutureTask的子類,實現了done方法,在task執行完成以後將當前task添加到completionQueue,將返回結果加入到阻塞隊列中,加入的順序就是任務完成的前後順序。done方法的具體調用在FutureTaskfinishCompletion方法。

    獲取已完成任務

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    
    public Future<V> poll() {
        return completionQueue.poll();
    }
    
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

    takepoll都是調用BlockingQueue提供的方法。

  • take() 獲取任務阻塞,直到能夠拿到任務爲止。
  • poll() 獲取任務不阻塞,若是沒有獲取到任務直接返回null
  • poll(long timeout, TimeUnit unit) 帶超時時間等待的獲取任務方法(通常推薦使用這種

    總結

  • CompletionService 把線程池 Executor 和阻塞隊列 BlockingQueue融合在一塊兒,可以讓批異步任務的管理更簡單,將生產者提交任務和消費者獲取結果的解耦。
  • CompletionService 可以讓異步任務的執行結果有序化,先執行完的先進入阻塞隊列,利用這個特性,咱們能夠輕鬆實現後續處理的有序性,避免無謂的等待。

結束

相關文章
相關標籤/搜索