Netty: DefaultPromise源碼解讀

1、爲何須要 io.netty.util.concurrent.Promise ?

若是你有一個阻塞的方法,好比 Thread.sleep(1000),而又不想阻塞當前線程 A,只須要把該方法包裝成一個任務由另外一個線程 B 執行便可。java

ExecutorService pool = Executors.newFixedThreadPool(3);
Future<Integer> future = pool.submit(() -> {
    Thread.sleep(1000);
    return 1;
});
複製代碼

若是你須要在任務結束以後執行其餘邏輯,一種方式是 A 線程先經過調用 future.get() 獲取值,而後執行其餘代碼;可是 get 方法自己也是一個阻塞方法,在這期間 A 線程阻塞。數組

另一種方法是 B 線程執行完任務後,繼續執行後續邏輯。Netty 中的 Future,io.netty.util.concurrent.Future,經過回調方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); 實現了該功能。promise

Promise 接口繼承了 Future 接口,在增長了 listener 的狀況下,提供了 Promise<V> setSuccess(V result) 方法,能夠在任務中手動設置返回值,並當即通知 listeners。ide

2、實例程序

private static NioEventLoopGroup loopGroup = new NioEventLoopGroup(8);

public void methodA() {
    Promise promise = methodA("ceee...eeeb");
    promise.addListener(future -> {		// 1
        Object ret = future.get();      // 4. 此時能夠直接拿到結果
      	// 後續邏輯由 B 線程執行
        System.out.println(ret);
    });
  	// A 線程不阻塞,繼續執行其餘代碼...
}

public Promise<ResponsePacket> methodB(String name) {
    Promise<ResponsePacket> promise = new DefaultPromise<>(loopGroup.next());
    loopGroup.schedule(() -> {		// 2
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("scheduler thread: " + Thread.currentThread().getName());
        promise.setSuccess("hello " + name);	// 3
    }, 0, TimeUnit.SECONDS);

    return promise;
}
複製代碼

簡單的使用 Promise 包括:oop

  1. 給 promise 增長 listener,promise.addListener()
  2. 分配執行任務的線程,loopGroup.schedule()
  3. 在任務執行過程當中,設置結果,promise.setSuccess()

3、源碼分析

1. addListener

// class: DefaultPromise
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");

    synchronized (this) {  // 1. 增長 listener
        addListener0(listener);	
    }

    if (isDone()) {  // 2. 若是任務執行完了,通知全部 listener
        notifyListeners();
    }

    return this;
}
複製代碼

繼續看 addListener0:源碼分析

private Object listeners;
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
 	// 1. 添加第 1 個 listener 時,直接賦值便可
    if (listeners == null) {
        listeners = listener;
    } 
  	// 3. 添加第 3 個以及更多 listener 時,直接加入數組便可
  	else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } 
  	// 2. 添加第 2 個 listener 時,listeners 類型更改成 DefaultFutureListeners,內部實現爲一個數組
  	else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}
複製代碼

因爲能夠添加多個 listener,很容易想到經過一個數組保存全部 listener。而實現類裏面 listeners 類型爲 Object,多是考慮到大部分都只有 1 個 listener,節省內存空間。this

2. schedule

將任務加入隊列,由線程池執行。spa

3. setSuccess

// class: DefaultPromise
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {	// 若是設置成功,返回;不然拋異常
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
  	// 設置 result
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    // cas 操做
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}

private synchronized boolean checkNotifyWaiters() {
      /** * 有些線程不是經過增長 listener 的方式獲取結果,而是經過 promise.get() 方法獲取, * 那麼這些線程爲阻塞狀態;當設置了 result 後,須要喚醒這些線程 */
    if (waiters > 0) {
        notifyAll();
    }
    return listeners != null;  // 只要存在 listener,就返回 true
}
複製代碼

繼續查看 notifyListeners:.net

// class: DefaultPromise
private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
      	// TODO 嵌套監聽
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
              	// 1. 若是是 promise 綁定的線程,直接執行
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    // 2. 不然,加入任務調度, 所以 listener 方法最終仍是由 promise 綁定的線程執行的
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
      	// 依次通知全部 listener
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this, (GenericFutureListener<?>) listeners);
        }
        synchronized (this) {
            if (this.listeners == null) {
                notifyingListeners = false;
                return;
            }
            // 通知原先的 listeners 時,有可能有新的 listener 在此期間註冊, 也須要通知到
            listeners = this.listeners;
            this.listeners = null;
        }
    }
}

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);	// 執行 listener 中的方法
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}
複製代碼
相關文章
相關標籤/搜索