Java併發編程-擴展可回調的Future

前提

最近在看JUC線程池java.util.concurrent.ThreadPoolExecutor的源碼實現,其中瞭解到java.util.concurrent.Future的實現原理。從目前java.util.concurrent.Future的實現來看,雖然實現了異步提交任務,可是任務結果的獲取過程須要主動調用Future#get()或者Future#get(long timeout, TimeUnit unit),而前者是阻塞的,後者在異步任務執行時間不肯定的狀況下有可能須要進行輪詢,這兩種狀況和異步調用的初衷有點相違背。因而筆者想結合目前瞭解到的Future實現原理的前提下擴展出支持(監聽)回調的Future,思路上參考了Guava加強的ListenableFuture。本文編寫的時候使用的JDK是JDK11,其餘版本可能不適合。java

簡單分析Future的實現原理

虛擬例子推演

併發大師Doug Lea在設計JUC線程池的時候,提供了一個頂層執行器接口Executorshell

public interface Executor {

    void execute(Runnable command);
}

實際上,這裏定義的方法Executor#execute()是整套線程池體系最核心的接口,也就是ThreadPoolExecutor定義的核心線程、額外建立的線程(線程池最大線程容量 - 核心線程數)都是在這個接口提交任務的時候懶建立的,也就是說ExecutorService接口擴展的功能都是基於Executor#execute()的基礎進行擴展。Executor#execute()方法只是單純地把任務實例Runnable對象投放到線程池中分配合適的線程執行,可是因爲方法返回值是void類型,咱們是沒法感知任務何時執行完畢。這個時候就須要對Runnable任務實例進行包裝(下面是僞代碼 + 僞邏輯):編程

// 下面這個Wrapper和Status類是筆者虛構出來
@RequiredArgsConstructor
class Wrapper implements Runnable{

    private final Runnable target;
    private Status status = Status.of("初始化");

    @Override
    public void run(){
        try{
           target.run();
           status = Status.of("執行成功");
        }catch(Throwable t){
           status = Status.of("執行異常"); 
        }
    }
}

咱們只須要把new Wrapper(原始Runnable實例)投放到線程池執行,那麼經過定義好的Status狀態記錄變量就能得知異步任務執行的狀態,以及何時執行完畢(包括正常的執行完畢和異常的執行完畢)。這裏僅僅解決了任務執行的狀態獲取,可是Executor#execute()方法法返回值是void類型的特色使得咱們沒法回調Runnable對象執行的結果。這個時候須要定義一個能夠回調執行結果的接口,其實已經有現成的接口Callable併發

@FunctionalInterface
public interface Callable<V> {

    V call() throws Exception;
}

這裏遇到了一個問題:因爲Executor#execute()只接收Runnable參數,咱們須要把Callable接口適配到Runnable接口,這個時候,作一次簡單的委託便可:app

@RequiredArgsConstructor
class Wrapper implements Runnable{

    private final Callable callable;
    private Status status = Status.of("初始化");
    @Getter
    private Object outcome;

    @Override
    public void run(){
        try{
           outcome = callable.call();
           status = Status.of("執行成功");
        }catch(Throwable t){
           status = Status.of("執行異常"); 
           outcome = t;
        }
    }
}

這裏把Callable實例直接委託給Wrapper,而Wrapper實現了Runnable接口,執行結果直接存放在定義好的Object類型的對象outcome中便可。當咱們感知到執行狀態已經結束,就能夠從outcome中提取到執行結果。異步

Future的實現

上面一個小結僅僅對Future實現作一個相對合理的虛擬推演,實際上,RunnableFuture纔是JUC中經常使用的複合接口,它同時實現了RunnableFutureide

public interface RunnableFuture<V> extends Runnable, Future<V> {
    
    void run();
}

上一節提到的虛構出來的Wrapper類,在JUC中相似的實現是java.util.concurrent.FutureTask,它就是CallableRunnable的適配器,FutureTask實現了RunnableFuture接口:函數

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    
    // 省略其餘代碼
}

注意到核心屬性state表示執行狀態,outcome承載執行結果。接着看提交Callable類型任務的方法ExecutorService#submit()測試

public interface ExecutorService extends Executor {

    // 省略其餘接口方法

    <T> Future<T> submit(Callable<T> task);
}

當咱們經過上述ExecutorService#submit()方法提交Callable類型任務的時候,實際上作了以下的步驟:ui

  1. 檢查入參task的存在性,若是爲null拋出NullPointerException
  2. Callable類型的task包裝爲FutureTask實例。
  3. 把新建的FutureTask實例放到線程池中執行,也就是調用Executor#execute(FutureTask實例)
  4. 返回FutureTask實例的接口實例RunnableFuture(其實是返回子接口Future實例)。

若是咱們須要獲取結果,能夠Future#get()或者Future#get(long timeout, TimeUnit unit)獲取,調用這兩個方法的時候參看FutureTask裏面的方法實現,得知步驟以下:

  1. 若是狀態state小於等於COMPLETING(1),說明任務還在執行中,獲取結果的請求線程會放入WaitNode類型的隊列中進行阻塞。
  2. 若是任務執行完畢,無論異常完畢仍是正常完畢,除了會更新狀態state和把結果賦值到outcome以外,還會喚醒全部阻塞獲取結果的線程,而後調用鉤子方法FutureTask#done()(具體見源碼FutureTask#finishCompletion())。

其實分析了這麼多,筆者想指出的結論就是:Callable類型任務提交到線程池中執行完畢(包括正常執行完畢和異常執行完畢)以後,都會回調鉤子方法FutureTask#done()。這個就是咱們擴展可監聽Future的理論依據。

擴展可回調的Future

先作一次編碼實現,再簡單測試其功能。

編碼實現

先定義一個Future接口的子接口ListenableFuture,用於添加可監聽的回調:

public interface ListenableFuture<V> extends Future<V> {

    void addCallback(ListenableFutureCallback<V> callback, Executor executor);
}

ListenableFutureCallback是一個函數式回調接口:

@FunctionalInterface
public interface ListenableFutureCallback<V> {

    void callback(V value, Throwable throwable);
}

對於ListenableFutureCallback而言,回調的結果valuethrowable是互斥的。正常執行完畢的狀況下value將會是執行結果值,throwablenull;異常執行完畢的狀況下,value將會是nullthrowable將會是拋出的異常實例。若是更習慣於分開處理正常執行完畢的結果和異常執行完畢的結果,ListenableFutureCallback能夠這樣定義:

public interface ListenableFutureCallback<V> {

    void onSuccess(V value);

    void onError(Throwable throwable);
}

接着定義ListenableExecutorService接口繼承ExecutorService接口:

public interface ListenableExecutorService extends ExecutorService {

    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable);

    /**
     * 定義這個方法是由於有些時候因爲任務執行時間很是短,有可能經過返回的ListenableFuture實例添加回調以前已經執行完畢,所以能夠支持顯式傳入回調
     *
     * @param callable  callable
     * @param callbacks callbacks
     * @param executor  executor
     * @return ListenableFuture
     */
    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor);
}

而後添加一個執行單元適配器ListenableFutureCallbackRunnable,承載每次回調觸發的調用(實現Runnable接口,從而支持異步執行):

@RequiredArgsConstructor
public class ListenableFutureCallbackRunnable<V> implements Runnable {

    private final ListenableFutureCallback<V> callback;
    private final V value;
    private final Throwable throwable;

    @Override
    public void run() {
        callback.callback(value, throwable);
    }
}

接着須要定義一個FutureTask的子類ListenableFutureTask,核心邏輯是覆蓋FutureTask#done()方法觸發回調:

// ListenableFutureTask
public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {

    private final List<Execution<V>> executions = new ArrayList<>();

    public ListenableFutureTask(Callable<V> callable) {
        super(callable);
    }

    public ListenableFutureTask(Runnable runnable, V result) {
        super(runnable, result);
    }

    public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) {
        return new ListenableFutureTask<>(callable);
    }

    @Override
    protected void done() {
        Iterator<Execution<V>> iterator = executions.iterator();
        Throwable throwable = null;
        V value = null;
        try {
            value = get();
        } catch (Throwable t) {
            throwable = t;
        }
        while (iterator.hasNext()) {
            Execution<V> execution = iterator.next();
            ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(),
                    value, throwable);
            // 異步回調
            if (null != execution.getExecutor()) {
                execution.getExecutor().execute(callbackRunnable);
            } else {
                // 同步回調
                callbackRunnable.run();
            }
        }
    }

    @Override
    public void addCallback(ListenableFutureCallback<V> callback, Executor executor) {
        Execution<V> execution = new Execution<>();
        execution.setCallback(callback);
        execution.setExecutor(executor);
        executions.add(execution);
    }
}

// Execution - 承載每一個回調實例和對應的Executor,Executor實例爲null則進行同步回調
@Data
public class Execution <V>{

    private Executor executor;
    private ListenableFutureCallback<V> callback;
}

最後一步就是編寫線程池ListenableThreadPoolExecutor,繼承自ThreadPoolExecutor而且實現ListenableExecutorService接口:

public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService {

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) {
        if (null == callable) {
            throw new IllegalArgumentException("callable");
        }
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        execute(listenableFutureTask);
        return listenableFutureTask;
    }

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) {
        if (null == callable) {
            throw new IllegalArgumentException("callable");
        }
        if (null == callbacks) {
            throw new IllegalArgumentException("callbacks");
        }
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        for (ListenableFutureCallback<T> callback : callbacks) {
            listenableFutureTask.addCallback(callback, executor);
        }
        execute(listenableFutureTask);
        return listenableFutureTask;
    }
}

測試

引入junit,編寫測試類以下:

public class ListenableFutureTest {

    private static ListenableExecutorService EXECUTOR;
    private static Executor E;

    @BeforeClass
    public static void before() {
        EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), new ThreadFactory() {

            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement()));
                return thread;
            }
        });
        E = Executors.newFixedThreadPool(3);
    }

    @Test
    public void testListenableFuture1() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            return "message";
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, null);
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture2() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            throw new RuntimeException("exception");
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, null);
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture3() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            return "message";
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, E);
        System.out.println("testListenableFuture3 end...");
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture4() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            throw new RuntimeException("exception");
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, E);
        System.out.println("testListenableFuture4 end...");
        Thread.sleep(2000);
    }
}

執行結果:

// testListenableFuture1
Value = message,Throwable = null

// testListenableFuture2
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception

// testListenableFuture3
testListenableFuture3 end...
Value = message,Throwable = null

// testListenableFuture4
testListenableFuture4 end...
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception

和預期的結果一致,注意一下若是Callable執行拋出異常,異常被包裝爲ExecutionException,要調用Throwable#getCause()才能獲得原始的異常實例。

小結

本文經過了解ThreadPoolExecutorFuture的實現原理作簡單的擴展,使得異步提交任務變得更加優雅和簡便。強化了動手能力的同時,也能加深對併發編程的一些認知。固然,本文只是提供一個十分簡陋的實現,筆者其實還想到了如對回調處理的耗時作監控、回調打上分組標籤執行等等更完善的功能,等到有須要的場景再進行實現。

這裏記錄一下過程當中的一些領悟:

  • Executor#execute()是線程池的核心接口,全部其餘功能都是基於此接口作擴展,它的設計自己是無狀態的。
  • 靈活使用適配器模式,能夠在不改變已發佈的接口的功能同時實現新的接口的功能適配。
  • 要善於發掘和使用JDK類庫設計者留給開發者的擴展接口。

我的博客

(本文完 c-1-d e-a-20190702)

相關文章
相關標籤/搜索