Promising Linking

Future/Promise 執行邏輯數組

scala Future 有幾個要點,第一個是 tryAwait 須要藉助 CowndownLatch 實現,第二個是能夠在 Promise 掛載回調函數promise

首先,大體看下 Scala concurrent 的架構架構

DefaultPromise -> AbstractPromise -> Promise(concurrent) -> Promise[Trait] -> Future[Trait] -> Awaitableapp

在 package 外使用的 Promise 是 Promise[Trait], 其實 DefaultPromise 也是有 map, flatMap 方法的,只不過不能用而已,DefaultPromise 是 scala promise 的惟一實現類異步

我沒能徹底理解 link promise 怎麼實現垃圾回收的,
在 flatMap 中有一個 linkRootOf 函數,從 Promise 的註釋中也能夠看到 promise link 是一個很重要的概念,它解決了 flatMap 函數組合造成無限長的鏈後的 memory leak 問題ide

The ability to link DefaultPromises is needed to prevent memory leaks when using Future.flatMap. The previous implementation of Futhre.flatMap used onComplete handlers to propagate to the ultimate value of a flatMap operation to its promise. Recursive calls to flatMap built a chain of onComplete handlers and promises. Unfortunately none of the handlers or promises in the chain could be collected until the handers has been called detached, which only happended when the final flatMap future was completed. (In some situations, such as infinte streams, this would never actually happen.) Because of the fact that the promise implementation internally created references between promises, and these reference were invisible to user code, it was easy for user code to accidentally build large chains of promises and thereby leak memory.函數

結合 flatMap 函數理解ui

def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
  import impl.Promise.DefaultPromise
  val p = new DefaultPromise[S]()
  onComplete {
    case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
    case Success(v) => try f(v) match {
      // If possible, link DefaultPromises to avoid space leaks
      case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
      case fut => fut.onComplete(p.complete)(internalExecutor)
    } catch { case NonFatal(t) => p failure t }
  }
  p.future
}

每次 flatMap 函數都會建立 DefaultPromise 變量,這個變量經過返回值傳遞到函數外,使它在上一層 scope 可見,若是無限建立不能被 GC 回收,那麼內存很快就會被佔滿,而 stream 類型的數據流極可能就是無限長的,因此這個 DefaultPromise 變量必定要回收掉。this

Example

// 添加 sleep 對分析控制流走向頗有幫助
Future { Thead.sleep(3000), 1 }
    .flatMap { x => { Thread.sleep(20000), 2} }
    .flatMap { y => { Thread.sleep(50000), 3} }

Stage 1:spa

Future { Thread.sleep(3000); 1}

第一個 Future 調用 object Future.apply 方法,建立 PromiseCompletingRunnable, 放到線程池裏運行,運行完畢後(幾秒以後),會調用 promise complete Try 方法,此時還沒調用。

Stage 2:

.flatMap { x => {Thread.sleep(20000), 2}}

complete 邏輯先不分析,而後是第一個 flatMap 方法,flatMap 方法在上面已經給出,不過我這裏先把 flatMap 方法展開,去掉不重要或無關的代碼

def flatMap(f: T => Future[S]): Future[S]
    val p = DefaultPromise[S]
    val callBackFunction = {
        case Success(v) => f(v) match 
            case dp: DefaultPromise => dp.linkRootOf(p)
    }
    
    val runnable = new CallbackRunnable(callbackFunction)
    getState match
        case r: Try => runnable(r)
        case DefaultPromise => compressRoot().dispatcherOrCallback(runnable)
        case listener: List[] => updateState(listenr, runnable::listener)
    
    p.future

flatMap 實際上只作了回調函數註冊的功能,在上面的 promise complete 執行時,會調用這些 callbackFunction.

DefaultPromise 初始化時,State = Nil, 因此註冊回調函數的時候,state 會被設置成 runnable.

Stage 3:

第一個 flatMap 函數執行,假設

f0 = Future{} 
f1 = f0.flatMap {} // f0.state = runnable
f2 = f1.flatMap {} // f1.state = runnable

那麼 stage 3 就是在 f1 上添加回調函數

Stage 3:

假設,第一個 Future 運算完畢,開始返回,promise complete result 開始執行了,complete 調用 tryComplete 函數

def tryComplete(r: Try)
    getState match
        case list: List[] => updateState(list, r); list.foreach(exec)
        case DefaultPromise => ...

返回值爲 Success(1), 執行剛纔註冊的回調函數 callBackFunction, f(v) 返回 Future 類型,其實是 DefaultPromise 類型,這個操做也是經過線程池調用,異步執行,而後走到 dp.linkRootOf(p),注意,這個 dp 再也不是 this 了,而是新產生的 Future, 而 p2 是 flatMap 裏新建立的。

Stage 4:

private def link(target: DefaultPromise[T]): Unit = if (this ne target) {
      getState match {
        case r: Try[_] =>
          if (!target.tryComplete(r.asInstanceOf[Try[T]])) {
            // Currently linking is done from Future.flatMap, which should ensure only
            // one promise can be completed. Therefore this situation is unexpected.
            throw new IllegalStateException("Cannot link completed promises together")
          }
        case _: DefaultPromise[_] =>
          compressedRoot().link(target)
        case listeners: List[_] => if (updateState(listeners, target)) {
          if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_))
        } else link(target)
      }
    }

由於 dp 是新建立的,且當前值還未返回(異步執行中),state = Nil, 因此這裏會把狀態更新爲 target 也就是 p2, 沒有須要執行的回調函數。

Stage 5:

當 f2 返回了,會執行 promise complete try, 進入 tryComplete 邏輯,上一次,tryComplete 走的是 List() 分支,而此次,由於 state 上 Stage 4 換成了 target, 也就是 P2, 因此此次改走 DefaultPromise 分支,調用 P2 上的 Listener 也就是第三個 flatMap 的邏輯。這樣,chain 就跑起來了

Stage 6:

第二個 flatMap 依然執行建立 DefaultPromise, 註冊回調函數的邏輯,

相關文章
相關標籤/搜索