墮落的人生啊……
偶然間看到這個問題,對於標配jdk1.8
的咱們是否是分分鐘拍出答案?
答曰:簡單,Callable
,完美解決,下一題……java
但是,身處jdk1.4
(甚至更早)的前輩們,要怎麼作才能拿到線程返回值呢?或者說,禁用Callable
技能,怎麼獲取線程返回值?
嗯,這彷佛是線程間通訊的問題;只有Runnable
做爲武器,有些麻煩,接受挑戰!程序員
首先,定義任務Task多線程
// 任務Task class Task implements Runnable{ @Getter Object result; //返回值 @Override public void run() { try { // 模擬某耗時邏輯 System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } // 計算獲得最終結果 result = Integer.valueOf(9987); System.out.println(String.format("[%s] 執行完畢..",Thread.currentThread().getName())); } }
以後,啓動線程併發
public static void main(String[] args) { // 使用了內部類,採用以下方式new CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); }
好,T-1
線程啓動了,看樣子能很好的執行任務,問題是main方法
中怎麼獲取到Task
的返回值result
呢?
以目前的代碼運行,效果絕對是T-1線程單飛,和main線程沒啥聯繫。jvm
我有一項能力,老是能第一時間至關最簡易的方法。ide
main線程辛苦些,多跑跑腿檢查下result的狀態:函數
public static void main(String[] args) { CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); // main線程頻繁檢查T-1線程 while (true){ if(task.getResult()!=null){ System.out.println(String.format("結果 task=%s",task.getResult())); break; } // 讓cpu稍微冷靜一下 TimeUnit.MILLISECONDS.sleep(200L); System.out.println(String.format("[main] 勤勞檢查result中(result=%s)",task.getResult())); } }
運行效果,多是這樣的:工具
[T-1] 執行中.. [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [main] 勤勞檢查result中(result=null) [T-1] 執行完畢.. [main] 勤勞檢查result中(result=9987) 結果 task=9987
雖然已加入了對cpu而言人性化的休眠方法(sleep),但這依然不是個很好的方案。該方案極大的操勞了main線程,須要一遍遍的檢查子線程的運行狀況——子線程是否將最終結果賦值。
那有沒有一種方式,能夠在T-1運行完以後,告訴main線程呢?
做爲一個老派(技術陳舊)的程序員,我首先想到的是wait..notify組合
this
public static void main(String[] args){ CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); while (true){ //檢查result狀態,尚未賦值,則等待 if(task.getResult()==null){ System.out.println(String.format("[%s] 等待執行..",Thread.currentThread().getName())); synchronized (task){ try { task.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } if(task.getResult()!=null){ System.out.println(String.format("結果 task=%s",task.getResult())); break; } } }
class Task implements Runnable{ @Getter Object result; @Override public void run() { try { System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } result = Integer.valueOf(9987); //喚醒wait的對象 synchronized (this){ this.notify(); } System.out.println(String.format("[%s] 執行完畢..",Thread.currentThread().getName())); } }
改造後,執行效果以下:spa
[T-1] 執行中.. [main] 等待執行.. [T-1] 執行完畢.. 結果 task=9987
其實也可使用LockSupport
實現,和 wait / notify 相似,直接貼出完整代碼吧:
public class CallbackTest { class Task implements Runnable{ @Getter Object result; // 構造函數傳入調用線程(main線程) Thread runner; Task(Thread runner){ this.runner = runner; } @Override public void run() { try { System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } result = Integer.valueOf(9987); //喚醒main線程 synchronized (this){ LockSupport.unpark(runner); } System.out.println(String.format("[%s] 執行完畢..",Thread.currentThread().getName())); } } public static void main(String[] args) { CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(Thread.currentThread()); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); while (true){ if(task.getResult()==null){ System.out.println(String.format("[%s] 等待執行..",Thread.currentThread().getName())); LockSupport.park(); //main線程阻塞 } if(task.getResult()!=null){ System.out.println(String.format("結果 task=%s",task.getResult())); break; } } } }
至此,咱們至關於能夠用本身的方式獲取到Thread的返回值了,此時回顧下文章開始初的解答:
偶然間看到這個問題,對於標配`jdk1.8`的咱們是否是分分鐘拍出答案? 答曰:簡單,`Callable`,完美解決,下一題……
當時很天然的就回答了Callable
,先看看它是怎麼用的。
public class CallbackTest { class Task implements Callable<Object> { @Override public Object call() { try { // 某耗時邏輯 System.out.println(String.format("[%s] 執行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.valueOf(9987); } } public static void main(String[] args) throws ExecutionException, InterruptedException { CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); ExecutorService es = Executors.newSingleThreadExecutor(); Future<Object> future = es.submit(task); System.out.println("結果:"+future.get()); es.shutdown(); } }
代碼並不複雜,demo 中獲取返回值的方式是future.get()
,這是一個阻塞方法;在子線程執行完(return)以前會一直阻塞。沒用過的開發兄弟(姐妹?)們自行科普吧,很少解釋了。
Callable源碼以下:
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
Callable自己就一接口,沒什麼玄機,玄機在 Future 或者說FutureTask
上。
重頭戲來了,看看源碼是怎麼實現從其它線程獲取返回值的。
先瞧瞧FutureTask的江湖地位:
能夠看出,FutureTask
是Future接口
和Runnable接口
的實現類。
此事留個大概印象,咱們來看下FutureTask
是怎麼和Callable
關聯上的?
(可對照下文,追下源碼;若是實在不理解,可直接跳到本章節末尾結論處)
ExecutorService es = Executors.newSingleThreadExecutor(); Future<Object> future = es.submit(task);
以例子中的ExecutorService的submit方法做爲入口,實際的實現方法爲AbstractExecutorService
的submit
:
/* `AbstractExecutorService` */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); //註釋1-構建了FutureTask execute(ftask); //註釋2-最終會調用ftask的run方法,也就是調用`步驟1構建的FutureTask對象的run方法 return ftask; } ... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); //1.1-調用FutureTask的構造函數 }
FutureTask
的構造函數:// callable是FutureTask的成員變量 private Callable<V> callable; public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; //爲成員變量賦值 this.state = NEW; // ensure visibility of callable }
結論1
:經ExecutorService的穿針引線,Callable會最終賦值給FutureTask的成員變量
/* `AbstractExecutorService`的`submit` */ public <T> Future<T> submit(Callable<T> task) { ... execute(ftask); //註釋2-最終會調用ftask的run方法,也就是調用`步驟1構建的FutureTask對象的run方法 ... } ↓↓↓↓↓ ↓↓↓↓↓ /* ThreadPoolExecutor的execute */ public void execute(Runnable command) { ... addWorker(null, false); //添加到worker中 ... } ↓↓↓↓↓ ↓↓↓↓↓ /* ThreadPoolExecutor的addWorker */ private boolean addWorker(Runnable firstTask, boolean core) { w = new Worker(firstTask); //`Worker`封裝 final Thread t = w.thread; ... t.start(); //註釋3-worker中的thread執行start方法,會調用對應Runnable的run方法 ... } ↓↓↓↓↓ ↓↓↓↓↓ /* 內部類`Work` */ final Thread thread; // 成員變量 Runnable firstTask; // 成員變量 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //賦值成員變量firstTask this.thread = getThreadFactory().newThread(this); //建立新線程,並賦值成員變量thread } // 3.1-`註釋3`處的start,會執行此處的run方法,進而會調用runWorker方法 public void run() { runWorker(this); } final void runWorker(Worker w) { ... Runnable task = w.firstTask; ... task.run(); //##### 注意了,最終會調用到此處 ##### ... }
task.run()
中的task又是什麼呢,就是在最開始賦值的FutureTask(註釋1處),看下它的run方法
public void run() { ... Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); //會調用callable的call方法,這個方法中就是咱們本身定義的邏輯 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); //註釋4-賦值動做 } ... }
結論2
:調用過程,通過一系列週轉,最終會調用Callable的call方法(也就是咱們的自定義邏輯)
//成員變量 private Object outcome; protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //賦值給成員變量 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
綜上,可得出結論:FutureTask包裝了Callable,執行期call方法後,將返回值賦值給成員變量
接下來探索下返回值的獲取,即Future.get()
的實現。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 1-未完成狀態,線程阻塞 s = awaitDone(false, 0L); return report(s); // 2-已完成狀態,直接獲取 } // 1.1-阻塞 private int awaitDone(boolean timed, long nanos) throws InterruptedException { ... LockSupport.park(this); //阻塞 ... // 2.1-返回了outcome private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
get() 的邏輯並不複雜:
outcome
(call() 方法返回值的引用)賦值,阻塞outcome
賦值,則將該對象返回怎麼樣,是否是有點似曾相識?這和咱們本身實現的那一版的邏輯是一致的!
再次看下set() 方法,裏面應該會有LockSupport.unpark(Thread t)
方法。
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); //這裏看上去很可疑 } } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); //吼吼吼,抓到你了,果真有LockSupport.unpark(t) } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
果真找到了park方法對應的unpark,證實咱們的推斷是正確的——FutureTask
的核心實現思路,與咱們本身的實現方式是一致的(尤爲LockSupport版本),即子線程未完成時阻塞,已完成時釋放。
主邏輯分析完了,再來兩個開胃甜點。
對比本身和源碼的實現,都用LockSupport,使用的阻塞方法卻不相同——park()
vs park(Object blocker)
差異在哪?引用官方文檔的解釋:
The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. 當線程被阻塞時記錄此對象,以容許監視和診斷工具識別線程被阻塞的緣由。 (Such tools may access blockers using method getBlocker(java.lang.Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. 有參數的park(Object blocker)是被強烈推薦的 The normal argument to supply as a blocker within a lock implementation is this.
按文檔中的意思:傳入的blocker對象,至關於一個標誌對象,線程阻塞時會記錄下來。下面的例子能明顯看出差異
舉例說明:(轉自 https://www.jianshu.com/p/835...)
private static void parkVsParkBlocker() { Thread t1 = new Thread(() -> { LockSupport.park(); }, "t1"); t1.start(); Object blocker = new Object(); Thread t2 = new Thread(() -> { LockSupport.park(blocker); }, "t2"); t2.start(); LockSupport.getBlocker(t2); unpark(t1, 60); unpark(t2, 60); }
Print java stack trace of a given jvm process.
jstack jps -l | grep LockSupport | awk '{print $1}'
FutureTask做爲抽象出的工具類,考慮了多線程環境下的get()的狀況,這部分在前文故意忽略了。
而併發環境下的數據統一,主要靠volatile
關鍵字+CAS
來達成。(經典模式)
// 狀態,記錄子線程執行狀況 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; // 記錄子線程,運行Callable.call()的線程 private volatile Thread runner; // 等待節點,鏈表 private volatile WaitNode waiters;
volatile關鍵字,此處主要用於讓其它線程可見(可見性);那CAS
(Compare And Sweep)是作什麼的?
本質上,它就是個樂觀鎖:
return true
return false
jdk 9
以前,主要靠Unsafe實現CAS;自jdk 9
開始,推出了VarHandle
,旨在替代 AtomicXX 工具,以及方便開發人員使用Unsafe的部分權能。
以狀態 state 的變動爲例,看下VarHandle是如何完成CAS的:
private volatile int state; /* 聲明和賦值 */ private static final VarHandle STATE; static{ try { MethodHandles.Lookup l = MethodHandles.lookup(); //1 - 經過MethodHandles.lookup()聲明MethodHandles.Lookup對象 STATE = l.findVarHandle(FutureTask.class, "state", int.class); //2 - 賦值VarHandle STATE,此時STATE和state就創建了某種聯繫 } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } /* 調用 */ protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { // 3 - 當前對象,將state變量,由NEW改爲COMPLETING outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } }