在scala中是沒有原生線程的,其底層使用的是java的Thread機制。可是在scala中對java Thread進行了封裝,實現了更便於操做線程的Future。html
官方文檔: Futures provide a way to reason about performing many operations in parallel– in an efficient and non-blocking way.java
在使用的時候只須要經過object Future 的apply方法傳入執行體便可啓動,那麼future是如何開始運行的呢?又是如何把運行體加入到線程的執行體中的呢?其底層運行機制又是什麼呢?下面就逐步看一下。編程
先看一段代碼.注意在代碼中導入的global,其類型爲global: ExecutionContext,這裏暫時不進行解釋,留意一下後面會用到。promise
package zpj.future import org.scalatest.FunSuite import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global /** * Created by PerkinsZhu on 2018/3/18 11:34 **/ class Test extends FunSuite { test("future demo 1") { Future { println("hello world !!!") } sleep } val sleep = Thread.sleep(1000) }
直接運行代碼會打印出「hello world !!!」。咱們知道,若是使用java的Thread,則必須調用.start()方法來啓動線程的運行,但是在這裏咱們並無主動觸發start()方法,而線程體卻執行了。下面進入源碼中看一下。在這以前注意打開idea的Structure窗口,留意每一個方法是屬於哪一個class、object或者trait中。這樣便於理解整個Future 的結構關係。app
進入Future.apply()函數:ide
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = unit.map(_ => body)
能夠看到在body即是傳入的線程體,在這裏使用unit調用了map方法,那麼這個unit又是什麼呢?函數式編程
/** A Future which is always completed with the Unit value. */ val unit: Future[Unit] = successful(())
一個值爲unit 的已完成future。這裏調用的successful(())函數。注意傳入的() ,這個就是該future的值:Unit 。能夠看一下()的類型:函數
很明顯()就是上面註釋所說的 Unit value.post
繼續咱們進入successful(())看一下是怎麼實現的:ui
/** Creates an already completed Future with the specified result. * * @tparam T the type of the value in the future * @param result the given successful value * @return the newly created `Future` instance */ def successful[T](result: T): Future[T] = Promise.successful(result).future
先看一下參數部分,result:T,還記得上面傳入的()嗎,在這裏便賦值給result。那麼後面的Promise.successful(result).future
又是什麼意思呢?咱們先看前半部分Promise.successful(result),這裏調用的是Promise的succeful(),進入看一下:
/** Creates an already completed Promise with the specified result. * * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ def successful[T](result: T): Promise[T] = fromTry(Success(result))
到這裏看到Success(result)大概就明白了,這就是用來構建future的結果值,其結果即是Success(()) 。【疑問1】同時注意一下這裏返回的結果類型爲Promise[T],而其調用出接收的倒是Future,這兩處是如何對接的呢?咱們暫時放一下,先看下面。那fromTry又是作什麼呢?
/** Creates an already completed Promise with the specified result or exception. * * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ def fromTry[T](result: Try[T]): Promise[T] = impl.Promise.KeptPromise[T](result)
這裏經過KeptPromise建立了一個Promise的實例,繼續進入KeptPromise.apply():
def apply[T](result: Try[T]): scala.concurrent.Promise[T] = resolveTry(result) match { case s @ Success(_) => new Successful(s) case f @ Failure(_) => new Failed(f) }
一、注意這裏的Successful(s)和Failed(f),這兩個是繼承了Promise的私有類,看一下這裏的繼承結構:
private[this] sealed trait Kept[T] extends Promise[T]
private[this] final class Successful[T](val result: Success[T]) extends Kept[T] private[this] final class Failed[T](val result: Failure[T]) extends Kept[T]
二、resolveTry是對result進行進一步處理,判斷result是否失敗,並解析出其Exception,只是對future中的結果作一個細分化。
private def resolveTry[T](source: Try[T]): Try[T] = source match { case Failure(t) => resolver(t) case _ => source } private def resolver[T](throwable: Throwable): Try[T] = throwable match { case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T]) case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t)) case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t)) case e: Error => Failure(new ExecutionException("Boxed Error", e)) case t => Failure(t) }
走到這裏,就明白了Promise.successful(result).future中的 前半部分的執行機。還記得上面拋出的一個疑問嗎?這裏就對【疑問1】解釋一下。
def successful[T](result: T): Future[T] = Promise.successful(result).future接收的是Future,而Promise.successful(result)返回的是一個Promise,這兩個類型怎麼對接呢?後面調用了future ,咱們進入看一下
trait Promise[T] {
def future: Future[T]
...
...
該函數是定義在特質scala.concurrent.Promise中的一個抽象函數(注意這裏的包路徑)。上面咱們知道Promise.successful(result)返回的是一個Successful,那麼future應該會在Successful中進行實現了:
進去以後發現並無,那麼會不會在其父類中實現了呢?咱們繼續進入Kept看看:
發現Kept中也沒有,那麼久繼續向上找,private[this] sealed trait Kept[T] extends Promise[T],(注意這裏的Promise是scala.concurrent.impl中的Promise,不是剛纔的scala.concurrent.Promis)這裏咱們進入scala.concurrent.Promise看一下:
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this
會發如今 scala.concurrent.impl.Promise[T] extends scala.concurrent.Promise[T],且二者都是特質(注意區分這兩個Promise)。在下面能夠看到 future 在這裏被實現了def future: this.type = this。對於這裏該如何理解呢?
future返回的結果應該是Future[T]類型的,那麼這裏的this.type 應該就是Promise類型,而this就應該是上面的Successful(())。這裏可能有些不太容易理解,事實上 scala.concurrent.impl.Promise繼承了Promise 混合了Future ,注意看上面的繼承關係:
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T]
這裏的with混合了scala.concurrent.Future特質,經過def future: this.type = this把Promise類型轉化爲Future返回給了調用處。
走到這裏unit的構建就清晰了,其實質就是一個已經完成了的Future
回到Future.apply()方法中,unit就明白了其構建過程,而對於map呢?該如何理解?
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = unit.map(_ => body)
繼續進入map的實現源碼:
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f)
def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
一路跟進來以後會進入scala.concurrent.Future#transform的抽象方法中。上面咱們知道這裏的unit是scala.concurrent.impl.Promise.KeptPromise.Successful的實例,根據上面的經驗一層一層的向上找transform的實現位置,會發如今scala.concurrent.impl.Promise#transform中進行了實現。看一下這裏的實現代碼:
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
在這裏咱們逐一分析一下這三行代碼:
一、val p = new DefaultPromise[S]()。建立了 一個scala.concurrent.impl.Promise.DefaultPromise實例,進入DefaultPromise的構造器中看一下:
class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]
會發現DefaultPromise依舊混合了scala.concurrent.impl.Promise特質,同時還繼承了java.util.concurrent.atomic.AtomicReference且向其構造器中傳入了Nil空列表。這裏先掛起,分析第二行代碼。
二、onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) },在理解這行代碼的時候須要注意scala的參數類型,明確其傳入的是函數仍是參數值。
咱們進入onComplete 發現是一個scala.concurrent.Future#onComplete的抽象方法。那麼找到其實現處:scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete,看一下源碼:
override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)
這裏終於看到開啓線程的代碼了,每一個future開啓一個線程的代碼應該就是這裏了。
注意這裏new CallbackRunnable(executor.prepare(), func)) 傳入的對象 executor,和func,這裏的executor是從上面一路帶過來的(implicit executor: ExecutionContext),也就是咱們上面剛開始導入的import scala.concurrent.ExecutionContext.Implicits.global;在看func,回溯上面會發現func就是scala.concurrent.Promise#complete方法,根據名字能夠指定是在Future 完成以後的回調,接收的參數就是Future.apply()的函數體。
進入scala.concurrent.impl.CallbackRunnable看一塊兒源碼:
private final class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it var value: Try[T] = null override def run() = { require(value ne null) // must set value to non-null before running! try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } } def executeWithValue(v: Try[T]): Unit = { require(value eq null) // can't complete it twice value = v // Note that we cannot prepare the ExecutionContext at this point, since we might // already be running on a different thread! try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t } } }
注意以下幾點:
一、繼承關係能夠發現CallbackRunnable是java.lang.Runnable的實現類,所以其實一個能夠在java Threa中運行的線程。 CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable
二、注意其構造器參數,executor是一個全局線程池,onComplete: Try[T] => Any是一個函數。函數是能夠調用的代碼塊,能夠傳參的(理解scala的函數式編程)。
三、注意其run方法中執行的代碼塊,其中是調用了onComplete的,且傳入的結果是一個Value。
四、注意executeWithValue的參數v,其把v賦值給Value。賦值以後調用了 executor.execute(this);該命令再熟悉不過了,調用線程池執行線程,這裏的this就是CallbackRunnable實例。
經過這四點能夠明白:
scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete 是在單獨的線程中執行的,結合上面的 onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }這塊代碼,發現onComplete執行的就是scala.concurrent.Promise#complete的代碼邏輯。
再看一下scala.concurrent.impl.Promise#transform的源碼:
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
注意這裏面的參數類型,f: Try[T] => Try[S]是一個函數,然而注意這裏: p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) ,看一下 p.complete()方法接收的參數類型是什麼:
def complete(result: Try[T]): this.type = if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
一個結果參數,不是一個函數。再看上面的f(result),其實質在調用f()函數,傳入的參數就是result,而後計算出結果以後把結果值傳入scala.concurrent.Promise#complete。仔細體會一下這裏的調用邏輯。也就是說在調用scala.concurrent.Promise#complete以前f()函數已經進行了調用,這裏的f()函數也就是Future.apply()的函數體。
彙總上面再理一下調用邏輯:
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
在onComplete ()中開啓線程,並執行線程體。在線程執行過程當中,調用p.complete()函數,而在調用p.complete()以前會觸發f()函數的調用,這樣便觸發了Future.apply()的執行,因而便執行了 println("hello world !!!") 代碼塊。
所以Future.apply()中的代碼塊是在單獨的一個線程中執行的,這即是scala 中Future自動開啓線程執行代碼塊的機制。
這裏不太容易理解的就是這個函數的調用時機。搞清楚Future是如何把Future.apply()代碼塊加載到java Thread中運行以後,Future的核心便易於理解了。
注意這裏還有一個result的傳入時機:
onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }
這個result 是從哪裏過來的呢?咱們知道future是能夠組合上一個future的結果的。例如:
Future { 10 }.map( _ + 10).map(_ * 10)
這裏執行邏輯時機上是(10+10)* 10 結果就是200 ,那麼這裏的10如何傳給第二個map函數的呢?又是如何把20傳給第三個map函數的呢?
咱們再看一下scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete的實現源碼:
override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)
注意這裏的result,調用executeWithValue()以後會把該result賦值給scala.concurrent.impl.CallbackRunnable#value的參數,在run運行過程當中,調用onComlete會把該繼續把該result傳給p.complete()
override def run() = { require(value ne null) // must set value to non-null before running! try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } } override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
這裏的result即是線程run方法中傳入的Value,那麼在(new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)這裏的result又是哪裏來的呢?
看一下onComplete的源碼:
private[this] sealed trait Kept[T] extends Promise[T] { def result: Try[T] override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)
發現result是一個抽象值,那麼咱們就去找Kept的實現類scala.concurrent.impl.Promise.KeptPromise.Successful。看一下構造器:
private[this] final class Successful[T](val result: Success[T]) extends Kept[T]
在這裏能夠發現其實result是經過構造器傳入的,那麼是哪裏調用構造器傳入的呢?還記得咱們看unit實現邏輯嗎?其中有一部分這樣的代碼:
def apply[T](result: Try[T]): scala.concurrent.Promise[T] = resolveTry(result) match { case s @ Success(_) => new Successful(s) case f @ Failure(_) => new Failed(f) }
這裏的S即是傳入的result,而在構建unit的時候,這裏的S是一個Unit值,這也是初始Future的值。
那麼咱們上面說的十、20分別是如何經過map傳入的呢?
這裏咱們回想一下前面的unit,unit是經過scala.concurrent.impl.Promise.KeptPromise.Successful構造的,其混入的是scala.concurrent.impl.Promise.KeptPromise.Kept所以看下面
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
unit在調用transform的時候,執行的 onComplete 是scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete。而看第三行返回的結果: p.future,也便是說第一個Future返回的對象是DefaultPromise()實例的future。結合代碼:
Future { 10 }.map( _ + 10).map(_ * 10)
這裏返回的future是DefaultPromise()的future,因此調用map的也是DefaultPromise()的future。那麼,進入map方法以後,咱們會發現又進入了scala.concurrent.Future#transform
def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
注意這裏調用transform的再也不是KeptPromise()了,而是DefaultPromise()的實例在調用。因此 在調用onComplete()的時候進入的就是scala.concurrent.impl.Promise.DefaultPromise#onComplete,而再也不是scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete了
下面看一下scala.concurrent.impl.Promise.DefaultPromise#onComplete的源碼:
final def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = dispatchOrAddCallback(new CallbackRunnable[T](executor.prepare(), func))
注意這裏只是new 了一個CallbackRunnable,並無啓動。不啓動的緣由就是不肯定上一個Future是否執行成功。可能須要等待,由此能夠猜到dispatchOrAddCallback()的目的就是對調用者future進行判斷和等待的邏輯。看一下scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback的源碼:
/** Tries to add the callback, if already completed, it dispatches the callback to be executed. * Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks * to the root promise when linking two promises together. */ @tailrec private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = { get() match { case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) case dp: DefaultPromise[_] => compressedRoot(dp).dispatchOrAddCallback(runnable) case listeners: List[_] => if (compareAndSet(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable) } } /** * Gets the current value. * * @return the current value */ public final V get() {// 注意該方法的路徑:java.util.concurrent.atomic.AtomicReference#get return value; }
注意以下幾點:
一、scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback是一個遞歸方法,注意註釋@tailrec
二、case r: Try[_] 該分支說明調用者future已經結束,啓動該future的線程,執行map中的操做。
三、爲何會調用的get()方法呢?由於DefaultPromise混入了AtomicReference:
class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]
注意這裏傳入的是Nil ,這也是爲何會有case listeners: List[_]分支的緣由。
scala在進行debug的時候不像java那麼方便,須要深刻理解函數式編程的邏輯,函數的調用邏輯。
=========================================
=========================================
-------end