目錄html
瘋狂創客圈 Java 分佈式聊天室【 億級流量】實戰系列之 -17【 博客園 總入口 】java
源碼IDEA工程獲取連接:Java 聊天室 實戰 源碼面試
你們好,我是做者尼恩。 目前和幾個小夥伴一塊兒,組織了一個高併發的實戰社羣【瘋狂創客圈】。正在開始 高併發、億級流程的 IM 聊天程序 學習和實戰,此文是:編程
瘋狂創客圈 Java 分佈式聊天室【 億級流量】實戰系列之 -17多線程
前面,已經完成一個高性能的 Java 聊天程序的四件大事:架構
完成了協議選型,選擇了性能更佳的 Protobuf協議。具體的文章爲: Netty+Protobuf 整合一:實戰案例,帶源碼併發
介紹了 通信消息數據包的幾條設計準則。具體的文章爲: Netty +Protobuf 整合二:protobuf 消息通信協議設計的幾個準則框架
解決了一個很是基礎的問題,這就是通信的 粘包和半包問題。具體的文章爲:Netty 粘包/半包 全解 | 史上最全解讀異步
前一篇文件,已經完成了 系統三大組成模塊的組成介紹。 具體的文章爲:Netty聊天程序(實戰一):從0開始實戰100w級流量應用分佈式
在設計客戶端以前,發現一個很是重要的基礎知識點,沒有講到。這個知識點就是異步回調。
因爲異步回調使用頻率是如此之高,因此不得不停下來,詳細介紹一下。
隨着移動互聯網的蓬勃發展,業務架構也隨之變得錯綜複雜,業務系統愈來愈多。打個簡單的比方:以前一個業務只須要調取一次第三方接口,現在,該業務需調取多個甚至N個不一樣的第三方接口,獲取N種上遊數據。一般,咱們處理方法是異步去調取這些接口。
問題就來了,如何獲取處理異步調用的結果呢 ?
或者說,異步線程執行完成後,如何與發起線程交互呢?
這就涉及到線程的異步回調問題,這也是大流量高併發不可迴避的問題。
首先,瞭解下同步、異步、阻塞、非阻塞、回調等相關概念;
其次,簡單介紹java future和guava future相關技術,並經過示例代碼進一步對其進行理解;
最後,對java future和guava future進行比較。
寫到這裏,尼恩就想到了在中學8年級的語文課。在課本中,有一篇華羅庚的課文——《統籌方法》,課文介紹的是統籌方法,該方法的主要目的是合理安排工做流程中的各道工序。
裏邊舉了一個泡茶的例子。列出了三種泡茶的工序模型。在文中的三種工序流程中,有多重排列組合的模式。
工序模型一:順序模式
洗好水壺,灌上涼水,放在火上;
等水開,洗茶壺、洗茶杯;
洗完茶杯後,泡茶喝。
工序模型二:併發模式
洗好水壺,灌上涼水,放在火上;
在等待水開的時間裏,洗茶壺、洗茶杯;
等水開了,泡茶喝。
《統籌方法》這篇文章中,忽略了一個很很重要的問題: 就是等水開是一段數量級最大的時間,這個時間,遠遠超過了準備水、準備茶杯的時間。
從實際出發,爲了避免浪費等水開時間,尼恩在這裏增長一個動做 —— 讀書。而且,當水燒好後,通知做者中止讀書,去泡茶喝。這就至關於回調模式。
工序模式三:回調模式
洗好水壺,灌上涼水,放在火上;
在等待水開的時間裏,洗茶壺、洗茶杯;
在等水開的時間裏,讀書;
水開了,通知做者泡茶喝。
對比起來:順序模式效率最低,回調模式效率最高。
以上三種模式泡茶喝的方式,使用Java,如何實現呢?
先來看一些基本的概念吧!
前面只是一個例子,對併發的主要模式進行形象的說明。
下面正式來講下經常使用的幾個和併發相關的概念。
一:同步
所謂同步,就是在發出一個功能調用時,在沒有獲得結果以前,該調用就不返回。也就是必須一件一件事作,等前一件作完了才能作下一件事。
單線程模式,就是絕對同步的。
二: 異步
異步首先必須是多線程模式。是指當前線程,向其餘的異步線程發出調用指令。當前線程和異步線程,邏輯上同時執行。
三:阻塞
在異步的場景下,當前線程阻塞住,等待異步線程的執行結果。阻塞是指線程進入非可執行狀態,在這個狀態下,cpu不會給線程分配時間片,即線程暫停運行。
阻塞模式是效率比較低的,若是阻塞嚴重的話,至關於又回到了同步的時代。
四:非阻塞
非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前,當前線程不會阻塞住,而會繼續向下執行。
回調就是一種非阻塞的異步模式。併發線程經過回調,能夠將結果返回給發起線程。除了回調,還有其餘的非阻塞異步模式,好比消息通信、信號量等等。
阻塞模式的泡茶模型,對應到前面的第二種泡茶喝的工序模型。
在阻塞模式泡茶喝的模型中,有三條線程,他們分別是:
線程一:燒水線程
洗好水壺,灌上涼水,放在火上;
線程二:清洗線程
洗茶壺、洗茶杯;
線程三:主線程
分別啓動燒水線程、清洗線程。等水開了,等水杯洗好了,而後泡茶喝。
具體以下圖:
前面提到,阻塞模式的效率不是最高的。
更高效率的是回調模式。主線程在等待的時間了,不是死等,而是去幹讀書的活兒。等其餘兩條線程完成後,經過回調方式,去完成泡茶的動做。
在回調模式泡茶喝的模型中,仍是三條線程,他們的工做稍微有些變更:
線程一:燒水線程
洗好水壺,灌上涼水,放在火上;燒好水後,去執行泡茶回調。
線程二:清洗線程
洗茶壺、洗茶杯;清洗完成後,也去執行一下泡茶的動做。
線程三:主線程
分別啓動燒水線程、清洗線程。而後去讀書。
具體以下圖:
嚴格來講,上圖是經不起推敲的。
爲啥呢? 那個泡茶喝回調方法,在執行的流程上,不屬於主線程在執行。只是在業務邏輯上,泡茶喝這個動做與主線程上的其餘動做,關聯性更強。
上圖,更好的理解方式是,儘可能站在業務流程的角度去理解。
回調不是惟一的非阻塞方式。
還有線程間通訊、信號量等等,不少的非阻塞方式。可是回調倒是一種最好用的、也是開發中用的最多的線程間非阻塞的交互方式。
下面,從最原始的阻塞模式講起,起底整個異步回調模式。
Java中,線程有一個join操做,也叫線程的合併。
join操做的做用,就是完成異步阻塞的工做——阻塞當前的線程,直到異步的併發線程的執行完成。
若是線程A的執行過程當中,經過B.join操做,合併B線程,叫作線程的合併。合併的重要特色之一是,線程A進入阻塞模式,直到B線程執行完成。
爲了方便表達,模擬一下包工頭的甲方和乙方。
將發起合併的線程A叫作甲方線程,被髮起的線程B爲乙方線程。
簡單的說,線程合併就是——甲方等待乙方執行完成。換句話說,甲方將乙方線程合併到甲方線程。
在泡茶喝的例子中,主線程經過join操做,等待燒水線程和清洗線程。這就是一種異步阻塞。
具體以下圖:
先看實例,再看方法的詳細介紹。
泡茶喝的異步阻塞版本,實現以下:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; /** * Created by 尼恩 at 瘋狂創客圈 */ public class JoinDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } static class HotWarterThread extends Thread { public HotWarterThread() { super("** 燒水-Thread"); } public void run() { try { Print.tcfo("洗好水壺"); Print.tcfo("灌上涼水"); Print.tcfo("放在火上"); //線程睡眠一段時間,表明燒水中 Thread.sleep(SLEEP_GAP); Print.tcfo("水開了"); } catch (InterruptedException e) { Print.tcfo(" 發生異常被中斷."); } Print.tcfo(" 運行結束."); } } static class WashThread extends Thread { public WashThread() { super("$$ 清洗-Thread"); } public void run() { try { Print.tcfo("洗茶壺"); Print.tcfo("洗茶杯"); Print.tcfo("拿茶葉"); //線程睡眠一段時間,表明清洗中 Thread.sleep(SLEEP_GAP); Print.tcfo("洗完了"); } catch (InterruptedException e) { Print.tcfo(" 發生異常被中斷."); } Print.tcfo(" 運行結束."); } } public static void main(String args[]) { Thread hThread = new HotWarterThread(); Thread wThread = new WashThread(); hThread.start(); wThread.start(); try { // 合併燒水-線程 hThread.join(); // 合併清洗-線程 wThread.join(); Thread.currentThread().setName("主線程"); Print.tcfo("泡茶喝"); } catch (InterruptedException e) { Print.tcfo(getCurThreadName() + "發生異常被中斷."); } Print.tcfo(getCurThreadName() + " 運行結束."); } }
演示程序中有三條線程:
一條是主線程main;
一條是燒水線程「hThread」;
一條是清洗線程「wThread」;
main線程,調用了hThread.join()實例方法,合併燒水線程,也調用了 wThread.join()實例方法,合併清洗線程。
另外說明一下:hThread是這裏的燒水線程實例的句柄,"** 燒水-Thread"是燒水線程實例的線程名稱,二者不能混淆。
join的方法應用場景:異步阻塞場景。
具體來講:甲方(發起線程)的調用乙方(被髮起線程)的join方法,等待乙方執行完成;若是乙方沒有完成,甲方阻塞。
join是Thread類的一個實例方法,使用的方式大體以下:
// 合併燒水-線程 hThread.join(); // 合併清洗-線程 wThread.join();
實際上,join方法是有三個重載版本:
(1)void join(): 等待乙方線程執行結束,甲方線程重啓執行。
(2)void join(long millis): 等待乙方線程執行一段時間,最長等待時間爲 millis 毫秒。超過millis 毫秒後,不論乙方是否結束,甲方線程重啓執行。
(3)void join(long millis, int nanos): 等待乙方線程執行一段時間,最長等待時間爲 millis 毫秒,加nanos 納秒。超過期間後,不論乙方是否結束,甲方線程重啓執行。
強調一下容易混淆的幾點:
(1)join方法是實例方法,須要使用線程句柄去調用,如thread.join();
(2)執行到join代碼的時候,不是thread所指向的線程阻塞,而是當前線程阻塞;
(3)thread線程表明的是被合併線程(乙方),當前線程阻塞線程(甲方)。當前線程讓出CPU,進入等待狀態。
(4)只有等到thread線程執行完成,或者超時,當前線程才能啓動執行。
join合併有一個很大的問題,就是沒有返回值。
若是燒水線程的水有問題,或者燒水壺壞了,mian線程是沒有辦法知道的。
若是清洗線程的茶杯有問題,清洗不來了,mian線程是沒有辦法知道的。
形象的說,join線程就是一個悶葫蘆。
仍是異步阻塞,可是須要得到結果,怎麼辦呢?
可使用java 的FutureTask 系列類。
FutureTask相關的類型,處於java.util.concurrent包中,不止一個類,是一個系列。同時,這也是Java語言在1.5 版本以後提供了一種的新的多線程使用方法。
咱們知道,異步線程的一個重要接口是Runnable,這裏執行異步線程的業務代碼。可是,Runnable的run方法有一個問題,它是沒有返回的。
所以,Runnable不能用在須要有異步返回值的異步場景。
Java語言在1.5 版本以後從新定義了一個新的、相似Runnable的接口,Callable接口,將run方法改成了call方法,而且帶上了返回值。
Callable的代碼以下:
package java.util.concurrent; @FunctionalInterface public interface Callable<V> { V call() throws Exception; }
Callable接口位於java.util.concurrent包中,Callable接口是一個泛型接口。也是一個「函數式接口」。惟一的抽象方法call有返回值,返回值類型爲泛型形參類型。call抽象方法還有一個Exception的異常聲明,允許方法的實現版本內部的異常不通過捕獲。
Callable接口相似於Runnable。不一樣的是,Runnable的惟一抽象方法run沒有返回值,也沒有強制審查異常的異常聲明。比較而言,Callable接口的功能更強大一些。
有一個異想天開的問題:
做爲新版的Callable接口實例,可否做爲Thread線程實例的target來使用呢?
答案是不能。
Callable接口與Runnable接口之間沒有任何的繼承關係,並且兩者惟一方法在的名字上也不一樣。Callable接口實例沒有辦法做爲Thread線程實例的target來使用。
咱們知道,java裏邊的線程類型,就是Thread。Callable須要異步執行,就須要和Thread創建聯繫。java提供了一個搭橋的角色——FutureTask類。
顧名思義,這個是一個將來執行的任務,就至關於新線程所執行的操做。
FutureTask 類也位於 java.util.concurrent包。
FutureTask類 構造函數的參數爲 Callable,而且間接的繼承了Runnable接口。其構造器代碼以下:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
到了這裏,FutureTask類的做用就大體明白了。
若是還不明白,看一段實例代碼:
Callable<Boolean> hJob = new HotWarterJob(); FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob); Thread hThread = new Thread(hTask, "** 燒水-Thread");
FutureTask就像一座位於Callable與Thread之間的橋。FutureTask 封裝一個Callable,而後自身又做爲Thread線程的target。
FutureTask還有一個十分重要的貢獻。
Thread線程執行過程當中,異步線程的代碼邏輯在Callable的call方法中,而call方法返回的結果,則須要經過 FutureTask 去獲取。
好了,這下就應該基本清楚了。
總結一下FutureTask這個媒婆的做用:
(1)負責牽線
(2)經過媒婆取得結果
爲了完成這個兩個偉大的使命,FutureTask有個相對比較複雜的繼承關係,具體以下圖:
首先,FutureTask實現了一個接口——RunnableFuture接口,而該RunnableFuture接口繼承了Runnable接口和Future接口。
Runnable接口咱們很熟悉,就是那個java 線程Runnable,表明異步線程的代碼邏輯。
Future接口又是啥呢?
提早劇透下,這個接口,就是用來獲取異步線程結果的。
Future接口和Runnable接口同樣,都是牛氣沖天的接口。 而FutureTask 間接的實現這個兩大接口。
正由於FutureTask可以有兩個很牛逼的爹,因此本身家才很牛逼。
FutureTask 既能當作一個Runnable 做爲 target ,直接被Thread執行;也能做爲Future用來去取得Callable的計算結果。
Future接口這個不是一個複雜的接口,梳理一下,主要提供了3大功能:
(1)獲取併發的任務完成後的執行結果。
(2)可以取消併發執行中的任務;
(3)判斷併發任務是否執行完成;
固然,第一點是最爲經常使用的。也是這個接口的最初使命。
Future接口的代碼以下:
package java.util.concurrent; public interface Future<V> { boolean cancel(boolean mayInterruptRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout,TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
對Future接口的方法,詳細說明以下:
V get() :獲取併發任務執行的結果。注意,這個方法是阻塞性的。若是併發任務沒有執行完成,調用此方法的線程會一直阻塞,直到併發任務執行完成。
V get(Long timeout , TimeUnit unit) :獲取併發任務執行的結果。也是阻塞性的,可是會有阻塞的時間限制,若是阻塞時間超過設定的timeout時間,該方法將拋出異常。
boolean isDone():獲取併發任務的執行狀態。若是任務執行結束,返回true。
boolean isCancelled():獲取併發任務的取消狀態。若是任務完成前被取消,則返回true。
boolean cancel(boolean mayInterruptRunning):取消併發任務的執行。
說完了FutureTask的兩個爹,再來到FutureTask自身。
在FutureTask內部,又有哪些成員和方法,具體的執行併發任務、異步獲取任務結果的呢?
首先,FutureTask內部有一個 Callable類型的成員:
private Callable
這個callable實例屬性,是構造器傳進來的。用來保存併發執行的 Callable
其次,FutureTask內部有一個run方法。
這個run方法,是Runnable接口在FutureTask內部的實現。在這個run方法其中,會執行到callable成員的call方法。執行完成後,結果如何提供出去呢?這就是到了最後一點。
最後,FutureTask內部有另外一個 Object 類型的重要成員——outcome實例屬性:
private Object outcome;
掐指一算,就知道這個outcome屬性,是用來保存callable成員call方法的執行結果。FutureTask類run方法執行完成callable成員的call方法後,會將結果保存在outcome實例屬性,供FutureTask類的get實例方法獲取。
好了,重要將這個媒婆介紹完了。
若是尚未清楚,沒關係,看一個實例就一目瞭然了。
回顧一下,前面的join悶葫蘆合併阻塞有一個很大的問題,就是沒有返回值。
若是燒水線程的水有問題,或者燒水壺壞了,mian線程是沒有辦法知道的。
若是清洗線程的茶杯有問題,清洗不來了,mian線程是沒有辦法知道的。
爲了演示結果,給主類增長兩個成員:
static boolean warterOk = false; static boolean cupOk =false;
表明燒水成功和清洗成功。初始值都爲false。
燒水線程、清洗線程執行完後,都須要返回結果。 主線程獲取後,保存在上面的兩個主類成員中。
廢話很少說,看代碼:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * Created by 尼恩 at 瘋狂創客圈 */ public class JavaFutureDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } static class HotWarterJob implements Callable<Boolean> //① { @Override public Boolean call() throws Exception //② { try { Print.tcfo("洗好水壺"); Print.tcfo("灌上涼水"); Print.tcfo("放在火上"); //線程睡眠一段時間,表明燒水中 Thread.sleep(SLEEP_GAP); Print.tcfo("水開了"); } catch (InterruptedException e) { Print.tcfo(" 發生異常被中斷."); return false; } Print.tcfo(" 運行結束."); return true; } } static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { try { Print.tcfo("洗茶壺"); Print.tcfo("洗茶杯"); Print.tcfo("拿茶葉"); //線程睡眠一段時間,表明清洗中 Thread.sleep(SLEEP_GAP); Print.tcfo("洗完了"); } catch (InterruptedException e) { Print.tcfo(" 清洗工做 發生異常被中斷."); return false; } Print.tcfo(" 清洗工做 運行結束."); return true; } } static boolean warterOk = false; static boolean cupOk =false; public static void drinkTea() { if (warterOk && cupOk) { Print.tcfo("泡茶喝"); } else if (!warterOk) { Print.tcfo("燒水失敗,沒有茶喝了"); } else if (!cupOk) { Print.tcfo("杯子洗不了,沒有茶喝了"); } } public static void main(String args[]) { Callable<Boolean> hJob = new HotWarterJob();//③ FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob);//④ Thread hThread = new Thread(hTask, "** 燒水-Thread");//⑤ Callable<Boolean> wJob = new WashJob();//③ FutureTask<Boolean> wTask = new FutureTask<Boolean>(wJob);//④ Thread wThread = new Thread(wTask, "$$ 清洗-Thread");//⑤ hThread.start(); wThread.start(); Thread.currentThread().setName("主線程"); try { warterOk = hTask.get(); cupOk = wTask.get(); // hThread.join(); // wThread.join(); drinkTea(); } catch (InterruptedException e) { Print.tcfo(getCurThreadName() + "發生異常被中斷."); } catch (ExecutionException e) { e.printStackTrace(); } Print.tcfo(getCurThreadName() + " 運行結束."); } }
藉助上面的喝茶實例代碼,說明一下經過FutureTask獲取異步結果的流程步驟:
(1)異步代碼邏輯須要繼承Callable,經過call方法返回具體的值
static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { //..業務代碼,而且有返回值 }
(3)從異步邏輯到異步線程,須要媒婆類FutureTask搭橋
Callable<Boolean> hJob = new HotWarterJob();//異步邏輯 FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob);//媒婆實例 Thread hThread = new Thread(hTask, "** 燒水-Thread");//異步線程
FutureTask和Callable都是泛型類,泛型參數表示返回結果的類型。因此,在使用的時候,倆個類型的泛型參數必定須要一致的。
(3)取得異步線程的執行結果,也須要FutureTask 媒婆實例作下二傳
warterOk = hTask.get();
經過FutureTask 實例的get方法,能夠獲取線程的執行結果。
三步至此,結果到手。
總結一下,FutureTask 比 join 線程合併高明,能取得異步線程的結果。
可是,也就未必高明到哪裏去了。爲啥呢?
由於,經過FutureTask的get方法,獲取異步結果時,主線程也會被阻塞的。這一點,FutureTask和join也是一致的,他們倆都是異步阻塞模式。
異步阻塞的效率是比較低的,被阻塞的主線程,不能幹任何事情,惟一能幹的,就是在傻傻等待。
若是想提升效率,須要用到非阻塞模式。這裏只講回調模式的非阻塞,其餘模式的非阻塞,請關注瘋狂創客圈的後續文章。
原生Java,除了阻塞模式的獲取結果,並無實現非阻塞模式的異步回調。若是須要用到異步回調,得引入一些額外的框架。
在很是著名的google 提供的擴展包 Guava中,提供了一種異步回調的解決方案。
爲了實現異步回調,Guava 對Java的Future 異步模式進行能力導入:
(1)導入了一個新的接口 FutureCallback,表明回調執行的業務邏輯
(2)對Java併發包中的 Future 接口進行了擴展,將回調邏輯做爲監聽器綁定到異步線程
FutureCallback 是一個新增的接口,用來填寫回調邏輯。這個接口,是在實際開發中編程使用到的。回調的代碼,編寫在它的實現類中。
FutureCallback擁有兩個回調方法:
(1)onSuccess ,在異步線程執行成功回調
(2)onFailure,在異步線程拋出異常時回調
FutureCallback的源碼以下:
public interface FutureCallback<V> { void onSuccess(@Nullable V var1); void onFailure(Throwable var1); }
若是將回調方法,綁定到異步線程去呢?
Guava中,有一個很是關鍵的角色,ListenableFuture。看名稱,就能對應出它與Java 中的原生接口的親戚關係。
若是沒有猜錯,這個接口是 Guava 對java 的Future接口的擴展。
來看看 ListenableFuture接口的源碼,以下:
package com.google.common.util.concurrent; import java.util.concurrent.Executor; import java.util.concurrent.Future; public interface ListenableFuture<V> extends Future<V> { void addListener(Runnable var1, Executor var2); }
前面講到,經過Java的Future接口,能夠阻塞取得異步的結果。在這個基礎上,ListenableFuture增長了一個方法 —— addListener 。
這個方法的做用,就是將前一小節的FutureCallback 回調邏輯,綁定到異步線程上。 能夠是,addListener 不直接在實際編程中使用。這個方法只在Guava內部使用,若是對它感興趣,能夠查看Guava源碼。
既然addListener 方法不能直接使用,那麼,在實際編程中,如何將 FutureCallback 回調邏輯綁定到異步線程呢?
不慌,辦法老是有的。
須要用到Guava的Futures 工具類。這個類有一個addCallback 靜態方法,將ListenableFuture 的實例和FutureCallback 的回調實例,進行綁定。
綁定的示意代碼以下:
Futures.addCallback( hFuture , new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { //成功時候的回調邏輯 } public void onFailure(Throwable t) { //異常時候的回調邏輯 } });
從上文已知,原生java的Future接口的實例,一種方法是——直接構建媒婆類FutureTask的實例,就是Future接口的實例。
固然,還有第二種方法,就是經過線程池獲取Future接口的實例。具體的作法是向Java線程池提交異步任務,包括Runnable或者Callable實例。
方法以下:
Future<Boolean> hTask = pool.submit(hJob); Future<Boolean> wTask = pool.submit(wJob);
注意,pool 是一個Java 線程池。
若是要獲取Guava的ListenableFuture 實例,主要是經過相似上面的第二種方式——向線程池提交任務的異步任務的方式獲取。不過,用到的線程池,是Guava的線程池,不是Java的線程池。
Guava線程池,而是對Java線程池的一種裝飾。
兩種線程池的建立代碼,具體以下:
//java 線程池 ExecutorService jPool = Executors.*newFixedThreadPool*(10); //guava 線程池 ListeningExecutorService gPool = MoreExecutors.*listeningDecorator*(jPool);
有了Guava的線程池以後,就能夠經過提交任務,來獲取ListenableFuture 實例了。代碼以下 :
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
關於Gava的線程池,請關注【瘋狂創客圈】的線程池的博客文章。
總結一下,Guava異步回調的流程以下:
第一步:建立Java的 Callable的異步任務實例。實例以下:
Callable<Boolean> hJob = new HotWarterJob();//異步任務Callable<Boolean> wJob = new WashJob();//異步任務
異步任務也能夠是Runnable類型。
第二步: 獲取Guava線程池
//java 線程池 ExecutorService jPool = Executors.*newFixedThreadPool*(10); //guava 線程池 ListeningExecutorService gPool = MoreExecutors.*listeningDecorator*(jPool);
第三步: 提交異步任務到Guava線程池,獲取ListenableFuture 實例
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
第四步:建立回調的 FutureCallback 實例,經過Futures.addCallback,將回調邏輯綁定到ListenableFuture 實例。
Futures.*addCallback*( hFuture , new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { //成功時候的回調邏輯 } public void onFailure(Throwable t) { //異常時候的回調邏輯 } });
完成以上四步,當異步邏輯執行完成後,就會回調FutureCallback 實例中的回調代碼。
已經對喝茶實例的代碼很是熟悉下,下面是Guava的異步回調的演進版本,代碼以下:
package com.crazymakercircle.coccurent; import com.crazymakercircle.util.Print; import com.google.common.util.concurrent.*; import java.util.concurrent.*; /** * Created by 尼恩 at 瘋狂創客圈 */ public class GuavaFutureDemo { public static final int SLEEP_GAP = 500; public static String getCurThreadName() { return Thread.currentThread().getName(); } static class HotWarterJob implements Callable<Boolean> //① { @Override public Boolean call() throws Exception //② { try { Print.tcfo("洗好水壺"); Print.tcfo("灌上涼水"); Print.tcfo("放在火上"); //線程睡眠一段時間,表明燒水中 Thread.sleep(SLEEP_GAP); Print.tcfo("水開了"); } catch (InterruptedException e) { Print.tcfo(" 發生異常被中斷."); return false; } Print.tcfo(" 運行結束."); return true; } } static class WashJob implements Callable<Boolean> { @Override public Boolean call() throws Exception { try { Print.tcfo("洗茶壺"); Print.tcfo("洗茶杯"); Print.tcfo("拿茶葉"); //線程睡眠一段時間,表明清洗中 Thread.sleep(SLEEP_GAP); Print.tcfo("洗完了"); } catch (InterruptedException e) { Print.tcfo(" 清洗工做 發生異常被中斷."); return false; } Print.tcfo(" 清洗工做 運行結束."); return true; } } static boolean warterOk = false; static boolean cupOk = false; public synchronized static void drinkTea() { if (warterOk && cupOk) { Print.tcfo("泡茶喝"); } else if (!warterOk) { Print.tcfo("燒水失敗,沒有茶喝了"); } else if (!cupOk) { Print.tcfo("杯子洗不了,沒有茶喝了"); } } public static void main(String args[]) { Thread.currentThread().setName("主線程"); Callable<Boolean> hJob = new HotWarterJob();//③ Callable<Boolean> wJob = new WashJob();//③ //java 線程池 ExecutorService jPool = Executors.newFixedThreadPool(10); //guava 線程池 ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool); ListenableFuture<Boolean> hFuture = gPool.submit(hJob); Futures.addCallback(hFuture, new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { if (r) { warterOk = true; drinkTea(); } else { Print.tcfo("燒水失敗,沒有茶喝了"); } } public void onFailure(Throwable t) { Print.tcfo("燒水失敗,沒有茶喝了"); } }); ListenableFuture<Boolean> wFuture = gPool.submit(wJob); Futures.addCallback(wFuture, new FutureCallback<Boolean>() { public void onSuccess(Boolean r) { if (r) { cupOk = true; drinkTea(); } else { Print.tcfo("清洗失敗,沒有茶喝了"); } } public void onFailure(Throwable t) { Print.tcfo("杯子洗不了,沒有茶喝了"); } }); try { Print.tcfo("讀書中......"); Thread.sleep(100000); } catch (InterruptedException e) { Print.tcfo(getCurThreadName() + "發生異常被中斷."); } Print.tcfo(getCurThreadName() + " 運行結束."); gPool.shutdown(); } }
本文已經太長,還有不少內容
未完待續
爲何說異步回調是如此的重要呢 ? 由於高併發編程,處處都用到Future模式和Callback模式。
下一篇:Netty 中的Future 回調實現與線程池詳解。這個也是一個很是重要的基礎篇。
Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
瘋狂創客圈 【 博客園 總入口 】