CompletableFuture是高級的多線程功能,支持自定義線程池和系統默認的線程池,是多線程,高併發裏面,常常須要用到的比直接建立線程,要簡單易用的方法。bash
CompletableFuture主要是用於異步調用,內部封裝了線程池,能夠將請求或者處理過程,進行異步處理。建立線程有3種方式,直接繼承Thread、實現Runnable接口、實現Callable接口。以生活中的一個例子來講明異步行爲:電飯煲蒸飯。多線程
之前呀,都是大鍋飯,放上米,放上水,而後須要不斷地加柴火,人要看着火,具體何時煮熟,也得偶爾打開看看,看看開沒開鍋,煮沒煮熟。這種就是沒有任何通知方式,沒有返回值的Runnable,只管煮飯,煮沒煮熟須要本身判斷。併發
一個老闆發現了這個商機,說能不能作一個東西,不用人一直看着,自動就能把米飯作好,因此電飯煲就出現了。 初代電飯煲的出現,算是解放了人力,不再用看着火了,方便了不少,本身能夠去作點其餘的事情,熱個牛奶,剪個雞蛋什麼的,可是至於飯何時熟,還得本身隔一段時間就得過去看一看。這就是Future的方式,雖然任務是異步執行的,可是要想得到這個結果,還得須要本身取。app
時間繼續推動,這個老闆又有了新的想法,每隔一段時間,看看飯熟沒熟仍是有點浪費我看電視的時間,這個電飯煲能不能作好了,告訴我呢,這樣我就直接來吃就好了。所以就有了這種能夠預定、能夠定時、能夠保溫的高級電飯煲。這個就對應着CompletableFuture,全部事情都是能夠自動完成,便可以在完成以後,回調通知,也能夠本身去等待。異步
示例代碼:
public static void main(String[] args){
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("電飯煲開始作飯");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "白米飯";
}).thenAccept(result -> {
System.out.println("開始吃米飯");
});
System.out.println("我先去搞點牛奶和雞蛋");
future.join();
}
結果輸出:
電飯煲開始作飯
我先去搞點牛奶和雞蛋
開始吃米飯
複製代碼
這樣就能夠一邊等待米飯煮熟,一邊去作其餘事情。ide
CompletableFuture提供了方法大約有50多個,單純一個個記憶,是很麻煩的,所以將其劃分爲如下幾類:高併發
上面的方法不少,咱們不必死記硬背,按照以下規律,會方便不少,記憶規則:測試
記住上面幾條,基本上就能夠記住大部分的方法,剩下的其餘方法,就能夠單獨記憶了。ui
建立CompletableFuture,其實就是將咱們要煮的米飯,委託給電飯煲;要煮米飯,咱們要準備這麼幾件事情,其一咱們要制定製做米飯的方式,其二,咱們要指定電飯煲。除此以外,咱們也能夠委託其餘的事情,最後能夠經過all或者any進行組合。this
// 異步任務,無返回值,採用內部的forkjoin線程池
CompletableFuture c1 = CompletableFuture
.runAsync(()->{System.out.println("打開開關,開始製做,就不用管了")});
// 異步任務,無返回值,使用自定義的線程池
CompletableFuture c11 = CompletableFuture
.runAsync(()->{System.out.println("打開開關,開始製做,就不用管了")},newSingleThreadExecutor());
// 異步任務,有返回值,使用內部默認的線程池
CompletableFuture<String> c2 = CompletableFuture
.supplyAsync(()->{System.out.println("清洗米飯");return "乾淨的米飯";});
// 只要有一個完成,則完成,有一個拋出異常,則攜帶異常
CompletableFuture.anyOf(c1,c2);
// 必須等待全部的future所有完成才能夠
CompletableFuture.allOf(c1,c2);
複製代碼
經常使用的是下面的這幾種
// 不拋出異常,阻塞的等待
future.join()
// 有異常則拋出異常,阻塞的等待,無限等待
future.get()
// 有異常則拋出異常,最長等待1個小時,一個小時以後,若是尚未數據,則異常。
future.get(1,TimeUnit.Hours)
複製代碼
3種方式:
// 完成
future.complete("米飯");
// 異常
future.completeExceptionally();
// 取消,參數並無實際意義,沒任何卵用。
future.cancel(false);
複製代碼
接續方式有不少種,能夠總結爲一下三類:
CompletableFuture future = CompletableFuture.supplyAsync(()->{
System.out.println("投放和清洗製做米飯的材料");
return "乾淨的沒有新冠病毒的大米";
}).thenAcceptAsync(result->{
System.out.println("通電,設定模式,開始煮米飯");
}).thenRunAsync(()->{
System.out.println("米飯作好了,能夠吃了");
})
複製代碼
假如蒸米飯和、熱牛奶、炒菜等已是3個不一樣的CompletableFuture,可使用接續方式2,將兩個或者多個CompletableFuture組合在一塊兒使用。
CompletableFuture rice = CompletableFuture.supplyAsync(()->{
System.out.println("開始製做米飯,並得到煮熟的米飯");
return "煮熟的米飯";
})
//煮米飯的同時呢,我又作了牛奶
CompletableFuture mike = CompletableFuture.supplyAsync(()->{
System.out.println("開始熱牛奶,並得到加熱的牛奶");
return "加熱的牛奶";
});
// 我想兩個都好了,才吃早飯,thenCombineAsync有入參,有返回值
mike.thenCombineAsync(rice,(m,r)->{
System.out.println("我收穫了早飯:"+m+","+r);
return m+r;
})
// 有入參,無返回值
mike.thenAcceptBothAsync(rice,(m,r)->{
System.out.println("我收穫了早飯:"+m+","+r);
});
// 無入參,入參會之
mike.runAfterBothAsync(rice,()->{
System.out.println("我收穫了早飯");
});
// 或者直接鏈接兩個CompletableFuture
rice.thenComposeAsync(r->CompletableFuture.supplyAsync(()->{
System.out.println("開始煮牛奶");
System.out.println("同時開始煮米飯");
return "mike";
}))
複製代碼
若是咱們只想作結果處理,也沒有其餘的接續動做,而且咱們想要判斷異常的狀況,那麼能夠用接續方式3
whenCompleteAsync:處理完成或異常,無返回值
handleAsync:處理完成或異常,有返回值
CompletableFuture.supplyAsync(()->{
System.out.println("開始蒸米飯");
return "煮熟的米飯";
}).whenCompleteAsync((rich,exception)->{
if (exception!=null){
System.out.println("電飯煲壞了,米飯沒作熟");
}else{
System.out.println("米飯熟了,能夠吃了");
}
})
// 有返回值
CompletableFuture.supplyAsync(()->{
System.out.println("開始蒸米飯");
return "煮熟的米飯";
}).handleAsync((rich,exception)->{
if (exception!=null){
System.out.println("電飯煲壞了,米飯沒作熟");
}else{
System.out.println("米飯熟了,能夠吃了");
}
return "準備冷一冷再吃米飯";
})
// 異常處理
CompletableFuture.supplyAsync(()->{
System.out.println("開始蒸米飯");
return "煮熟的米飯";
}).handleAsync((rich,exception)->{
if (exception!=null){
System.out.println("電飯煲壞了,米飯沒作熟");
}else{
System.out.println("米飯熟了,能夠吃了");
}
return "準備冷一冷再吃米飯";
}).exceptionally((exception)->{
// 前置動做必須的是一個又返回值的操做,不能是那種返回值的那種
return "";
});
複製代碼
CompletableFuture用了以後,才以爲這個東西,確實好用一些,不經意間就作成了異步處理,並且支持自定義的線程池,若是結合stream,能夠輕鬆地實現多線程併發處理。
List<CompletableFuture<YoutubeVideoEntity>> futures = subVideosList.stream()
.map(item ->
CompletableFuture.supplyAsync(() -> this.getRetry(item)
, ThreadPoolHolder.BG_CRAWLER_POOL)
).collect(Collectors.toList());
List<YoutubeVideoEntity> videoEntities = futures.stream().map(CompletableFuture::join)
.filter(item -> item != null && item.getVideoId() != null).collect(Collectors.toList());
複製代碼