Spark是最近幾年已經算是最爲成功的大數據計算框架,那麼此次咱們就來介紹它內部的一個小點,Spark RPC框架。html
在介紹以前,咱們須要先說明什麼是RPC,引用百度百科:java
RPC(Remote Procedure Call)—遠程過程調用,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。react
Spark RPC能夠說是Spark分佈式集羣的基礎,如果將Spark類比爲一我的的話,Spark RPC無疑就是它的血液部分。而在Spark1.6以前,它的RPC部分仍是用akka實現的,但以後底層就換成了netty來實現。爲何要這樣作呢?由於啊,這樣將Spark和Akka耦合在了一塊兒,若是你係統自己就有使用到Akka,而後又想使用Spark的話,那兩個Akka框架版本不一致可怎麼辦呀,這無疑是很讓人頭痛的。Spark團隊正是考慮到了這一點,因此將Akka替換成了netty。git
此次咱們就來看看Spark是如何讓它的血液流動起來的吧。有一位大神將Spark RPC中的RPC部分剝離出來,弄成一個新的可運行的 RPC 項目,這個項目自己就能夠看成一個簡易的Akka來使用,地址在這Spark RPC。程序員
雖然名字不同,但這個項目的類和內容基本和Spark Core中RPC部分的代碼和結構基本是同樣的,這樣咱們就能夠經過這個來學習Spark RPC框架。github
PS:所用spark版本:spark2.1.0算法
咱們程序員學東西最喜歡從一個Hello world開始,那麼接下來咱們就來演示如何下載並運行最簡單的Hello World例子吧。編程
首先,我使用的編譯器是IDEA,經過idea將github上的代碼clone下來。
能夠看到項目目錄下有兩個模塊,網絡
kraps-rpc存放的是Spark RPC的源代碼,而咱們要作的便是運行 kraps-rpc-example中的示例代碼。架構
啓動PRC的話首先須要啓動Server端,開啓監聽服務,而後才能經過Client進行訪問。這裏在HelloworldServer.scala中都已經幫咱們寫好,不過在main方法中須要修改一下內容,就是將host改成本機地址。
def main(args: Array[String]): Unit = { // val host = args(0) val host = "localhost" val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345) val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv) rpcEnv.setupEndpoint("hello-service", helloEndpoint) rpcEnv.awaitTermination() }
而後咱們只須要右鍵該文件而後執行便可。
接下來咱們就須要啓動Client端代碼,咱們先到HelloworldClient文件中,這裏面提供了同步和異步兩個方法能夠運行。代碼一樣都已經寫好,經過修改註釋便可使用不一樣的方法運行。一樣是右鍵點擊該文件執行。
def main(args: Array[String]): Unit = { //異步方法 //asyncCall() //同步方法 syncCall() }
異步方法中,ask會返回一個Future(注意這裏的Future是scala中的Future,和java的是不同的)。而且在Future運行結果出來前,咱們能夠去作其餘事情(異步的優點所在)。scala中的Future和Java的Future有些不一樣,不過這能夠先不去管,先看成Java裏面的Future便可。
def asyncCall() = { val rpcConf = new RpcConf() val config = RpcEnvClientConfig(rpcConf, "hello-client") val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service") val future: Future[String] = endPointRef.ask[String](SayHi("neo")) future.onComplete { case scala.util.Success(value) => println(s"Got the result = $value") case scala.util.Failure(e) => println(s"Got error: $e") } Await.result(future, Duration.apply("3s")) //在future結果運行出來前,會先打印這條語句。 println("print me at first!") Thread.sleep(7) }
而同步方法是直接將結果返回,而且會阻塞,這個時間內你沒法作其餘事情,只能等待,直到結果返回。
def syncCall() = { val rpcConf = new RpcConf() val config = RpcEnvClientConfig(rpcConf, "hello-client") val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service") val result = endPointRef.askWithRetry[String](SayBye("neo")) println(result) }
很簡單是吧,運行過例子後,咱們就能夠來了解一些Spark RPC運行過程當中相當重要的兩個編程模型,以及在這其中使用到的一些主要的類。
Spark RPC是使用了Actor模型和Reactor模型的混合模式,咱們結合兩種模型分別說明Spark RPC中各個類的做用:
首先咱們先來看Spark RPC的類圖。
是否是感受很亂?沒事,咱們來逐步剖析各個類。
爲了更加清楚了說明各個類的關係,咱們要先知道兩個模型,分別是Actor模型和Reactor模型,咱們將從這兩個模型的角度來拆解各個類的關係。
其實以前也有寫過一篇介紹Actor模型的文章,感興趣的同窗能夠點擊這裏查看Actor模型淺析。
其實Actor主要就是這副圖的內容:
在Spark RPC中有幾個類分別與Actor模型中的各個角色對應,對應以下,左邊的是Spark RPC中的類,右邊的是Actor模型中的角色:
RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem
咱們逐個來看:
RPC Environment 是 RpcEndpoint 的運行環境。它管理 RpcEndpoint 的整個生命週期:
RPC Environment在akka已經被移除的2.0後面版本中,RPC Environment的實現類是NettyRpcEnv。一般是由NettyRpcEnvFactory.create建立。
RpcEndpoint能經過callbacks接收消息。一般須要咱們本身寫一個類繼承RpcEndpoint。編寫本身的接收信息和返回信息規則。
RpcEndpoint的生命週期被RPC Environment管理。其生命週期包括,onStart,receive和onStop。
它是做爲服務端,好比上面例子中的HelloworldServer就是一個RpcEndpoint。
RpcEndpointRef是RpcEndpoint在RPC Environment中的一個引用。
它包含一個地址(即Spark URL)和名字。RpcEndpointRef做爲客戶端向服務端發送請求並接收返回信息,一般能夠選擇使用同步或異步的方式進行發送。
Spark RPC採用Actor模型和Reactor模型混合的結構,上面已經介紹了Actor,那麼如今咱們就來介紹Reactor模型,一樣,咱們能夠從一張圖來看Reactor的架構。
使用Reactor模型,由底層netty建立的EventLoop作I/O多路複用,這裏使用Multiple Reactors這種形式,如上圖所示,從netty的角度而言,Main Reactor和Sub Reactor對應BossGroup和WorkerGroup的概念,前者負責監聽TCP鏈接、創建和斷開,後者負責真正的I/O讀寫。
而圖中的ThreadPool就是的Dispatcher中的線程池,它來解耦開來耗時的業務邏輯和I/O操做,這樣就能夠更scalabe,只須要少數的線程就能夠處理成千上萬的鏈接,這種思想是標準的分治策略,offload非I/O操做到另外的線程池。
Dispatcher的主要做用是保存註冊的RpcEndpoint、分發相應的Message到RpcEndPoint中進行處理。Dispatcher便是上圖中ThreadPool的角色。它同時也維繫一個threadpool,用來處理每次接受到的InboxMessage。而這裏處理InboxMessage是經過inbox實現的。
Inbox其實屬於Actor模型,是Actor中的信箱,不過它和Dispatcher聯繫緊密因此放這邊。
InboxMessage有多個實現它的類,好比OneWayMessage,RpcMessage,等等。Dispatcher會將接收到的InboxMessage分發到對應RpcEndpoint的Inbox中,而後Inbox便會處理這個InboxMessage。
OK,此次就先介紹到這裏,下次咱們從代碼的角度來看Spark RPC的運行機制
若是以爲對你有幫助,不妨關注一波吧~~
參考資料:https://zhuanlan.zhihu.com/p/28893155
推薦閱讀:
從分治算法到 MapReduce
Actor併發編程模型淺析
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs
一個故事告訴你什麼纔是好的程序員