Netty 中的異步編程 Future 和 Promise

Netty 中大量 I/O 操做都是異步執行,本篇博文來聊聊 Netty 中的異步編程。前端

Java Future 提供的異步模型

JDK 5 引入了 Future 模式。Future 接口是 Java 多線程 Future 模式的實現,在 java.util.concurrent包中,能夠來進行異步計算。java

對於異步編程,咱們想要的實現是:提交一個任務,在任務執行期間提交者能夠作別的事情,這個任務是在異步執行的,當任務執行完畢通知提交者任務完成獲取結果。編程

那麼在 Future 中是怎麼實現的呢?咱們先看接口定義:數組

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

咱們看一個示例:promise

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println("start");
        Future<Integer> submit = executorService.submit(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });
        Integer value = null;
        try {
            value = submit.get();
        } catch (Exception e) {
            e.printStackTrace();
        } 
        System.out.println(value);
        System.out.println("end");
        
    }
}

Futrue 的使用方式是:投遞一個任務到 Future 中執行,操做完以後調用 Future#get() 或者 Future#isDone() 方法判斷是否執行完畢。從這個邏輯上看, Future 提供的功能是:用戶線程須要主動輪詢 Future 線程是否完成當前任務,若是不經過輪詢是否完成而是同步等待獲取則會阻塞直到執行完畢爲止。因此從這裏看,Future並非真正的異步,由於它少了一個回調,充其量只能算是一個同步非阻塞模式。多線程

executorService.submit()方法獲取帶返回值的 Future 結果有兩種方式:併發

  1. 一種是經過實現 Callable接口;
  2. 第二種是中間變量返回。繼承 Future 的子類: FutureTask,經過 FutureTask 返回異步結果而不是在主線程中獲取(FutureTask 本質也是使用 Callable 進行建立)。

上面兩種方式的代碼就變爲這樣:less

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println("start");
        //方式1 經過 executorService 提交一個異步線程
        //Future<Integer> submit = executorService.submit(new NewCallableTask());

        //方式2 經過 FutureTask 包裝異步線程的返回,返回結果在 FutureTask 中獲取而不是 在提交線程中
        FutureTask<Integer> task = new FutureTask<>(new NewCallableTask());
        executorService.submit(task);
        //-------------方式2--------------

        Integer value = null;
        try {
            value = task.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(value);
        System.out.println("end");

    }

    /**
     * 經過實現 Callable 接口
     */
     static class NewCallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }
    }

}

通常在使用線程池建立線程執行任務的時候會有兩種方式,要麼實現 Runnable 接口,要麼實現 Callable 接口,它們的區別在於:異步

  1. Callable 能夠在任務結束的時候提供一個返回值,Runnable 沒法提供這個功能;
  2. Callable 的 call 方法分能夠拋出異常,而 Runnable 的 run 方法不能拋出異常。

而咱們的異步返回天然是使用 Callable 方式。那麼 Callable 是如何實現的呢?ide

從 Callable 被提交的地方入手:executorService.submit(task), ExecutorService 是一個接口,他的默認實現類是:AbstractExecutorService,咱們看這裏的 submit()實現方式:

public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}

能夠看到將 Callable 又包裝成了 RunnableFuture。而這個 RunnableFuture 就比較神奇,它同時繼承了 Runnable 和 Future ,既有線程的能力又有可攜帶返回值的功能。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

因此再看 submit()方法,實際上是將 RunnableFuture 線程送入線程池執行,執行是一個新線程,只是這個執行的對象提供了 get()方法來獲取執行結果。

那麼 Callable 優點如何變爲 RunnableFuture 的呢?咱們看 newTaskFor(task)方法:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  return new FutureTask<T>(callable);
}

將 Callable 包裝爲 FutureTask 對象,看到這裏又關聯到 FutureTask , :

public class FutureTask<V> implements RunnableFuture<V> {
  
}

能夠看到 FutureTask 是 RunnableFuture 的子類,這也就解釋了上面的示例爲何在線程池中能夠提交 FutureTask 實例。

更詳細的執行過程這裏就再也不分析,重點剖析 Future 的實現過程,它並非真正的異步,沒有實現回調。因此在Java8 中又新增了一個真正的異步函數:CompletableFuture。

CompletableFuture 非阻塞異步編程模型

Java 8 中新增長了一個類:CompletableFuture,它提供了很是強大的 Future 的擴展功能,最重要的是實現了回調的功能。

使用示例:

public class CallableFutureTest {

		
    public static void main(String[] args) {
        System.out.println("start");
      	/**
         * 異步非阻塞
         */
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("sleep done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("done");
    }
}

CompletableFuture.runAsync()方法提供了異步執行無返回值任務的功能。

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // do something
    return "result";
}, executorService);

CompletableFuture.supplyAsync()方法提供了異步執行有返回值任務的功能。

CompletableFuture源碼中有四個靜態方法用來執行異步任務:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}

public static CompletableFuture<Void> runAsync(Runnable runnable){..}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}

前面兩個能夠看到是帶返回值的方法,後面兩個是不帶返回值的方法。同時支持傳入自定義的線程池,若是不傳入線程池的話默認是使用 ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼。

合併兩個異步任務

若是有兩個任務須要異步執行,且後面須要對這兩個任務的結果進行合併處理,CompletableFuture 也支持這種處理:

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "Task1";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "Task2";
}, executorService);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (task1, task2) -> {
    return task1 + task2; // return "Task1Task2" String
});

經過 CompletableFuture.thenCombineAsync()方法獲取兩個任務的結果真後進行相應的操做。

下一個依賴上一個的結果

若是第二個任務依賴第一個任務的結果:

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "Task1";
}, executorService);
CompletableFuture<String> future = future1.thenComposeAsync(task1 -> {
    return CompletableFuture.supplyAsync(() -> {
        return task1 + "Task2"; // return "Task1Task2" String
    });
}, executorService);

CompletableFuture.thenComposeAsync()支持將第一個任務的結果傳入第二個任務中。

經常使用 API 介紹

  1. 拿到上一個任務的結果作後續操做,上一個任務完成後的動做
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

上面四個方法表示在當前階段任務完成以後下一步要作什麼。whenComplete 表示在當前線程內繼續作下一步,帶 Async 後綴的表示使用新線程去執行。

  1. 拿到上一個任務的結果作後續操做,使用 handler 來處理邏輯,能夠返回與第一階段處理的返回類型不同的返回類型。

    public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

    Handler 與 whenComplete 的區別是 handler 是能夠返回一個新的 CompletableFuture 類型的。

    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
        return "hahaha";
    }).handle((r, e) -> {
        return 1;
    });
  2. 拿到上一個任務的結果作後續操做, thenApply方法

    public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

    注意到 thenApply 方法的參數中是沒有 Throwable,這就意味着若有有異常就會當即失敗,不能在處理邏輯內處理。且 thenApply 返回的也是新的 CompletableFuture。 這就是它與前面兩個的區別。

  3. 拿到上一個任務的結果作後續操做,能夠不返回任何值,thenAccept方法

    public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

    看這裏的示例:

    CompletableFuture.supplyAsync(() -> {
      return "result";
    }).thenAccept(r -> {
      System.out.println(r);
    }).thenAccept(r -> {
      System.out.println(r);
    });

    執行完畢是不會返回任何值的。

CompletableFuture 的特性提如今執行完 runAsync 或者 supplyAsync 以後的操做上。CompletableFuture 可以將回調放到與任務不一樣的線程中執行,也能將回調做爲繼續執行的同步函數,在與任務相同的線程中執行。它避免了傳統回調最大的問題,那就是可以將控制流分離到不一樣的事件處理器中。

另外當你依賴 CompletableFuture 的計算結果才能進行下一步的時候,無需手動判斷當前計算是否完成,能夠經過 CompletableFuture 的事件監聽自動去完成。

Netty 中的異步編程

說 Netty 中的異步編程以前先說一個異步編程模型:Future/Promise異步模型。

future和promise起源於函數式編程和相關範例(如邏輯編程 ),目的是將值(future)與其計算方式(promise)分離,從而容許更靈活地進行計算,特別是經過並行化。

Future 表示目標計算的返回值,Promise 表示計算的方式,這個模型將返回結果和計算邏輯分離,目的是爲了讓計算邏輯不影響返回結果,從而抽象出一套異步編程模型。那計算邏輯如何與結果關聯呢?它們之間的紐帶就是 callback。

引用自:https://zh.wikipedia.org/wiki/Future%E4%B8%8Epromise

在 Netty 中的異步編程就是基於該模型來實現。Netty 中很是多的異步調用,最簡單的例子就是咱們 Server 和 Client 端啓動的例子:

Server:

Client:

Netty 中使用了一個 ChannelFuture 來實現異步操做,看似與 Java 中的 Future 類似,咱們看一下代碼:

public interface ChannelFuture extends Future<Void> {
  
}

這裏 ChannelFuture 繼承了一個 Future,這是 Java 中的 Future 嗎?跟下去發現並非 JDK 的,而是 Netty 本身實現的。該類位於:io.netty.util.concurrent包中:

public interface Future<V> extends java.util.concurrent.Future<V> {
  
  // 只有IO操做完成時才返回true
  boolean isSuccess();
  // 只有當cancel(boolean)成功取消時才返回true
  boolean isCancellable();
  // IO操做發生異常時,返回致使IO操做以此的緣由,若是沒有異常,返回null
  Throwable cause();
  // 向Future添加事件,future完成時,會執行這些事件,若是add時future已經完成,會當即執行監聽事件
  Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  // 移除監聽事件,future完成時,不會觸發
  Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  // 等待future done
  Future<V> sync() throws InterruptedException;
  // 等待future done,不可打斷
  Future<V> syncUninterruptibly();
  // 等待future完成
  Future<V> await() throws InterruptedException;
  // 等待future 完成,不可打斷
  Future<V> awaitUninterruptibly();
  boolean await(long timeout, TimeUnit unit) throws InterruptedException;
  boolean await(long timeoutMillis) throws InterruptedException;
  boolean awaitUninterruptibly(long timeout, TimeUnit unit);
  boolean awaitUninterruptibly(long timeoutMillis);
  // 馬上得到結果,若是沒有完成,返回null
  V getNow();
  // 若是成功取消,future會失敗,致使CancellationException
  @Override
  boolean cancel(boolean mayInterruptIfRunning);
  
}

Netty 本身實現的 Future 繼承了 JDK 的 Future,新增了 sync() await() 用於阻塞等待,還加了 Listeners,只要任務結束去回調 Listener 就能夠了,那麼咱們就不必定要主動調用 isDone() 來獲取狀態,或經過 get() 阻塞方法來獲取值。

Netty的 Future 與 Java 的 Future 雖然類名相同,但功能上略有不一樣,Netty 中引入了 Promise 機制。在 Java 的 Future 中,業務邏輯爲一個 Callable 或 Runnable 實現類,該類的 call()run()執行完畢意味着業務邏輯的完結,在 Promise 機制中,能夠在業務邏輯中人工設置業務邏輯的成功與失敗,這樣更加方便的監控本身的業務邏輯。

public interface Promise<V> extends Future<V> {
	// 設置future執行結果爲成功
    Promise<V> setSuccess(V result);
	
	// 嘗試設置future執行結果爲成功,返回是否設置成功
    boolean trySuccess(V result);
	// 設置失敗
    Promise<V> setFailure(Throwable cause);
	// 嘗試設置future執行結果爲失敗,返回是否設置成功 
    boolean tryFailure(Throwable cause);
    // 設置爲不能取消
    boolean setUncancellable();
	
	// 源碼中,如下爲覆蓋了Future的方法,例如;
	
	Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
	
	@Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}

Promise 接口繼承自 Future 接口,重點添加了上述幾個方法,能夠人工設置 future 的執行成功與失敗,並通知全部監聽的 listener。

從 Future 和 Promise 提供的方法來看,Future 都是 get 類型的方法,主要用來判斷當前任務的狀態。而 Promise 中是 set 類型的方法,主要來對任務的狀態來進行操做。這裏就體現出來將 結果和操做過程分離的設計。

Promise 實現類是DefaultPromise類,該類十分重要,Future 的 listener 機制也是由它實現的,因此咱們先來分析一下該類。先來看一下它的重要屬性:

// 能夠嵌套的Listener的最大層數,可見最大值爲8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
                                                           SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
// 異步操做不可取消
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
// 異步操做失敗時保存異常緣由
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
  new CancellationException(), DefaultPromise.class, "cancel(...)"));

第一個套 listener,是指在 listener 的 operationComplete() 方法中,能夠再次使用 future.addListener() 繼續添加 listener,Netty 限制的最大層數是8,用戶可以使用系統變量io.netty.defaultPromise.maxListenerStackDepth設置。

爲了更好的說明,先寫了一個示例,Netty 中的 Future/Promise模型是能夠單獨拿出來使用的。

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.TimeUnit;

/**
 * @author rickiyang
 * @date 2020-04-19
 * @Desc TODO
 */
public class PromiseTest {

    public static void main(String[] args) {
        PromiseTest testPromise = new PromiseTest();
        Promise<String> promise = testPromise.doSomething("哈哈");
        promise.addListener(future -> System.out.println(promise.get()+", something is done"));

    }

    /**
     * 建立一個DefaultPromise並返回,將業務邏輯放入線程池中執行
     * @param value
     * @return
     */
    private Promise<String> doSomething(String value) {
        NioEventLoopGroup loop = new NioEventLoopGroup();
        DefaultPromise<String> promise = new DefaultPromise<>(loop.next());
        loop.schedule(() -> {
            try {
                Thread.sleep(1000);
                promise.setSuccess("執行成功。" + value);
                return promise;
            } catch (InterruptedException ignored) {
                promise.setFailure(ignored);
            }
            return promise;
        }, 0, TimeUnit.SECONDS);
        return promise;
    }

}

經過這個例子能夠看到,Promise 可以在業務邏輯線程中通知 Future 成功或失敗,因爲 Promise 繼承了 Netty 的 Future,所以能夠加入監聽事件。而 Future 和 Promise 的好處在於,獲取到 Promise 對象後能夠爲其設置異步調用完成後的操做,而後當即繼續去作其餘任務。

來看一下 addListener() 方法:

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
  checkNotNull(listener, "listener");
	//併發控制,保證多線程狀況下只有一個線程執行添加操做
  synchronized (this) {
    addListener0(listener);
  }
	// 操做完成,通知監聽者
  if (isDone()) {
    notifyListeners();
  }

  return this;
}



private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
  if (listeners == null) {
    listeners = listener;
  } else if (listeners instanceof DefaultFutureListeners) {
    // 若是當前Promise實例持有listeners的是DefaultFutureListeners類型,則調用它的add()方法進行添加
    ((DefaultFutureListeners) listeners).add(listener);
  } else {
    // 步入這裏說明當前Promise實例持有listeners爲單個GenericFutureListener實例,須要轉換爲DefaultFutureListeners實例
    listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
  }
}

這裏看到有一個全局變量 listeners,咱們看到他的定義:

private Object listeners;

爲啥會是一個 Object 類型的對象呢,不是應該是 List 或者是數組纔對嘛。Netty之因此這樣設計,是由於大多數狀況下 listener 只有一個,用集合和數組都會形成浪費。當只有一個 listener 時,該字段爲一個 GenericFutureListener 對象;當多於一個 listener 時,該字段爲 DefaultFutureListeners ,能夠儲存多個 listener。

咱們再來看 notifyListeners() 方法:

private void notifyListeners() {
  EventExecutor executor = executor();
  //當前EventLoop線程須要檢查listener嵌套
  if (executor.inEventLoop()) {
    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    //這裏是當前listener的嵌套層數
    final int stackDepth = threadLocals.futureListenerStackDepth();
    if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
      threadLocals.setFutureListenerStackDepth(stackDepth + 1);
      try {
        notifyListenersNow();
      } finally {
        threadLocals.setFutureListenerStackDepth(stackDepth);
      }
      return;
    }
  }
	//外部線程直接提交給新線程執行
  safeExecute(executor, new Runnable() {
    @Override
    public void run() {
      notifyListenersNow();
    }
  });
}

這裏有個疑問就是爲何要設置當前的調用棧深度+1。

接着看真正執行通知的方法:

private void notifyListenersNow() {
  Object listeners;
  synchronized (this) {
     // 正在通知或已沒有監聽者(外部線程刪除)直接返回
    if (notifyingListeners || this.listeners == null) {
      return;
    }
    notifyingListeners = true;
    listeners = this.listeners;
    this.listeners = null;
  }
  for (;;) {
    //只有一個listener
    if (listeners instanceof DefaultFutureListeners) {
      notifyListeners0((DefaultFutureListeners) listeners);
    } else {
      //有多個listener
      notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
    }
    synchronized (this) {
      if (this.listeners == null) {
        // 執行完畢且外部線程沒有再添加監聽者
        notifyingListeners = false;
        return;
      }
      //外部線程添加了新的監聽者繼續執行
      listeners = this.listeners;
      this.listeners = null;
    }
  }
}

Netty 中 DefalutPromise 是一個很是經常使用的類,這是 Promise 實現的基礎。DefaultChannelPromise DefalutPromise 的子類,加入了 channel 這個屬性。

Promise 目前支持兩種類型的監聽器:

  • GenericFutureListener:支持泛型的 Future ;
  • GenericProgressiveFutureListener:它是GenericFutureListener的子類,支持進度表示和支持泛型的Future 監聽器(有些場景須要多個步驟實現,相似於進度條那樣)。

爲了讓 Promise 支持多個監聽器,Netty 添加了一個默認修飾符修飾的DefaultFutureListeners類用於保存監聽器實例數組:

final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners
    
    // 這個構造相對特別,是爲了讓Promise中的listeners(Object類型)實例由單個GenericFutureListener實例轉換爲DefaultFutureListeners類型
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        // 注意這裏,每次擴容數組長度是原來的2倍
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        // 把當前的GenericFutureListener加入數組中
        listeners[size] = l;
        // 監聽器總數量加1
        this.size = size + 1;
        // 若是爲GenericProgressiveFutureListener,則帶進度指示的監聽器總數量加1
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                // 計算須要須要移動的監聽器的下標
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    // listenersToMove後面的元素所有移動到數組的前端
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                // 當前監聽器總量的最後一個位置設置爲null,數量減1
                listeners[-- size] = null;
                this.size = size;
                // 若是監聽器是GenericProgressiveFutureListener,則帶進度指示的監聽器總數量減1
                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }
    
    // 返回監聽器實例數組
    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }
    
    // 返回監聽器總數量
    public int size() {
        return size;
    }
    
    // 返回帶進度指示的監聽器總數量
    public int progressiveSize() {
        return progressiveSize;
    }
}

以上就是關於 Promise 和監聽器相關的實現分析,再回到以前的啓動類,是否是還有一個 sync() 方法:

@Override
public Promise<V> sync() throws InterruptedException {
  await();
  rethrowIfFailed();
  return this;
}


public Promise<V> await() throws InterruptedException {
  // 異步操做已經完成,直接返回
  if (isDone()) {
    return this;    
  }
  if (Thread.interrupted()) {
    throw new InterruptedException(toString());
  }
  // 死鎖檢測
  checkDeadLock();
  // 同步使修改waiters的線程只有一個
  synchronized (this) {
    while (!isDone()) { // 等待直到異步操做完成
      incWaiters();   // ++waiters;
      try {
        wait(); // JDK方法
      } finally {
        decWaiters(); // --waiters
      }
    }
  }
  return this;
}

這裏其實就是一個同步檢測當前事件是否完成的過程。

以上就是 Netty 中實現的 Future/Promise 異步回調機制。實現並非很難懂,代碼很值得學習。除了 Netty 中實現了 Future/Promise模型,在Guava中也有相關的實現,你們平常使用能夠看習慣引用相關的包。

Guava實現:

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>21.0</version>
</dependency>
  
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return 100;
    }
});
Futures.addCallback(future, new FutureCallback<Integer>() {
    public void onSuccess(Integer result) {
        System.out.println("success:" + result);
    }

    public void onFailure(Throwable throwable) {
        System.out.println("fail, e = " + throwable);
    }
});

Thread.currentThread().join();
相關文章
相關標籤/搜索