最近在看JUC線程池java.util.concurrent.ThreadPoolExecutor
的源碼實現,其中瞭解到java.util.concurrent.Future
的實現原理。從目前java.util.concurrent.Future
的實現來看,雖然實現了異步提交任務,可是任務結果的獲取過程須要主動調用Future#get()
或者Future#get(long timeout, TimeUnit unit)
,而前者是阻塞的,後者在異步任務執行時間不肯定的狀況下有可能須要進行輪詢,這兩種狀況和異步調用的初衷有點相違背。因而筆者想結合目前瞭解到的Future
實現原理的前提下擴展出支持(監聽)回調的Future
,思路上參考了Guava
加強的ListenableFuture
。本文編寫的時候使用的JDK是JDK11,其餘版本可能不適合。java
併發大師Doug Lea在設計JUC線程池的時候,提供了一個頂層執行器接口Executor
:shell
public interface Executor { void execute(Runnable command); }
實際上,這裏定義的方法Executor#execute()
是整套線程池體系最核心的接口,也就是ThreadPoolExecutor
定義的核心線程、額外建立的線程(線程池最大線程容量 - 核心線程數)都是在這個接口提交任務的時候懶建立的,也就是說ExecutorService
接口擴展的功能都是基於Executor#execute()
的基礎進行擴展。Executor#execute()
方法只是單純地把任務實例Runnable
對象投放到線程池中分配合適的線程執行,可是因爲方法返回值是void
類型,咱們是沒法感知任務何時執行完畢。這個時候就須要對Runnable
任務實例進行包裝(下面是僞代碼 + 僞邏輯):編程
// 下面這個Wrapper和Status類是筆者虛構出來 @RequiredArgsConstructor class Wrapper implements Runnable{ private final Runnable target; private Status status = Status.of("初始化"); @Override public void run(){ try{ target.run(); status = Status.of("執行成功"); }catch(Throwable t){ status = Status.of("執行異常"); } } }
咱們只須要把new Wrapper(原始Runnable實例)
投放到線程池執行,那麼經過定義好的Status
狀態記錄變量就能得知異步任務執行的狀態,以及何時執行完畢(包括正常的執行完畢和異常的執行完畢)。這裏僅僅解決了任務執行的狀態獲取,可是Executor#execute()
方法法返回值是void
類型的特色使得咱們沒法回調Runnable
對象執行的結果。這個時候須要定義一個能夠回調執行結果的接口,其實已經有現成的接口Callable
:併發
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
這裏遇到了一個問題:因爲Executor#execute()
只接收Runnable
參數,咱們須要把Callable
接口適配到Runnable
接口,這個時候,作一次簡單的委託便可:app
@RequiredArgsConstructor class Wrapper implements Runnable{ private final Callable callable; private Status status = Status.of("初始化"); @Getter private Object outcome; @Override public void run(){ try{ outcome = callable.call(); status = Status.of("執行成功"); }catch(Throwable t){ status = Status.of("執行異常"); outcome = t; } } }
這裏把Callable
實例直接委託給Wrapper
,而Wrapper
實現了Runnable
接口,執行結果直接存放在定義好的Object
類型的對象outcome
中便可。當咱們感知到執行狀態已經結束,就能夠從outcome
中提取到執行結果。異步
上面一個小結僅僅對Future
實現作一個相對合理的虛擬推演,實際上,RunnableFuture
纔是JUC中經常使用的複合接口,它同時實現了Runnable
和Future
:ide
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
上一節提到的虛構出來的Wrapper
類,在JUC中相似的實現是java.util.concurrent.FutureTask
,它就是Callable
和Runnable
的適配器,FutureTask
實現了RunnableFuture
接口:函數
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters; // 省略其餘代碼 }
注意到核心屬性state
表示執行狀態,outcome
承載執行結果。接着看提交Callable
類型任務的方法ExecutorService#submit()
:測試
public interface ExecutorService extends Executor { // 省略其餘接口方法 <T> Future<T> submit(Callable<T> task); }
當咱們經過上述ExecutorService#submit()
方法提交Callable
類型任務的時候,實際上作了以下的步驟:ui
task
的存在性,若是爲null
拋出NullPointerException
。Callable
類型的task
包裝爲FutureTask
實例。FutureTask
實例放到線程池中執行,也就是調用Executor#execute(FutureTask實例)
。FutureTask
實例的接口實例RunnableFuture
(其實是返回子接口Future
實例)。若是咱們須要獲取結果,能夠Future#get()
或者Future#get(long timeout, TimeUnit unit)
獲取,調用這兩個方法的時候參看FutureTask
裏面的方法實現,得知步驟以下:
state
小於等於COMPLETING(1)
,說明任務還在執行中,獲取結果的請求線程會放入WaitNode
類型的隊列中進行阻塞。state
和把結果賦值到outcome
以外,還會喚醒全部阻塞獲取結果的線程,而後調用鉤子方法FutureTask#done()
(具體見源碼FutureTask#finishCompletion()
)。其實分析了這麼多,筆者想指出的結論就是:Callable
類型任務提交到線程池中執行完畢(包括正常執行完畢和異常執行完畢)以後,都會回調鉤子方法FutureTask#done()
。這個就是咱們擴展可監聽Future
的理論依據。
先作一次編碼實現,再簡單測試其功能。
先定義一個Future
接口的子接口ListenableFuture
,用於添加可監聽的回調:
public interface ListenableFuture<V> extends Future<V> { void addCallback(ListenableFutureCallback<V> callback, Executor executor); }
ListenableFutureCallback
是一個函數式回調接口:
@FunctionalInterface public interface ListenableFutureCallback<V> { void callback(V value, Throwable throwable); }
對於ListenableFutureCallback
而言,回調的結果value
和throwable
是互斥的。正常執行完畢的狀況下value
將會是執行結果值,throwable
爲null
;異常執行完畢的狀況下,value
將會是null
,throwable
將會是拋出的異常實例。若是更習慣於分開處理正常執行完畢的結果和異常執行完畢的結果,ListenableFutureCallback
能夠這樣定義:
public interface ListenableFutureCallback<V> { void onSuccess(V value); void onError(Throwable throwable); }
接着定義ListenableExecutorService
接口繼承ExecutorService
接口:
public interface ListenableExecutorService extends ExecutorService { <T> ListenableFuture<T> listenableSubmit(Callable<T> callable); /** * 定義這個方法是由於有些時候因爲任務執行時間很是短,有可能經過返回的ListenableFuture實例添加回調以前已經執行完畢,所以能夠支持顯式傳入回調 * * @param callable callable * @param callbacks callbacks * @param executor executor * @return ListenableFuture */ <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor); }
而後添加一個執行單元適配器ListenableFutureCallbackRunnable
,承載每次回調觸發的調用(實現Runnable
接口,從而支持異步執行):
@RequiredArgsConstructor public class ListenableFutureCallbackRunnable<V> implements Runnable { private final ListenableFutureCallback<V> callback; private final V value; private final Throwable throwable; @Override public void run() { callback.callback(value, throwable); } }
接着須要定義一個FutureTask
的子類ListenableFutureTask
,核心邏輯是覆蓋FutureTask#done()
方法觸發回調:
// ListenableFutureTask public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> { private final List<Execution<V>> executions = new ArrayList<>(); public ListenableFutureTask(Callable<V> callable) { super(callable); } public ListenableFutureTask(Runnable runnable, V result) { super(runnable, result); } public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) { return new ListenableFutureTask<>(callable); } @Override protected void done() { Iterator<Execution<V>> iterator = executions.iterator(); Throwable throwable = null; V value = null; try { value = get(); } catch (Throwable t) { throwable = t; } while (iterator.hasNext()) { Execution<V> execution = iterator.next(); ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(), value, throwable); // 異步回調 if (null != execution.getExecutor()) { execution.getExecutor().execute(callbackRunnable); } else { // 同步回調 callbackRunnable.run(); } } } @Override public void addCallback(ListenableFutureCallback<V> callback, Executor executor) { Execution<V> execution = new Execution<>(); execution.setCallback(callback); execution.setExecutor(executor); executions.add(execution); } } // Execution - 承載每一個回調實例和對應的Executor,Executor實例爲null則進行同步回調 @Data public class Execution <V>{ private Executor executor; private ListenableFutureCallback<V> callback; }
最後一步就是編寫線程池ListenableThreadPoolExecutor
,繼承自ThreadPoolExecutor
而且實現ListenableExecutorService
接口:
public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService { public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) { if (null == callable) { throw new IllegalArgumentException("callable"); } ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable); execute(listenableFutureTask); return listenableFutureTask; } @Override public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) { if (null == callable) { throw new IllegalArgumentException("callable"); } if (null == callbacks) { throw new IllegalArgumentException("callbacks"); } ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable); for (ListenableFutureCallback<T> callback : callbacks) { listenableFutureTask.addCallback(callback, executor); } execute(listenableFutureTask); return listenableFutureTask; } }
引入junit
,編寫測試類以下:
public class ListenableFutureTest { private static ListenableExecutorService EXECUTOR; private static Executor E; @BeforeClass public static void before() { EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement())); return thread; } }); E = Executors.newFixedThreadPool(3); } @Test public void testListenableFuture1() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); return "message"; }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, null); Thread.sleep(2000); } @Test public void testListenableFuture2() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); throw new RuntimeException("exception"); }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, null); Thread.sleep(2000); } @Test public void testListenableFuture3() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); return "message"; }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, E); System.out.println("testListenableFuture3 end..."); Thread.sleep(2000); } @Test public void testListenableFuture4() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); throw new RuntimeException("exception"); }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, E); System.out.println("testListenableFuture4 end..."); Thread.sleep(2000); } }
執行結果:
// testListenableFuture1 Value = message,Throwable = null // testListenableFuture2 Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception // testListenableFuture3 testListenableFuture3 end... Value = message,Throwable = null // testListenableFuture4 testListenableFuture4 end... Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
和預期的結果一致,注意一下若是Callable
執行拋出異常,異常被包裝爲ExecutionException
,要調用Throwable#getCause()
才能獲得原始的異常實例。
本文經過了解ThreadPoolExecutor
和Future
的實現原理作簡單的擴展,使得異步提交任務變得更加優雅和簡便。強化了動手能力的同時,也能加深對併發編程的一些認知。固然,本文只是提供一個十分簡陋的實現,筆者其實還想到了如對回調處理的耗時作監控、回調打上分組標籤執行等等更完善的功能,等到有須要的場景再進行實現。
這裏記錄一下過程當中的一些領悟:
Executor#execute()
是線程池的核心接口,全部其餘功能都是基於此接口作擴展,它的設計自己是無狀態的。(本文完 c-1-d e-a-20190702)