呵呵,面試官問我知不知道CompletionService?

這是why的第 61 篇原創文章面試

荒腔走板

你們好,我是 why,歡迎來到我連續周更優質原創文章的第 61 篇。編程

老規矩,先荒腔走板聊聊其餘的。服務器

上面這圖是上週六,成都叄缺壹演唱會的現場。正在唱歌的這個男人叫作李志。去年他由於某些緣由被 404 了。上週六他忽然出如今成都,忽然宣告着解除封印。框架

我知道這個消息是由於週六的晚上我正在看《樂隊的夏天》。恰好是達達樂隊正在唱《南方》,而後上面有一個彈幕:李志在成都復出啦!異步

因而我打開了微博,搜索:李志成都。果真看到了他的演出視頻。視頻裏面唱的是《關於鄭州我知道的很少》。異步編程

只是李志把歌詞裏面的鄭州都改爲了「成都」。一邊是達達樂隊唱着《南方》,一邊是李志的《關於「成都」我知道的很少》。那一刻,我把這個消息分享給坐在旁邊的女友,我張了張口,居然忽然一下有點說不出話來,只好把手機在她面前晃了一晃。編碼

記得我以前在網抑雲聽李志的《梵高先生》的時候,無心間看了一下下面的評論,有一個評論的ID叫作放羊的壯年,他說:spa

今年五月十六日被確診肺癌晚期,醫生說還有三到五個月,然而到今天我還呼吸着這世界的空氣,人生大起大落每每都是本身體會,這算是一種孤獨嗎?也許有一天我女兒也會愛上這首歌,那是否是比我更加孤獨?怎麼去想…線程

他的留言日期是2015年12月30日。3d

我點進他的主頁查看,他的頭像是一個很可愛的小女孩,扎着兩個小辮子。可能就是他的女兒吧,我看了他的聽歌排行,點了最近一週,顯示沒有聽歌排行數據。看了他的僅有的一條動態,裏面有4000多個評論,

熱評第一是這樣說的:多少人和我同樣看到梵高先生下的評論而後點進來點進最近一週聽歌爲空白心頭一涼,不知道你如今還好很差,衷心祝願你的女兒幸福。

他的評論裏面充滿了陌生人,言語之間充滿了關懷和愛,有人和他像朋友同樣留言聊天,有人說他換了播放器,有人說但願你是騙咱們的...

評論看着看着就以爲很溫暖。

隔了幾天以後我也去留言了:咱們生來就是孤獨,可是人間溫暖。

好了,說迴文章。

再談Future

上週不是寫了《笑了,面試官問我知不知道異步編程的Future》這一篇文章聊 Future 嘛。

而後有讀者就給我留言了:why 哥,你都寫到 Future 了,應該再寫一下 CompletionService 的。上次面試就被面試官追問了。

我笑着說:哎呀,實在是沒有時間寫了。文章已經很長了,再把這個東西補充上去,更長了,沒人看的。

讀者說:沒事,我就是一個小建議。

好的,接受建議。本文就來聊聊 CompletionService 這個東西。

在聊它以前,咱們先回顧一下 Future 的用法。

我先問問你,當你往線程池裏面提交了一組計算任務之後,你想要得到返回值。

你應該用 Executor 的什麼提交方法?這個提交方法的什麼重載類型?

什麼?你答不上來?呸,你個渣男,上週白嫖完了就忘了?

上週的文章裏面寫了啊:

用 submit 的任務類型爲 Callable 的或者任務類型爲 Runable,還能夠再傳一個返回值的:

因爲是一組計算任務,你想拿到返回值去搞事情。這個返回值就被封裝在 Future 裏面。

怎麼獲取呢?

調用 Future 的 get 方法,有不帶超時時間的無限等待的舔狗類型的 get,也有帶超時時間、到點就放棄的渣男類型的 get:

來一塊兒看個例子吧:

public class JDKThreadPoolExecutorTest {  
  
    public static void main(String[] args) throws Exception {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        ArrayList<Future<Integer>> list = new ArrayList<>();  
        Future<Integer> future_15 = executorService.submit(() -> {  
            TimeUnit.SECONDS.sleep(15);  
            System.out.println("執行時長爲15s的執行完成。");  
            return 15;  
        });  
        list.add(future_15);  
          
        Future<Integer> future_5 = executorService.submit(() -> {  
            TimeUnit.SECONDS.sleep(5);  
            System.out.println("執行時長爲5s的執行完成。");  
            return 5;  
        });  
        list.add(future_5);  
          
        Future<Integer> future_10 = executorService.submit(() -> {  
            TimeUnit.SECONDS.sleep(10);  
            System.out.println("執行時長爲10s的執行完成。");  
            return 10;  
        });  
        list.add(future_10);  
          
        System.out.println("開始準備獲取結果");  
        for (Future<Integer> future : list) {  
            System.out.println("future.get() = " + future.get());  
        }  
        Thread.currentThread().join();  
    }  
}

如今有三個任務,執行時間分別是 15s/10s/5s 。經過 JDK 線程池的 submit 方法提交了這三個 Callable 類型的任務。

你先眼神編譯一下,內心輸出一下,你想這個代碼的輸出結果是什麼。

首先主線程把三個任務提交到線程池裏面去,把對應返回的 Future 放到 List 裏面存起來,而後執行「開始準備獲取結果」的輸出語句。

接着進入 for 循環,在循環裏面執行 future.get() 操做,阻塞等待。

看看你內心想的輸出結果是否是這樣的:

從這個輸出結果裏面,咱們能夠看出問題了。很明顯的木桶效應。

三個異步任務,耗時最長的最早執行,因此最早進入 list,所以當在循環中獲取這個任務結果的時候 get 操做會一直阻塞,即便執行時間爲 5s/10s 的任務已經執行完成。

好的,舉個例子。想象一個場景:

假設你是一個海王,你擁有衆多普通女性朋友。你同時邀約了三位女性朋友一塊兒吃飯。分別給她們說:你先化妝吧,好了給我說一聲,我開着個人蘭博基尼來接你。

小紅化妝要 2 小時。小花化妝要 1小時。小媛化妝要 30 分鐘。

因爲你最早給小紅說的,你就一直在小紅家門口等小紅化妝完成。當小紅化妝完成後,你接到車上,其餘兩位朋友早就準備好了,在家乾巴巴的等着你來接她。

這不是一個合格的海王應該有的樣子。

這就是 future 在這種場景下的侷限性。

根據上面的場景編碼可得(代碼都是直接複製粘貼就能夠用的,建議你拿出來跑一下):

public class ExecutorCompletionServiceTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ExecutorCompletionService<String> completionService =
                new ExecutorCompletionService<>(executorService);
        System.out.println("約幾個妹子一塊兒吃個飯吧。");
        completionService.submit(() -> {
            System.out.println("小紅:好的,哥哥。我化妝要2個小時。等一下哦。");
            TimeUnit.SECONDS.sleep(15);
            System.out.println("小紅:我2個小時準時化好了,哥哥來接我吧。");
            return "小紅化完了。";
        });
        completionService.submit(() -> {
            System.out.println("小媛:好的,哥哥。我化妝要30分鐘。等一下哦。");
            TimeUnit.SECONDS.sleep(5);
            System.out.println("小媛:我30分鐘準時化好了,哥哥來接我吧。");
            return "小媛化完了。";
        });
        completionService.submit(() -> {
            System.out.println("小花:好的,哥哥。我化妝要1個小時。等一下哦。");
            TimeUnit.SECONDS.sleep(10);
            System.out.println("小花:我1個小時準時化好了,哥哥來接我吧。");
            return "小花化完了。";
        });
        TimeUnit.SECONDS.sleep(1);
        System.out.println("都通知完,等着吧。");
        //循環3次是由於上面提交了3個異步任務
        for (int i = 0; i < 3; i++) {
            String returnStr = completionService.take().get();
            System.out.println(returnStr + "我去接她");
        }
        Thread.currentThread().join();
    }
}

輸出結果以下:

說好都是同樣的普通朋友的,爲何你恰恰要一直等化妝時間最長的小紅?爲何不誰動做快,就先接誰?

你看你這樣操做,讓小媛、小花怎麼想?只能說:你是一個好人了。

什麼?你個海王還僞裝問我「什麼是海王」?

CompletionService拯救海王

仍是上面的場景,當咱們引入了 CompletionService 後就顯得不同了。

先直接看用法:

`ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
`

用起來很是的方便,只須要用 ExecutorCompletionService 把線程池包起來。

而後提交任務的時候用 competitionService 的 submit 方法。代碼以下:

public class JDKThreadPoolExecutorTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ArrayList<Future<String>> list = new ArrayList<>();
        System.out.println("約幾個妹子一塊兒吃個飯吧。");
        Future<String> future_15 = executorService.submit(() -> {
            System.out.println("小紅:好的,哥哥。我化妝要2個小時。等一下哦。");
            TimeUnit.SECONDS.sleep(15);
            System.out.println("小紅:我2個小時準時化好了,哥哥來接我吧。");
            return "小紅化完了。";
        });
        list.add(future_15);
        Future<String> future_5 = executorService.submit(() -> {
            System.out.println("小媛:好的,哥哥。我化妝要30分鐘。等一下哦。");
            TimeUnit.SECONDS.sleep(5);
            System.out.println("小媛:我30分鐘準時化好了,哥哥來接我吧。");
            return "小媛化完了。";
        });
        list.add(future_5);

        Future<String> future_10 = executorService.submit(() -> {
            System.out.println("小花:好的,哥哥。我化妝要1個小時。等一下哦。");
            TimeUnit.SECONDS.sleep(10);
            System.out.println("小花:我1個小時準時化好了,哥哥來接我吧。");
            return "小花化完了。";
        });
        list.add(future_10);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("都通知完,等着吧。");
        for (Future<String> future : list) {
            System.out.println(future.get()+"我去接她。");
        }
        Thread.currentThread().join();
    }
}

你先眼神編譯一下,內心輸出一下...

算了,別編譯了,直接帶你們看結果吧,我已經火燒眉毛了:

誰先化完妝,就先去接誰。

寫到這裏,看到這個輸出結果的時候我不由鼓起掌來。

真正的海王應該是一個時間管理大師。

先對比一下輸出結果:

而後對比一下兩個版本代碼的差別:

變化不大,甚至說微乎其微。

執行 submit 方法的對象變成了 ExecutorCompletionService 。

獲取任務結果的方法變成了:

`String returnStr = completionService.take().get();
`

先不看原理。你就細細的品這個獲取結果的方法。

completionService.take() 了個什麼玩意出來,而後調用了 get 方法。

根據這個 get ,直覺就告訴我 take 出來的確定是一個 future 對象。而這個 future 對象確定是放在一個隊列裏面的。

下一小節,帶你們去證明一下。

CompletionService原理

首先 CompletionService 是一個接口:

ExecutorCompletionService 是這個接口的實現類:

看一下 ExecutorCompletionService 的構造方法:

能夠看到是須要傳入一個線程池對象的。隊列默認使用的是 LinkedBlockingQueue 。

固然,咱們也能夠指定使用什麼隊列:

而後再看一下它的任務提交方式:

因爲用 ExecutorCompletionService 主要是爲了優雅的處理返回值。因此它支持兩種 submit 類型的提交,都是有返回值的。

上面時間管理大師版本海王使用的就是 Callable 類型的方法。

咱們先對比一下 Executor 直接提交和 ExecutorCompletionService 提交的差別:

差別就在 execute 方法裏面。

ExecutorCompletionService 提交任務的時候是這樣的:

`executor.execute(new QueueingFuture(f));
`

差別就在 execute 方法裏面的 Runable:

看一下這個 QueueingFuture 是個什麼東西:

祕密基本上就在這個裏面了。

QueueingFuture 繼承自 FutureTask。重寫了 done 方法,而後把 task 放到 queue 裏面。

這個方法的含義就是當任務執行完成後,就會被放到隊列裏面去了。也就是說隊列裏面的 task 都是已經 done 了的 task,而這個 task 就是一個個 future。

若是調用 queue 的 task 方法,就是阻塞等待。等到的必定是就緒了的 future,調用 get 就能立馬得到結果。

你說這一套操做是在幹啥?

這不就是在作解耦嗎?

以前你提交任務後還須要直接關心每一個任務返回的 future。如今 CompletionService 幫你對這些 future 進行了跟蹤。

完成了調用者和 future 之間的解耦。

原理分析完了,說一個須要注意的地方。

當你的使用場景是不關心返回值的時候千萬不要閒的蛋疼的用 CompletionService 去提交任務。

爲何?

由於前面說了,裏面有個隊列。而當你不關心返回值的時候也就是不會去處理這個隊列,致使這個隊列裏面的對象堆積的愈來愈多。

最後,炸了,OOM了。

在開源框架中的應用

前面說了 CompletionService 是一個接口。除了 JDK 的 ExecutorCompletionService 實現了這個接口。

在開源框架裏面也有相應的實現。好比 Redisson:

你去看這個實現,和 ExecutorCompletionService 思想是如出一轍的,可是有些許的不同。

它把 future 放到隊列面的時候,沒有重寫 done 方法,而是使用了響應式編程的 onComplete:

而 CompletionService 的思想核心是:Executor 加 Queue 。

這個思想,讓我想起了在 Dubbo 中看到過的一段代碼:

我曾經在《Dubbo Cluster集羣那點你不知道的事》這篇文章中提到過這個類。

這個類的 doInvoker 方法中的核心邏輯以下:

首先標號爲 ① 的地方定義了一個隊列。

標號爲 ② 的地方在循環體中提交異步任務。有幾個服務提供者就有幾回循環。

子線程在標號爲 ③ 的地方把返回結果放到隊列裏面。

只要一放進去,就能被標號爲 ④ 的地方獲取到(指定時間內),而後程序當即返回。

這樣就能實現並行調用多個服務器,只要有一個服務器返回就當即返回的功能。

我以爲這個思想和 CompletionService 的思想有一點點的相通之處的。

咱們要學 CompletionService ,也要學它的思想。

最後說一句(求關注)

嗯,寫完了。這周是瘋狂連軸旋轉的一週,我在公司裏面走動,都不是走路,都是在一路小跑的,停更一週的想法從週一就出現了無數次。最後昨天加班加點的把公司安排的「政治編程任務」完成以後,開始快馬加鞭的準備這篇文章,我仍是懟出來了,畢竟我也是一個時間管理大師。

下週就是連續周更文章一年整啦。過去的 365 天,過去的 52 周,我保持了一週至少輸出一篇優質原創文章的節奏。雖然沒有多少粉絲(一年了都沒過一萬關注,真的是沒臉見人),可是立下的 flag 算是超額完成了。

堅持不住的時候再堅持一下,確實是一種難能難得的品質。

好啦,感謝您的閱讀,我堅持原創,十分歡迎並感謝您的關注。

我是 why,一個被代碼耽誤的文學創做者,不是大佬,可是喜歡分享,是一個又暖又有料的四川好男人。

相關文章
相關標籤/搜索