Netty系列-netty的Future 和 Promise

首先咱們來看看future和promise接口總體設計java

最頂層的future是jdk的,第二個是netty自定義的future,兩個同名,繼承關係promise

看看jdk的future接口異步

public interface Future<V> {
    // 取消任務
    boolean cancel(boolean mayInterruptIfRunning);
    // 任務是否取消
    boolean isCancelled();
    // 任務是否完成
    boolean isDone();
    // 阻塞的獲取執行的結果
    V get() throws InterruptedException, ExecutionException;
    // 在必定時間內-超時-阻塞的獲取執行的結果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

瞅瞅netty的future接口async

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 是否成功
    boolean isSuccess();
    // 是否取消
    boolean isCancellable();
    Throwable cause();
     // 添加listener進行回調
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // 阻塞的等待任務執行,若是失敗則拋出失敗緣由的異常
    Future<V> sync() throws InterruptedException;
    // 不響應中斷等待異常
    Future<V> syncUninterruptibly();

    // 阻塞等待任務執行,失敗不拋異常
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    // 立刻獲取到任務的結果,不阻塞,而jdk的future是阻塞的
    V getNow();


    // 取消任務執行,若是取消成功,任務會由於 CancellationException 異常而致使失敗
    //      也就是 isSuccess()==false,同時上面的 cause() 方法返回 CancellationException 的實例。
    // mayInterruptIfRunning 說的是:是否對正在執行該任務的線程進行中斷(這樣才能中止該任務的執行),
    //       彷佛 Netty 中 Future 接口的各個實現類,都沒有使用這個參數
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

netty的future在jdk的基礎上擴展了它須要的方法,sync和await的區別咱們放到下面看實現類的時候說ide

同時咱們也能夠看到,這個future接口跟io操做是無關的函數

接下來咱們看看ChannelFuture接口,接口註釋上寫的很清楚,咱們來看看ui

* The result of an asynchronous {@link Channel} I/O operation.
* <p>
* All I/O operations in Netty are asynchronous. It means any I/O calls will
* return immediately with no guarantee that the requested I/O operation has
* been completed at the end of the call. Instead, you will be returned with
* a {@link ChannelFuture} instance which gives you the information about the
* result or status of the I/O operation.
* <p>
* A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
* When an I/O operation begins, a new future object is created. The new future
* is uncompleted initially - it is neither succeeded, failed, nor cancelled
* because the I/O operation is not finished yet. If the I/O operation is
* finished either successfully, with failure, or by cancellation, the future is
* marked as completed with more specific information, such as the cause of the
* failure. Please note that even failure and cancellation belong to the
* completed state.
全部io操做都是異步的,一個io操做的調用會當即返回一個帶有結果或者狀態的io實例。
io操做要麼是未完成的,要麼是完成的。當它開始時,future會被建立,一開始是未完成的,未完成的時候沒有成功、失敗或者取消狀態
當它是完成的時候,能夠是失敗或者取消的,失敗或者取消緣由會被附加到future上。
* <pre> * +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+ * </pre>

上面那個狀態遷移圖很清楚了,在兩種過程的時候會有什麼狀態,咱們看看接口this

public interface ChannelFuture extends Future<Void> {
    // 返回future關聯的channel
    Channel channel();
    // 重寫下面幾個方法,修改返回值爲channelfuture
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();
    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();

    /**
     * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
     * following methods:
     * <ul>
     *     <li>{@link #addListener(GenericFutureListener)}</li>
     *     <li>{@link #addListeners(GenericFutureListener[])}</li>
     *     <li>{@link #await()}</li>
     *     <li>{@link #await(long, TimeUnit)} ()}</li>
     *     <li>{@link #await(long)} ()}</li>
     *     <li>{@link #awaitUninterruptibly()}</li>
     *     <li>{@link #sync()}</li>
     *     <li>{@link #syncUninterruptibly()}</li>
     * </ul>
       標記該future是void的,使不能使用上面的方法
     */
    boolean isVoid();
}

 netty實際上是強烈建議直接經過添加監聽器的方式來獲取io操做結果,或者進行後續操做的,ChannelFuture能夠增長或者刪除一個多個 GenericFutureListener,它定義以下spa

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    void operationComplete(F future) throws Exception;
}

執行完後會回調 operationComplete方法線程

注意一點,不要在ChannelHandler中調用ChannelFuture的await方法,會致使死鎖。這是由於發起io操做後,由io線程負責異步通知發起io操做的用戶線程,若是io線程和用戶線程是同一個的話,就會致使io線程等待本身通知操做完成,這就會致使死鎖,本身掛死本身。

咱們繼續看promise接口,

public interface Promise<V> extends Future<V> {

    // 標記該future成功及設置結果,並通知全部listener
    // 若是失敗的話拋異常
    Promise<V> setSuccess(V result);

     // 和setsuccess同樣,只是失敗的話返回false
    boolean trySuccess(V result);

     // 標記future失敗,而後通知listener
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);

    // 標記該future 不可被取消
    boolean setUncancellable();

    // 下面跟ChannelFuture同樣,都是覆蓋重寫方法
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();
    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}

Promise是可寫的future,Future自己並無寫操做相關的接口,netty經過Promise對其進行擴展,用於設置io操做的結果。Promise 實例內部是一個任務,任務的執行每每是異步的,一般是一個線程池來處理任務。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 未來會被某個執行任務的線程在執行完成之後調用,同時那個線程在調用 setSuccess(result) 或 setFailure(t) 後會回調 listeners 的回調函數(固然,回調的具體內容不必定要由執行任務的線程本身來執行,它能夠建立新的線程來執行,也能夠將回調任務提交到某個線程池來執行)。並且,一旦 setSuccess(...) 或 setFailure(...) 後,那些 await() 或 sync() 的線程就會從等待中返回。

接下來咱們看看ChannelPromise

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    @Override
    Channel channel();
@Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess();
boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause);
@Override ChannelPromise addListener(GenericFutureListener
<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override ChannelPromise sync()
throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /** * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. */ ChannelPromise unvoid(); }

看方法其實很清楚,基本都是覆寫綜合了ChannelFuture和Promise接口的,就返回值變了,看看咱們一開始的類繼承圖,ChannelPromise 接口同時繼承了 ChannelFuture 和 Promise,最終繼承的都是Future接口,接下來咱們看看具體的實現類DefaultPromise吧

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // 爲了後面操做成功後經過cas來保存結果到result字段
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // result爲null的時候默認值
    private static final Object SUCCESS = new Object();
    // 操做成功後cas比對的值
    private static final Object UNCANCELLABLE = new Object();
   
    // 保存執行的結果
    private volatile Object result;
    // 線程執行器
    private final EventExecutor executor;
    // 監聽者
    private Object listeners;
     /**
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    // 等待這個 promise 的線程數(調用sync()/await()進行等待的線程數量)
    private short waiters;

    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
     // 是否喚醒正在等待線程,用於防止重複執行喚醒,否則會重複執行 listeners 的回調方法
    private boolean notifyingListeners;

    ....
 }

 

屬性看完了,咱們能夠看看它主要的方法

    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

set和try的區別就是返回值不同而已,咱們看看底層的方法 setSuccess0

    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

就是經過cas來把objResult保存到result屬性上,而後Notify其餘線程。其餘方法都差很少,能夠比對看看

咱們再看個await方法

public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

若是當前Promise已被設置,則返回;若是碰到線程中斷則響應中斷;檢查死鎖,因爲在IO線程中調用Promise的await方法或者sync方法會致使死鎖,前面說過的,因此須要檢驗保護,斷定當前線程是不是io線程;同步鎖定當前Promise對象,循環斷定是否設置完成,使用循環是避免僞喚醒,防止線程 被意外喚醒致使功能異常。

接下來咱們順便也看下sync方法

@Override
    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }

首先調用await方法,而後看是否須要拋出異常,若是任務失敗的話就從新拋出異常,這也是兩方法區別了。

DefaultChannelPromise實現咱們就不看了,基本都是基於DefaultPromise的,只是返回值都是 ChannelPromise而已。

下面咱們來寫個例子吧

public class ChannelPromiseExample extends Thread{

    private static final Object object = new Object();

    public static void main(String[] args) {
        final DefaultEventExecutor executor = new DefaultEventExecutor();
        final Promise<Integer> promise = executor.newPromise();

        // 任務seccess或者failure來回調operationComplete 方法
        promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future)
                    throws Exception {
                System.out.println(Thread.currentThread().getName() + " 第一個監聽器");
                if (future.isSuccess()) {
                    System.out.println("任務成功,result:" + future.get());
                } else {
                    System.out.println("任務失敗,result:" + future.cause());
                }
            }
        }).addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future)
                    throws Exception {
                System.out.println(Thread.currentThread().getName() + " 第二個監聽器");
            }
        });
        // 提交任務
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //能夠設置成功或者失敗
                //promise.setSuccess(1);
                promise.setFailure(new Throwable("FAILURE"));
            }
        });


        try {
            System.out.println("promise wait begin");
            //promise.sync();
            promise.await();
            System.out.println("promise wait end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

能夠體會下await和sync的區別

相關文章
相關標籤/搜索