【併發編程】Future模式添加Callback及Promise 模式

Futurejavascript

Future是Java5增長的類,它用來描述一個異步計算的結果。你可使用 isDone 方法檢查計算是否完成,或者使用 get 方法阻塞住調用線程,直到計算完成返回結果。你也可使用 cancel 方法中止任務的執行。下面來一個栗子:前端

public class FutureDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(10);
        Future<Integer> f = es.submit(() ->{
            Thread.sleep(10000);
            // 結果
            return 100;
        });

        // do something

        Integer result = f.get();
        System.out.println(result);

//        while (f.isDone()) {
//            System.out.println(result);
//        }
    }
}

在這個例子中,咱們往線程池中提交了一個任務並當即返回了一個Future對象,接着能夠作一些其餘操做,最後利用它的 get 方法阻塞等待結果或 isDone 方法輪詢等待結果(關於Future的原理能夠參考以前的文章:【併發編程】Future模式及JDK中的實現java

雖然這些方法提供了異步執行任務的能力,可是對於結果的獲取卻仍是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。編程

阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時的獲得計算結果,爲何不能用觀察者設計模式當計算結果完成及時通知監聽者呢?設計模式

不少語言,好比Node.js,採用Callback的方式實現異步編程。Java的一些框架,好比Netty,本身擴展了Java的 Future 接口,提供了 addListener 等多個擴展方法。Google的guava也提供了通用的擴展Future:ListenableFuture 、 SettableFuture 以及輔助類 Futures 等,方便異步編程。爲此,Java終於在JDK1.8這個版本中增長了一個能力更強的Future類:CompletableFuture 。它提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果。下面來看看這幾種方式。緩存

Netty-Future多線程

引入Maven依賴:併發

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>
public class NettyFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        EventExecutorGroup group = new DefaultEventExecutorGroup(4);
        System.out.println("開始:" + DateUtils.getNow());

        Future<Integer> f = group.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                return 100;
            }
        });

        f.addListener(new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> objectFuture) throws Exception {
                System.out.println("計算結果:" + objectFuture.get());
            }
        });

        System.out.println("結束:" + DateUtils.getNow());
        // 不讓守護線程退出
        new CountDownLatch(1).await();
    }
}

輸出結果:框架

開始:2019-05-16 08:25:40:779
結束:2019-05-16 08:25:40:788
開始耗時計算:2019-05-16 08:25:40:788
結束耗時計算:2019-05-16 08:25:50:789
計算結果:100

從結果能夠看出,耗時計算結束後自動觸發Listener的完成方法,避免了主線程無謂的阻塞等待,那麼它到底是怎麼作到的呢?下面看源碼異步

DefaultEventExecutorGroup 實現了 EventExecutorGroup 接口,而 EventExecutorGroup 則是實現了JDK ScheduledExecutorService 接口的線程組接口,因此它擁有線程池的全部方法。然而它卻把全部返回 java.util.concurrent.Future 的方法重寫爲返回 io.netty.util.concurrent.Future ,把全部返回 java.util.concurrent.ScheduledFuture 的方法重寫爲返回 io.netty.util.concurrent.ScheduledFuture 。

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    /**
     * 返回一個EventExecutor
     */
    EventExecutor next();

    Iterator<EventExecutor> iterator();

    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    <T> Future<T> submit(Callable<T> task);

    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

EventExecutorGroup 的submit方法由於 newTaskFor 的重寫致使返回了netty的 Future 實現類,而這個實現類正是 PromiseTask 。

@Override
public <T> Future<T> submit(Callable<T> task) {
    return (Future<T>) super.submit(task);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new PromiseTask<T>(this, callable);
}

PromiseTask 的實現很簡單,它緩存了要執行的 Callable 任務,並在run方法中完成了任務調用和Listener的通知。

@Override
public void run() {
    try {
        if (setUncancellableInternal()) {
            V result = task.call();
            setSuccessInternal(result);
        }
    } catch (Throwable e) {
        setFailureInternal(e);
    }
}

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

@Override
public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
}

任務調用成功或者失敗都會調用 notifyListeners 來通知Listener,因此你們得在回調的函數裏調用 isSuccess 方法來檢查狀態。

這裏有一個疑惑,會不會 Future 在調用 addListener 方法的時候任務已經執行完成了,這樣子會不會通知就會失敗了啊?

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    synchronized (this) {
        addListener0(listener);
    }

    if (isDone()) {
        notifyListeners();
    }

    return this;
}

能夠發現,在Listener添加成功以後,會當即檢查狀態,若是任務已經完成馬上進行回調,因此這裏不用擔憂啦。OK,下面看看Guava-Future的實現。

Guava-Future

首先引入guava的Maven依賴:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>
public class GuavaFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                return 100;
            }
        });
        
        future.addListener(new Runnable() {
            @Override
            public void run() {
                System.out.println("調用成功");
            }
        }, executorService);
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}

ListenableFuture 能夠經過 addListener 方法增長回調函數,通常用於不在意執行結果的地方。若是須要在執行成功時獲取結果或者執行失敗時獲取異常信息,須要用到 Futures 工具類的 addCallback 方法:

Futures.addCallback(future, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(@Nullable Integer result) {
        System.out.println("成功,計算結果:" + result);
    }

    @Override
    public void onFailure(Throwable t) {
        System.out.println("失敗");
    }
}, executorService);

前面提到除了 ListenableFuture 外,還有一個 SettableFuture 類也支持回調能力。它實現自 ListenableFuture ,因此擁有 ListenableFuture 的全部能力。

public class GuavaFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListenableFuture<Integer> future = submit(executorService);
        Futures.addCallback(future, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(@Nullable Integer result) {
                System.out.println("成功,計算結果:" + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("失敗:" + t.getMessage());
            }
        }, executorService);
        Thread.sleep(1000);
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }

    private static ListenableFuture<Integer> submit(Executor executor) {
        SettableFuture<Integer> future = SettableFuture.create();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                // 返回值
                future.set(100);
                // 設置異常信息
//                future.setException(new RuntimeException("custom error!"));
            }
        });
        return future;
    }
}

看起來用法上沒有太多差異,可是有一個很容易被忽略的重要問題。當 SettableFuture 的這種方式最後調用了 cancel 方法後,線程池中的任務仍是會繼續執行,而經過 submit 方法返回的 ListenableFuture 方法則會當即取消執行,這點尤爲要注意。下面看看源碼:

和Netty的Future同樣,Guava也是經過實現了自定義的 ExecutorService 實現類 ListeningExecutorService 來重寫了 submit 方法。

public interface ListeningExecutorService extends ExecutorService {
  <T> ListenableFuture<T> submit(Callable<T> task);
  ListenableFuture<?> submit(Runnable task);
  <T> ListenableFuture<T> submit(Runnable task, T result);
}

一樣的,newTaskFor 方法也被進行了重寫,返回了自定義的Future類:TrustedListenableFutureTask

@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return TrustedListenableFutureTask.create(runnable, value);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return TrustedListenableFutureTask.create(callable);
}

任務調用會走 TrustedFutureInterruptibleTask 的run方法:

@Override
public void run() {
    TrustedFutureInterruptibleTask localTask = task;
    if (localTask != null) {
        localTask.run();
    }
}

@Override
public final void run() {
    if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {
        return; // someone else has run or is running.
    }
    try {
        // 抽象方法,子類進行重寫
        runInterruptibly();
    } finally {
        if (wasInterrupted()) {
            while (!doneInterrupting) {
                Thread.yield();
            }
        }
    }
}

最終仍是調用到 TrustedFutureInterruptibleTask 的 runInterruptibly 方法,等待任務完成後調用 set 方法。

@Override
void runInterruptibly() {
    if (!isDone()) {
        try {
            set(callable.call());
        } catch (Throwable t) {
            setException(t);
        }
    }
}

protected boolean set(@Nullable V value) {
    Object valueToSet = value == null ? NULL : value;
    // CAS設置值
    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
        complete(this);
        return true;
    }
    return false;
}

在 complete 方法的最後會獲取到Listener進行回調。

上面提到的 SettableFuture 和 ListenableFuture 的 cancel 方法效果不一樣,緣由在於一個重寫了 afterDone 方法而一個沒有。

下面是 ListenableFuture 的 afterDone 方法:

@Override
protected void afterDone() {
    super.afterDone();

    if (wasInterrupted()) {
        TrustedFutureInterruptibleTask localTask = task;
        if (localTask != null) {
            localTask.interruptTask();
        }
    }

    this.task = null;
}

wasInterrupted 用來判斷是否調用了 cancel (cancel方法會設置一個取消對象Cancellation到value中)

protected final boolean wasInterrupted() {
    final Object localValue = value;
    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
}

interruptTask 方法經過線程的 interrupt 方法真正取消線程任務的執行:

final void interruptTask() {
    Thread currentRunner = runner;
    if (currentRunner != null) {
        currentRunner.interrupt();
    }
    doneInterrupting = true;
}

由 Callback Hell 引出 Promise 模式

若是你對 ES6 有所接觸,就不會對 Promise 這個模式感到陌生,若是你對前端不熟悉,也沒關係,咱們先來看看回調地獄(Callback Hell)是個什麼概念。

回調是一種咱們推崇的異步調用方式,但也會遇到問題,也就是回調的嵌套。當須要多個異步回調一塊兒書寫時,就會出現下面的代碼(以 js 爲例):

asyncFunc1(opt, (...args1) => { 
  asyncFunc2(opt, (...args2) => {       
    asyncFunc3(opt, (...args3) => {            
      asyncFunc4(opt, (...args4) => {
          // some operation
      });
    });
  });
});

雖然在 JAVA 業務代碼中不多出現回調的多層嵌套,但總歸是個問題,這樣的代碼不易讀,嵌套太深修改也麻煩。因而 ES6 提出了 Promise 模式來解決回調地獄的問題。可能就會有人想問:java 中存在 Promise 模式嗎?答案是確定的。

前面提到了 Netty 和 Guava 的擴展都提供了 addListener 這樣的接口,用於處理 Callback 調用,但其實 jdk1.8 已經提供了一種更爲高級的回調方式:CompletableFuture。首先嚐試用 CompletableFuture 來重寫上面回調的問題。

public class CompletableFutureTest {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("開始耗時計算:" + DateUtils.getNow());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結束耗時計算:" + DateUtils.getNow());
            return 100;
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("回調結果:" + result);
        });
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}

使用CompletableFuture耗時操做沒有佔用主線程的時間片,達到了異步調用的效果。咱們也不須要引入任何第三方的依賴,這都是依賴於 java.util.concurrent.CompletableFuture 的出現。CompletableFuture 提供了近 50 多個方法,大大便捷了 java 多線程操做,和異步調用的寫法。

使用 CompletableFuture 解決回調地獄問題:

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        long l = System.currentTimeMillis();
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("在回調中執行耗時操做...");
            Thread.sleep(10000);
            return 100;
        });
        completableFuture = completableFuture.thenCompose(i -> {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("在回調的回調中執行耗時操做...");
                Thread.sleep(10000);
                return i + 100;
            });
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("計算結果:" + result);
        });
        System.out.println("主線程運算耗時:" + (System.currentTimeMillis() - l) + " ms");
        new CountDownLatch(1).await();
    }
}

輸出:

在回調中執行耗時操做...主線程運算耗時:58 ms在回調的回調中執行耗時操做...計算結果:200

使用 thenCompose 或者 thenComposeAsync 等方法能夠實現回調的回調,且寫出來的方法易於維護。

總的看來,爲Future模式增長回調功能就不須要阻塞等待結果的返回而且不須要消耗無謂的CPU資源去輪詢處理狀態,JDK8以前使用Netty或者Guava提供的工具類,JDK8以後則可使用自帶的 CompletableFuture 類。Future 有兩種模式:未來式和回調式。而回調式會出現回調地獄的問題,由此衍生出了 Promise 模式來解決這個問題。這纔是 Future 模式和 Promise 模式的相關性。

做者注:歡迎關注筆者公衆號(ID:weknow619,或掃頭像),按期分享IT互聯網、金融等工做經驗心得、人生感悟,歡迎訂閱交流,目前就任阿里-移動事業部,須要大廠內推的也可到公衆號砸簡歷。

相關文章
相關標籤/搜索