上一篇內容寫了Java
中線程池的實現原理及源碼分析,說好的是實實在在的大知足,想經過一篇文章讓你們對線程池有個透徹的瞭解,可是文章寫完總以爲還缺點什麼?java
上篇文章只提到線程提交的execute()
方法,並無講解線程提交的submit()
方法,submit()
有一個返回值,能夠獲取線程執行的結果Future<T>
,這一講就那深刻學習下submit()
和FutureTask
實現原理。數據結構
我能想到的使用場景就是在並行計算的時候,例如一個方法中調用methodA()、methodB()
,咱們能夠經過線程池異步去提交方法A、B,而後在主線程中獲取組裝方法A、B計算後的結果,可以大大提高方法的吞吐量。多線程
/** * @author wangmeng * @date 2020/5/28 15:30 */ public class FutureTaskTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newCachedThreadPool(); System.out.println("====執行FutureTask線程任務===="); Future<String> futureTask = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("FutureTask執行業務邏輯"); Thread.sleep(2000); System.out.println("FutureTask業務邏輯執行完畢!"); return "歡迎關注: 一枝花算不算浪漫!"; } }); System.out.println("====執行主線程任務===="); Thread.sleep(1000); boolean flag = true; while(flag){ if(futureTask.isDone() && !futureTask.isCancelled()){ System.out.println("FutureTask異步任務執行結果:" + futureTask.get()); flag = false; } } threadPool.shutdown(); } }
上面的使用很簡單,submit()
內部傳遞的其實是個Callable
接口,咱們本身實現其中的call()
方法,咱們經過futureTask
既能夠獲取到具體的返回值。異步
submit()
是也是提交任務到線程池,只是它能夠獲取任務返回結果,返回結果是經過FutureTask
來實現的,先看下ThreadPoolExecutor
中代碼實現:ide
public class ThreadPoolExecutor extends AbstractExecutorService { public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } } public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } }
提交任務仍是執行execute()
方法,只是task
被包裝成了FutureTask
,也就是在excute()
中啓動線程後會執行FutureTask.run()
方法。源碼分析
再來具體看下它執行的完整鏈路圖:學習
上圖能夠看到,執行任務並返回執行結果的核心邏輯實在FutureTask
中,咱們以FutureTask.run/get
兩個方法爲突破口,一點點剖析FutureTask
的實現原理。this
先看下FutureTask
中部分屬性:線程
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; }
當前task狀態,共有7中類型。
NEW: 當前任務還沒有執行
COMPLETING: 當前任務正在結束,還沒有徹底結束,一種臨界狀態
NORMAL:當前任務正常結束
EXCEPTIONAL: 當前任務執行過程當中發生了異常。
CANCELLED: 當前任務被取消
INTERRUPTING: 當前任務中斷中..
INTERRUPTED: 當前任務已中斷3d
用戶提交任務傳遞的Callable,自定義call方法,實現業務邏輯
任務結束時,outcome保存執行結果或者異常信息。
當前任務被線程執行期間,保存當前任務的線程對象引用
由於會有不少線程去get當前任務的結果,因此這裏使用了一種stack數據結構來保存
咱們已經知道在線程池runWorker()
中最終會調用到FutureTask.run()
方法中,咱們就來看下它的執行原理吧:
具體代碼以下:
public class FutureTask<V> implements RunnableFuture<V> { public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }
首先是判斷FutureTask
中state
狀態,必須是NEW
才能夠繼續執行。
而後經過CAS
修改runner
引用爲當前線程。
接着執行用戶自定義的call()
方法,將返回結果設置到result中,result可能爲正常返回也可能爲異常信息。這裏主要是調用set()/setException()
set()
方法的實現很簡單,直接看下代碼:
public class FutureTask<V> implements RunnableFuture<V> { protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } } }
將call()
返回的數據賦值給全局變量outcome
上,而後修改state
狀態爲NORMAL
,最後調用finishCompletion()
來作掛起線程的喚醒操做,這個方法等到get()
後面再來說解。
接着看下代碼:
public class FutureTask<V> implements RunnableFuture<V> { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } }
若是FutureTask
中state
爲NORMAL
或者COMPLETING
,說明當前任務並無執行完成,調用get()
方法會被阻塞,具體的阻塞邏輯在awaitDone()
方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
這個方法能夠說是FutureTask
中最核心的方法了,一步步來分析:
若是timed
不爲空,這說明指定nanos
時間還未返回結果,線程就會退出。
q
是一個WaitNode
對象,是將當前引用線程封裝在一個stack
數據結構中,WaitNode
對象屬性以下:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
接着判斷當前線程是否中斷,若是中斷則拋出中斷異常。
下面就進入一輪輪的if... else if...
判斷邏輯,咱們仍是採用分支的方式去分析。
分支一:if (s > COMPLETING) {
此時get()
方法已經有結果了,不管是正常返回的結果,仍是異常、中斷、取消等,此時直接返回state
狀態,而後執行report()
方法。
分支二:else if (s == COMPLETING)
條件成立,說明當前任務接近完成狀態,這裏讓當前線程再釋放cpu
,進行下一輪搶佔cpu
。
分支三:else if (q == null)
第一次自旋執行,WaitNode
尚未初始化,初始化q=new WaitNode();
分支四:else if (!queued){
queued
表明當前線程是否入棧,若是沒有入棧則進行入棧操做,順便將全局變量waiters
指向棧頂元素。
分支五/六:LockSupport.park
若是設置了超時時間,則使用parkNanos
來掛起當前線程,不然使用park()
通過這麼一輪自旋循環後,若是執行call()
尚未返回結果,那麼調用get()
方法的線程都會被掛起。
被掛起的線程會等待run()
返回結果後依次喚醒,具體的執行邏輯在finishCompletion()
中。
最終stack
結構中數據以下:
具體實現代碼以下:
private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } done(); callable = null; }
代碼實現很簡單,看過get()
方法後,咱們知道全部調用get()
方法的線程,在run()
尚未返回結果前,都會保存到一個有WaitNode
構成的statck
數據結構中,並且每一個線程都會被掛起。
這裏是遍歷waiters
棧頂元素,而後依次查詢起next
節點進行喚醒,喚醒後的節點接着會日後調用report()
方法。
具體代碼以下:
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
這個方法很簡單,由於執行到了這裏,說明當前state
狀態確定大於COMPLETING
,判斷若是是正常返回,那麼返回outcome
數據。
若是state
是取消狀態,拋出CancellationException
異常。
若是狀態都不知足,則說明執行中出現了差錯,直接拋出ExecutionException
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
cancel()
方法的邏輯很簡單,就是修改state
狀態爲CANCELLED
,而後調用finishCompletion()
來喚醒等待的線程。
這裏若是mayInterruptIfRunning
,就會先中斷當前線程,而後再去喚醒等待的線程。
FutureTask
的實現原理其實很簡單,每一個方法基本上都畫了一個簡單的流程圖來方便當即。
後面還打算分享一個BlockingQueue
相關的源碼解讀,這樣線程池也能夠算是完結了。
在這以前可能會先分享一個SpringCloud
常見配置代碼分析、最佳實踐等手冊,方便工做中使用,也是對以前看過的源碼一種總結。敬請期待!
歡迎關注: