[netty4][netty-common]Future與Promise分析

接口與類結構體系

-- [I]java.util.concurrent.Future<V>  
---- [I]io.netty.util.concurrent.Future<V>  
------ [AC]AbstractFuture, [I]ChannelFuture, [I]Promise 

-- [AC]AbstractFuture, [I]Promise         -- [I]ChannelFuture, [I]Promise      
---- DefaultPromise                       ---- [I]ChannelPromise

-- [I]ChannelPromise, [I]FlushCheckpoint, [C]DefaultPromise  
---- [C]DefaultChannelPromise

JDK的Future提供特性

  1. 是否完成
  2. 是否取消
  3. 結果獲取
  4. 取消執行

netty的Future增長的特性

  1. 是否成功(完成後的結果,完成不必定成功)
  2. 是否能被取消
  3. 若是失敗時的異常獲取
  4. 支持監聽器,監聽操做完成的回調
  5. sync 阻塞等待直至完成 // 跟get有什麼區別?A: 只阻塞,不取結果,在一些實現邏輯中而有是否死鎖等檢查。
  6. await 阻塞等待直至完成 // 跟get有什麼區別?跟sync有什麼區別?A: 在一些實現邏輯中多了在調完await以後再調用rethrowIfFailed(字面意思)。
  7. getNow,非阻塞獲取結果(能夠理解成JDK Future的是否完成和結果獲取的組合)
  8. 從新覆寫cancel行爲定義

Promise增長的特性

總體定位是一個支持可寫的Future,能夠理解成:能夠經過API設置結果的成功仍是失敗。對應netty的Future的特性1。html

  1. 設置是否成功的結果,並觸發監聽器operationComplete事件。操做失敗拋異常
  2. 嘗試設置是否成功的結果, 並觸發監聽器operationComplete事件。操做失返回false
  3. 設置是否失敗的結果,並觸發監聽器operationComplete事件。操做失敗拋異常
  4. 嘗試設置是否失敗的結果, 並觸發監聽器operationComplete事件。操做失返回false
  5. 設置是否能夠取消
  6. 覆寫返回Future的方法簽名爲返回Promise

注意: 是否能夠取消,是否成功等,在DefaultPromise實現中,是用一個result字段來實現的。而且用AtomicReferenceFieldUpdater結合volatile來完成在併發狀況下字段的正確設置值。java

ChannelPromise增長的特性
  1. 覆寫返回ChannelFuture的方法簽名爲返回ChannelPromise
  2. unvoid行爲(若是是void的則返回新的ChannelPromise,不然返回當前實例)
AbstractFuture完成的邏輯

完成get的實現邏輯,或者說定義的行爲順序,包含超時的get與一直等的get數組

+-----------+
               |   await   |
               +-----+-----+
                     |
                     |
            +--------v-------+
            |  get cause     |
       +----+ cause == null  +---+
       |    +----------------+   |
       |                         |
       |                         |
+------v------+           +------v------+
| throw exp   |           |  getNow     |
+-------------+           +-------------+

DefaultPromise完成的邏輯

實現是線程安全的promise

  1. 實現監聽器添加、刪除與觸發邏輯。引入EventExecutor實例,一對一。 用於觸發監聽器時使用。觸發監聽器邏輯有棧深度保護策略。
  2. 經過volatile Object result字段完成是否成功,是否取消的狀態描述。
  3. 實現設置(含嘗試)成功,設置失敗(含嘗試),設置不可取消的邏輯
  4. 實現是否成功,是否取消的判斷邏輯
  5. 異常的獲取,結果的獲取
  6. await邏輯的實現。依靠Object的wait方法實現。同時用short waiters來描述wait的次數。
  7. 獲取執行結果。執行結果也是volatile Object result字段承載。
  8. 取消邏輯實現。設置result字段爲CANCELLATION_CAUSE_HOLDER。notifyAll wait。notify全部監聽器。
  9. 是否取消的判斷邏輯實現。比對result字段值,比對cause字段的異常類型
  10. 是否完成的判斷邏輯實現。與是否取消邏輯相似。
  11. sync邏輯實現。在調完await以後再調用rethrowIfFailed(字面意思)。
  12. 死鎖檢測邏輯實現。若是executor在eventLoop則死鎖(executor.inEventLoop)。死鎖扔異常BlockingOperationException

await邏輯安全

+----------------+
       +----Y-----+     isDone     |
  +----v---+      +----------------+
  | return |              N
  +------+-+              |
         ^        +-------v--------+
         +----Y---+  interrupted   |
                  +----------------+
                          N
                          |
+----------+      +-------v--------+
| throw exp<--Y---+  checkDeadLock |
+----------+      +----------------+
                          N
                          +---------synchronized----------+
                          |                               |
                          |                               |
                  +-------v--------+                      |
       while-loop-+   !isDone      +-----------+          |
          |       +----------------+   +-------v------+   |
          |                            |  incWaiters  |   |
          |                            +-------+------+   |
          |       +----------------+           |          |
          |       |     wait       <-----------+          |
          |       +-------+--------+                      |
          |               |            +--------------+   |
          |               +------------> decWaiters   |   |
          |                            +--------------+   |
          +-------------------------------------------+   |
                                                          |
                          +-------------------------------+

若是有其餘人作了notify 可是此時任務尚未done,那麼則會繼續wait,由於這是一個while loop!併發

觸發監聽器邏輯有棧深度保護策略
前提是在同一個線程上,若是不是同一個線程就沒有意義了。因此要判斷executor.inEventLoop()。
在UnpaddedInternalThreadLocalMap中有個字段int futureListenerStackDepth字段來維護FutureListener的棧深度。
在同一個線程上,作notifyListener0以前會將futureListenerStackDepth加1,作完以後恢復原值。
這樣若是在同一個線程遞歸調用到notifyListener0即notifyListener則會觸發觸發監聽器邏輯有棧深度保護策略。
棧深度保護閾值默認是8,能夠經過io.netty.defaultPromise.maxListenerStackDepth系統參數配置。oop

關於Promise的監聽器
監聽器是用listeners字段,這個字段是Object類型,居然沒有給一個明確的類型。在邏輯實現中有DefaultFutureListeners、GenericProgressiveFutureListener與GenericFutureListener等。
裏面包了一個GenericFutureListener數組,達成一個複合的(列表型的)Listener。
GenericProgressiveFutureListener在netty自身裏面沒有用到具體實現。.net

安全執行任務的包裝線程

private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

注意: rejectedExecutionLogger 單獨的日誌名稱,因此能夠單獨配置。日誌

相關文章
相關標籤/搜索