netty中的future和promise源碼分析(二)

前面一篇netty中的future和promise源碼分析(一)中對future進行了重點分析,接下來說一講promise.
promise是可寫的future,從future的分析中能夠發如今其中沒有寫操做的接口,netty特地使promise擴展了future,能夠對異步操做結果進行設置。segmentfault

(一)defaultpromise

包含的字段

//原子保存異步操做結果
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
//異步操做結果
private volatile Object result;
private final EventExecutor executor;

操做結果通知

爲了可以支持異步地獲取操做結果,netty中用通知的方式來對後續的listener中的操做,操做結果等進行控制。通知的前提包括success,fail,cancel三種狀態。數組

  1. success狀態:setsuccess()方法
  • 第一步:異步操做結束調用setSuccess(V result)或trySuccess(V result)
    方法,將操做結果當作參數傳入,來通知能夠對結果進行使用。promise

    public Promise<V> setSuccess(V result) {
       if (setSuccess0(result)) {
       //觸發listener中的operationcomplete()方法
           notifyListeners();
           return this;
       }
       throw new IllegalStateException("complete already: " + this);
      }
  • 第二步:首先調用setsuccess0()方法對result變量進行保存,若是保存成功則經過notifyListeners()觸發listener中的operationcomplete()方法.(此處先不對notifylisteners()方法進行分析,見下文)異步

    #setsuccess0()方法:
      private boolean setSuccess0(V result) {
       return setValue0(result == null ? SUCCESS : result);
      }
    
      #setvalue0()方法:
      private boolean setValue0(Object objResult) {
       if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
           RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
           checkNotifyWaiters();
           return true;
       }
       return false;
       }

    注意:在success狀態下保存結果時,若是result(異步操做結果)爲null,則將promise內部的result設置爲常量SUCCESS。再者,在promise的result中,只容許保存一次,因此netty採用cas保證結果只保存一遍,若結果保存出錯返回false.ide

    #SUCCESS常量
      private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
  • 第三步:保存完結果後,通知全部同步等待異步操做結果的線程。oop

    private synchronized void checkNotifyWaiters() {
       if (waiters > 0) {
           notifyAll();
       }
      }

    2.success狀態:trysuccess()方法源碼分析

    trysuccess()方法與setsuccess()方法大同小異,只不過在保存結果出錯的時候,返回false,而setsuccess()拋出一個異常信息。this

    public boolean trySuccess(V result) {
       if (setSuccess0(result)) {
           notifyListeners();
           return true;
       }
       return false;
    }

    3.fail狀態:線程

    fail狀態下通知機制和success幾乎相同,區別在於保存異步操做結果的時候,fail狀態保存的是使用CauseHolder進行封裝的異常信息對象。設計

    private boolean setFailure0(Throwable cause) {
       return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    4.cancel狀態:
    cancel狀態,表示異步操做的時候,對promise對象進行了cancel操做。

    public boolean cancel(boolean mayInterruptIfRunning) {
       if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
           checkNotifyWaiters();
           notifyListeners();
           return true;
       }
       return false;
    }

    一樣的,cancel後也和success和fail同樣,對result進行了設置。在success的時候,容許初始值爲null和UNCANCELLABLE常量(表示不容許cancel),在cancel狀態只容許爲null.

    # CANCELLATION_CAUSE_HOLDER
      //封裝了CancellationException異常。
     private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
    new CancellationException(), DefaultPromise.class, "cancel(...)"));

    5.操做結果通知總結:

    首先,經過調用setsuccess()等方法,啓動通知機制;
    而後,將異步操做結果進行保存,僅容許保存一次,不然會返回false.
    保存好信息後,觸發listener中的操做,還會通知全部同步等待異步操做結果的線程。

添加監聽者

接下來分析promise如何添加監聽者。

(一). 首先來看一下用來保存監聽者對象的字段。

/**
 * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
 * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
  *
  * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
   */
    private Object listeners;

從註釋能夠總結出那麼幾個信息:

1.這個object類型的listeners字段,能夠是GenericFutureListener類型,也能夠是link DefaultFutureListeners(用來保存GenericFutureListener的數組)。
 ps:這樣設計的好處,多數狀況下,listener只有一個,用集合或者數組會形成浪費,只有真正須要多個監聽者的時候,才使用數組
 2.若是listeners爲null,表示還未添加監聽者或者已經觸發過了(一旦觸發就會將listeners清空)
 3.能夠在外部添加監聽者,因此使用加鎖的形式(synchronized(this))添加監聽者。

(二). 添加監聽器的過程

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");

    synchronized (this) {
        addListener0(listener);
    }

    if (isDone()) {
        notifyListeners();
    }

    return this;
}

用加鎖的方式添加監聽器,添加完成後,若是promise的狀態爲isdone,就會當即觸發Listener.接下來看看addlistener0()是如何添加的。

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
    }
}

前面已經分析過,listeners字段只多是GenericFutureListener類型,或者DefaultFutureListeners類型。因此若是爲Null,直接保存;若是已是DefaultFutureListeners(數組形式),就讓其再添加一個listener;若是是GenericFutureListener類型,就建立一個數組。


(三)DefaultFutureListeners類分析:

首先看看它的構造方法:

DefaultFutureListeners(
        GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
    listeners = new GenericFutureListener[2];
    listeners[0] = first;
    listeners[1] = second;
    size = 2;
    if (first instanceof GenericProgressiveFutureListener) {
        progressiveSize ++;
    }
    if (second instanceof GenericProgressiveFutureListener) {
        progressiveSize ++;
    }
}

能夠看出,從構造之初,他就是size爲2的數組。

再來看看它是如何在後續過程當中添加元素的:

public void add(GenericFutureListener<? extends Future<?>> l) {
    GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
    final int size = this.size;
    if (size == listeners.length) {
        this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
    }
    listeners[size] = l;
    this.size = size + 1;

    if (l instanceof GenericProgressiveFutureListener) {
        progressiveSize ++;
    }
  }

若是數組容量未滿,就繼續添加元素;若是數組容量已滿,就將容量翻倍,將原數組內容複製拷貝到新數組中。


(四)添加監聽者總結:

1.添加完監聽者,就會嘗試去觸發listener中的操做。
2.promise內部用來保存監聽者的listeners只會是兩種類型,GenericFutureListener類型和link DefaultFutureListeners。

觸發監聽者

在前面的setsuccess()和addlistener()等方法中均可以看到notifylisteners()方法,這就是觸發監聽者的起點。

private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
           ……
                notifyListenersNow();
           ……
    }

    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

private static void safeExecute(EventExecutor executor, Runnable task) {
        executor.execute(task);
    ……
}

在notifylisteners()方法中,能夠看到,listener中觸發的異步操做要求是在線程組中執行的,若是是在線程組外部提交的任務,會將任務封裝成runnable提交到任務隊列中等待執行。
接下來看看notifynow()方法中作了什麼。

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        // Only proceed if there are listeners to notify and we are not already notifying listeners.
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
        }
        synchronized (this) {
            if (this.listeners == null) {
                // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                // need to be in a finally block.
                notifyingListeners = false;
                return;
            }
            listeners = this.listeners;
            this.listeners = null;
        }
    }
}

在該方法中,首先將Listeners取出來,而後將其清空(每次觸發完listeners都會將原來的listeners清空),而後執行listener中具體的操做,執行完操做,會再次檢查是否又有listeners添加進來,確保無誤後,從方法中退出。

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    }
}

觸發監聽者總結:
1.觸發的listeners中具體的操做是在線程池中進行
2.觸發完畢的listeners會將其清空。


同步等待

netty還提供了接口能夠同步等待異步操做結果,使用到的是await()和sync()方法。

public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }

    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    checkDeadLock();

    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
 }

原理很簡單,就是讓線程在promise對象上等待通知。若是是isdone狀態,就直接返回。
sync()方法是在await()方法的基礎上添加了額外的功能,區別只是sync()調用,若是異步操做失敗,則會拋出異常。

public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

(二)defaultchannelpromise

以上分析得出一個疑惑?
從defaultpromise的分析能夠得知,listener中的操做是由線程池來執行。但注意到defaultpromise的其中一個權限爲protected的構造方法不須要傳入eventexecutor,這可能致使出現nullpoint異常。
因此出現了另外一個擴展類,defaultchannelpromise.

public DefaultChannelPromise(Channel channel) {
    this.channel = checkNotNull(channel, "channel");
}

public DefaultChannelPromise(Channel channel, EventExecutor executor) {
    super(executor);
    this.channel = checkNotNull(channel, "channel");
}

defaultchannelpromise類有兩個構造方法,一個爲父類傳入eventexecutor,一個調用的是上面提到的父類中protected的構造方法,那它是如何解決eventexecutor空指向的異常的?

protected EventExecutor executor() {
    EventExecutor e = super.executor();
    if (e == null) {
        return channel().eventLoop();
    } else {
        return e;
    }
}

能夠看到,當eventexecutor爲Null時,保存的是channel中的eventexecutor.

相關文章
相關標籤/搜索