前情提要:html
上一篇咱們已經說明了Spark RPC框架的一個簡單例子,Spark RPC相關的兩個編程模型,Actor模型和Reactor模型以及一些經常使用的類。這一篇咱們仍是用上一篇的例子,從代碼的角度講述Spark RPC的運行時序,從而揭露Spark RPC框架的運行原理。咱們主要將分紅兩部分來說,分別從服務端的角度和客戶端的角度深度解析。java
不過源碼解析部分都是比較枯燥的,Spark RPC這裏也是同樣,其中不少東西都是繞來繞去,牆裂建議使用上一篇中介紹到的那個Spark RPC項目,下載下來並運行,經過斷點的方式來一步一步看,結合本篇文章,你應該會有更大的收穫。程序員
PS:所用spark版本:spark2.1.0算法
咱們將以上一篇HelloworldServer爲線索,深刻到Spark RPC框架內部的源碼中,來看看啓動一個服務時都作了些什麼。編程
由於代碼部分都是比較繞的,每一個類也常常會搞不清楚,我在介紹一個方法的源碼時,一般都會將類名也一併寫出來,這樣應該會更加清晰一些。bootstrap
HelloworldServer{ ...... def main(args: Array[String]): Unit = { //val host = args(0) val host = "localhost" val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345) val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv) rpcEnv.setupEndpoint("hello-service", helloEndpoint) rpcEnv.awaitTermination() } ...... }
這段代碼中有兩個主要流程,咱們分別來講promise
首先是下面這條代碼的運行流程:網絡
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)多線程
其實就是經過 NettyRpcEnvFactory 建立出一個 RPC Environment ,其具體類是 NettyRpcEnv 。併發
咱們再來看看建立過程當中會發生什麼。
object NettyRpcEnvFactory extends RpcEnvFactory { ...... def create(config: RpcEnvConfig): RpcEnv = { val conf = config.conf // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance val javaSerializerInstance = new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance] //根據配置以及地址,new 一個 NettyRpcEnv , val nettyEnv = new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress) //若是是服務端建立的,那麼會啓動服務。服務端和客戶端都會經過這個方法建立一個 NettyRpcEnv ,但區別就在這裏了。 if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => //啓動服務的方法,下一步就是調用這個方法了 nettyEnv.startServer(config.bindAddress, actualPort) (nettyEnv, nettyEnv.address.port) } try { Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } } nettyEnv } ...... }
還沒完,若是是服務端調用這段代碼,那麼主要的功能是建立RPCEnv,即NettyRpcEnv(客戶端在後面說)。以及經過下面這行代碼,
nettyEnv.startServer(config.bindAddress, actualPort)
去調用相應的方法啓動服務端的服務。下面進入到這個方法中去看看。
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... def startServer(bindAddress: String, port: Int): Unit = { // here disable security val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList() //TransportContext 屬於 spark.network 中的部分,負責 RPC 消息在網絡中的傳輸 server = transportContext.createServer(bindAddress, port, bootstraps) //在每一個 RpcEndpoint 註冊的時候都會註冊一個默認的 RpcEndpointVerifier,它的做用是客戶端調用的時候先用它來詢問 Endpoint 是否存在。 dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) } ...... }
執行完畢以後這個create方法就結束。這個流程主要就是開啓一些服務,而後返回一個新的NettyRpcEnv。
這條代碼會去調用NettyRpcEnv中相應的方法
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) } ...... }
咱們看到,這個方法主要是調用dispatcher進行註冊的。dispatcher的功能上一節已經說了,
Dispatcher的主要做用是保存註冊的RpcEndpoint、分發相應的Message到RpcEndPoint中進行處理。Dispatcher便是上圖中ThreadPool的角色。它同時也維繫一個threadpool,用來處理每次接受到的 InboxMessage。而這裏處理InboxMessage是經過inbox實現的。
這裏咱們就說一說dispatcher的流程。
dispatcher在NettyRpcEnv被建立的時候建立出來。
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... //初始化時建立 dispatcher private val dispatcher: Dispatcher = new Dispatcher(this) ...... }
dispatcher類被建立的時候也有幾個屬性須要注意:
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) { ...... //每一個 RpcEndpoint 其實都會被整合成一個 EndpointData 。而且每一個 RpcEndpoint 都會有一個 inbox。 private class EndpointData( val name: String, val endpoint: RpcEndpoint, val ref: NettyRpcEndpointRef) { val inbox = new Inbox(ref, endpoint) } //一個阻塞隊列,當有 RpcEndpoint 相關請求(InboxMessage)的時候,就會將請求塞到這個隊列中,而後被線程池處理。 private val receivers = new LinkedBlockingQueue[EndpointData] //初始化便建立出來的線程池,當上面的 receivers 隊列中沒內容時,會阻塞。當有 RpcEndpoint 相關請求(即 InboxMessage )的時候就會馬上執行。 //這裏處理 InboxMessage 本質上是調用相應 RpcEndpoint 的 inbox 去處理。 private val threadpool: ThreadPoolExecutor = { val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads", math.max(2, Runtime.getRuntime.availableProcessors())) val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop") for (i <- 0 until numThreads) { pool.execute(new MessageLoop) } pool } ...... }
瞭解一些Dispatcher的邏輯流程後,咱們來正式看看Dispatcher的registerRpcEndpoint方法。
顧名思義,這個方法就是將RpcEndpoint註冊到Dispatcher中去。當有Message到來的時候,便會分發Message到相應的RpcEndPoint中進行處理。
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) { ...... def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = { val addr = RpcEndpointAddress(nettyEnv.address, name) //註冊 RpcEndpoint 時須要的是 上面的 EndpointData ,其中就包含 endpointRef ,這個主要是供客戶端使用的。 val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) //多線程環境下,註冊一個 RpcEndpoint 須要判斷如今是否處於 stop 狀態。 synchronized { if (stopped) { throw new IllegalStateException("RpcEnv has been stopped") } //新建 EndpointData 並存儲到一個 ConcurrentMap 中。 if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) { throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name") } val data = endpoints.get(name) endpointRefs.put(data.endpoint, data.ref) //將 這個 EndpointData 加入到 receivers 隊列中,此時 dispatcher 中的 threadpool 會去處理這個加進來的 EndpointData //處理過程是調用它的 inbox 的 process()方法。而後 inbox 會等待消息到來。 receivers.offer(data) // for the OnStart message } endpointRef } ...... }
Spark RPC服務端邏輯小結:咱們說明了Spark RPC服務端啓動的邏輯流程,分爲兩個部分,第一個是RPC env,即NettyRpcEnv的建立過程,第二個則是RpcEndpoint註冊到dispatcher的流程。
1. NettyRpcEnvFactory 建立 NettyRpcEnv
2. Dispatcher註冊RpcEndpoint
依舊是以上一節 HelloWorld 的客戶端爲線索,咱們來逐層深刻在 RPC 中,客戶端 HelloworldClient 的 asyncCall() 方法。
object HelloworldClient { ...... def asyncCall() = { val rpcConf = new RpcConf() val config = RpcEnvClientConfig(rpcConf, "hello-client") val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service") val future: Future[String] = endPointRef.ask[String](SayHi("neo")) future.onComplete { case scala.util.Success(value) => println(s"Got the result = $value") case scala.util.Failure(e) => println(s"Got error: $e") } Await.result(future, Duration.apply("30s")) rpcEnv.shutdown() } ...... }
建立Spark RPC客戶端Env(即NettyRpcEnvFactory)部分和Spark RPC服務端是同樣的,只是不會開啓監聽服務,這裏就不詳細展開。
咱們從這一句開始看,這也是Spark RPC客戶端和服務端區別的地方所在。
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
上面的的setupEndpointRef最終會去調用下面setupEndpointRef()這個方法,這個方法中又進行一次跳轉,跳轉去setupEndpointRefByURI這個方法中。須要注意的是這兩個方法都是RpcEnv裏面的,而RpcEnv是抽象類,它裏面只實現部分方法,而NettyRpcEnv繼承了它,實現了所有方法。
abstract class RpcEnv(conf: RpcConf) { ...... def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = { //會跳轉去調用下面的方法 setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString) } def setupEndpointRefByURI(uri: String): RpcEndpointRef = { //其中 asyncSetupEndpointRefByURI() 返回的是 Future[RpcEndpointRef]。 這裏就是阻塞,等待返回一個 RpcEndpointRef。 // defaultLookupTimeout.awaitResult 底層調用 Await.result 阻塞 直到結果返回或返回異常 defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri)) } ...... }
這裏最主要的代碼其實就一句,
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
這一段能夠分爲兩部分,第一部分的defaultLookupTimeout.awaitResult其實底層是調用Await.result阻塞等待一個異步操做,直到結果返回。
而asyncSetupEndpointRefByURI(uri)則是根據給定的uri去返回一個RpcEndpointRef,它是在NettyRpcEnv中實現的:
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = { //獲取地址 val addr = RpcEndpointAddress(uri) //根據地址等信息新建一個 NettyRpcEndpointRef 。 val RpcendpointRef = new NettyRpcEndpointRef(conf, addr, this) //每一個新建的 RpcendpointRef 都有先有一個對應的verifier 去檢查服務端存不存在對應的 Rpcendpoint 。 val verifier = new NettyRpcEndpointRef( conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this) //向服務端發送請求判斷是否存在對應的 Rpcendpoint。 verifier.ask[Boolean](RpcEndpointVerifier.createCheckExistence(endpointRef.name)).flatMap { find => if (find) { Future.successful(endpointRef) } else { Future.failed(new RpcEndpointNotFoundException(uri)) } }(ThreadUtils.sameThread) } ...... }
asyncSetupEndpointRefByURI()這個方法實現兩個功能,第一個就是新建一個RpcEndpointRef。第二個是新建一個verifier,這個verifier的做用就是先給服務端發送一個請求判斷是否存在RpcEndpointRef對應的RpcEndpoint。
這段代碼中最重要的就是verifiter.ask[Boolean](...)了。若是有找到以後就會調用Future.successful這個方法,反之則會經過Future.failed拋出一個異常。
ask能夠算是比較核心的一個方法,咱們能夠到ask方法中去看看。
class NettyRpcEnv{ ...... private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = { val promise = Promise[Any]() val remoteAddr = message.receiver.address // def onFailure(e: Throwable): Unit = { // println("555"); if (!promise.tryFailure(e)) { log.warn(s"Ignored failure: $e") } } def onSuccess(reply: Any): Unit = reply match { case RpcFailure(e) => onFailure(e) case rpcReply => println("666"); if (!promise.trySuccess(rpcReply)) { log.warn(s"Ignored message: $reply") } } try { if (remoteAddr == address) { val p = Promise[Any]() p.future.onComplete { case Success(response) => onSuccess(response) case Failure(e) => onFailure(e) }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { //跳轉到這裏執行 //封裝一個 RpcOutboxMessage ,同時 onSuccess 方法也是在這裏註冊的。 val rpcMessage = RpcOutboxMessage(serialize(message), onFailure, (client, response) => onSuccess(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) promise.future.onFailure { case _: TimeoutException => println("111");rpcMessage.onTimeout() // case _ => println("222"); }(ThreadUtils.sameThread) } val timeoutCancelable = timeoutScheduler.schedule(new Runnable { override def run(): Unit = { // println("333"); onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}")) } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) //promise 對應的 future onComplete時會去調用,但當 successful 的時候,上面的 run 並不會被調用。 promise.future.onComplete { v => // println("4444"); timeoutCancelable.cancel(true) }(ThreadUtils.sameThread) } catch { case NonFatal(e) => onFailure(e) } promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) } ...... }
這裏涉及到使用一些scala多線程的高級用法,包括Promise和Future。若是想要對這些有更加深刻的瞭解,能夠參考這篇文章。
這個函數的做用從名字中就能夠看得出,其實就是將要發送的消息封裝成一個RpcOutboxMessage,而後交給OutBox去發送,OutBox和前面所說的InBox對應,對應Actor模型中的MailBox(信箱)。用於發送和接收消息。
其中使用到了Future和Promise進行異步併發以及錯誤處理,好比當發送時間超時的時候Promise就會返回一個TimeoutException,而咱們就能夠設置本身的onFailure函數去處理這些異常。
OK,註冊完RpcEndpointRef後咱們即可以用它來向服務端發送消息了,而其實RpcEndpointRef發送消息仍是調用ask方法,就是上面的那個ask方法。上面也有介紹,本質上就是經過OutBox進行處理。
咱們來梳理一下RPC的客戶端的發送流程。
客戶端邏輯小結:客戶端和服務端比較相似,都是須要建立一個NettyRpcEnv。不一樣的是接下來客戶端建立的是RpcEndpointRef,並用之向服務端對應的RpcEndpoint發送消息。
1.NettyRpcEnvFactory建立NettyRpcEnv
2. 建立RpcEndpointRef
3. RpcEndpointRef使用同步或者異步的方式發送請求。
OK,以上就是SparkRPC時序的源碼分析。下一篇會將一個實際的例子,Spark的心跳機制和代碼。喜歡的話就關注一波吧
推薦閱讀 :
從分治算法到 MapReduce
Actor併發編程模型淺析
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs
一個故事告訴你什麼纔是好的程序員