Future/Promise 執行邏輯數組
scala Future 有幾個要點,第一個是 tryAwait 須要藉助 CowndownLatch 實現,第二個是能夠在 Promise 掛載回調函數promise
首先,大體看下 Scala concurrent 的架構架構
DefaultPromise -> AbstractPromise -> Promise(concurrent) -> Promise[Trait] -> Future[Trait] -> Awaitableapp
在 package 外使用的 Promise 是 Promise[Trait], 其實 DefaultPromise 也是有 map, flatMap 方法的,只不過不能用而已,DefaultPromise 是 scala promise 的惟一實現類異步
我沒能徹底理解 link promise 怎麼實現垃圾回收的,
在 flatMap 中有一個 linkRootOf 函數,從 Promise 的註釋中也能夠看到 promise link 是一個很重要的概念,它解決了 flatMap 函數組合造成無限長的鏈後的 memory leak 問題ide
The ability to link DefaultPromises is needed to prevent memory leaks when using Future.flatMap. The previous implementation of Futhre.flatMap used onComplete handlers to propagate to the ultimate value of a flatMap operation to its promise. Recursive calls to flatMap built a chain of onComplete handlers and promises. Unfortunately none of the handlers or promises in the chain could be collected until the handers has been called detached, which only happended when the final flatMap future was completed. (In some situations, such as infinte streams, this would never actually happen.) Because of the fact that the promise implementation internally created references between promises, and these reference were invisible to user code, it was easy for user code to accidentally build large chains of promises and thereby leak memory.函數
結合 flatMap 函數理解ui
def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { import impl.Promise.DefaultPromise val p = new DefaultPromise[S]() onComplete { case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] case Success(v) => try f(v) match { // If possible, link DefaultPromises to avoid space leaks case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p) case fut => fut.onComplete(p.complete)(internalExecutor) } catch { case NonFatal(t) => p failure t } } p.future }
每次 flatMap 函數都會建立 DefaultPromise 變量,這個變量經過返回值傳遞到函數外,使它在上一層 scope 可見,若是無限建立不能被 GC 回收,那麼內存很快就會被佔滿,而 stream 類型的數據流極可能就是無限長的,因此這個 DefaultPromise 變量必定要回收掉。this
// 添加 sleep 對分析控制流走向頗有幫助 Future { Thead.sleep(3000), 1 } .flatMap { x => { Thread.sleep(20000), 2} } .flatMap { y => { Thread.sleep(50000), 3} }
Stage 1:
spa
Future { Thread.sleep(3000); 1}
第一個 Future 調用 object Future.apply 方法,建立 PromiseCompletingRunnable, 放到線程池裏運行,運行完畢後(幾秒以後),會調用 promise complete Try 方法,此時還沒調用。
Stage 2:
.flatMap { x => {Thread.sleep(20000), 2}}
complete 邏輯先不分析,而後是第一個 flatMap 方法,flatMap 方法在上面已經給出,不過我這裏先把 flatMap 方法展開,去掉不重要或無關的代碼
def flatMap(f: T => Future[S]): Future[S] val p = DefaultPromise[S] val callBackFunction = { case Success(v) => f(v) match case dp: DefaultPromise => dp.linkRootOf(p) } val runnable = new CallbackRunnable(callbackFunction) getState match case r: Try => runnable(r) case DefaultPromise => compressRoot().dispatcherOrCallback(runnable) case listener: List[] => updateState(listenr, runnable::listener) p.future
flatMap 實際上只作了回調函數註冊的功能,在上面的 promise complete 執行時,會調用這些 callbackFunction.
DefaultPromise 初始化時,State = Nil, 因此註冊回調函數的時候,state 會被設置成 runnable.
Stage 3:
第一個 flatMap 函數執行,假設
f0 = Future{} f1 = f0.flatMap {} // f0.state = runnable f2 = f1.flatMap {} // f1.state = runnable
那麼 stage 3 就是在 f1 上添加回調函數
Stage 3:
假設,第一個 Future 運算完畢,開始返回,promise complete result 開始執行了,complete 調用 tryComplete 函數
def tryComplete(r: Try) getState match case list: List[] => updateState(list, r); list.foreach(exec) case DefaultPromise => ...
返回值爲 Success(1), 執行剛纔註冊的回調函數 callBackFunction, f(v) 返回 Future 類型,其實是 DefaultPromise 類型,這個操做也是經過線程池調用,異步執行,而後走到 dp.linkRootOf(p)
,注意,這個 dp 再也不是 this 了,而是新產生的 Future, 而 p2 是 flatMap 裏新建立的。
Stage 4:
private def link(target: DefaultPromise[T]): Unit = if (this ne target) { getState match { case r: Try[_] => if (!target.tryComplete(r.asInstanceOf[Try[T]])) { // Currently linking is done from Future.flatMap, which should ensure only // one promise can be completed. Therefore this situation is unexpected. throw new IllegalStateException("Cannot link completed promises together") } case _: DefaultPromise[_] => compressedRoot().link(target) case listeners: List[_] => if (updateState(listeners, target)) { if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) } else link(target) } }
由於 dp 是新建立的,且當前值還未返回(異步執行中),state = Nil, 因此這裏會把狀態更新爲 target 也就是 p2, 沒有須要執行的回調函數。
Stage 5:
當 f2 返回了,會執行 promise complete try, 進入 tryComplete 邏輯,上一次,tryComplete 走的是 List() 分支,而此次,由於 state 上 Stage 4 換成了 target, 也就是 P2, 因此此次改走 DefaultPromise 分支,調用 P2 上的 Listener 也就是第三個 flatMap 的邏輯。這樣,chain 就跑起來了
Stage 6:
第二個 flatMap 依然執行建立 DefaultPromise, 註冊回調函數的邏輯,