前段時間看spark,看着迷迷糊糊的。最近終於有點頭緒,先梳理了一下spark rpc相關的東西,先記錄下來。html
我的認爲,若是把分佈式系統(HDFS, HBASE,SPARK等)比做一我的,那麼RPC能夠認爲是人體的血液循環系統。它將系統中各個不一樣的組件(如Hbase中的master, Regionserver, client)聯繫了起來。一樣,在spark中,不一樣組件像driver,executor,worker,master(stanalone模式)之間的通訊也是基於RPC來實現的。git
Spark 1.6以前,spark的RPC是基於Akaa來實現的。Akka是一個基於scala語言的異步的消息框架。Spark1.6後,spark借鑑Akka的設計本身實現了一個基於Netty的rpc框架。大概的緣由是1.6以前,RPC經過Akka來實現,而大文件是基於netty來實現的,加之akka版本兼容性問題,因此1.6以後把Akka改掉了,具體jira見(https://issues.apache.org/jira/browse/SPARK-5293)。github
本文主要對spark1.6以後基於netty新開發的rpc框架作一個較爲深刻的分析。apache
spark 基於netty新的rpc框架借鑑了Akka的中的設計,它是基於Actor模型,各個組件能夠認爲是一個個獨立的實體,各個實體之間經過消息來進行通訊。具體各個組件之間的關係圖以下(圖片來自[1]):架構
表示一個個須要通訊的個體(如master,worker,driver),主要根據接收的消息來進行對應的處理。一個RpcEndpoint經歷的過程依次是:構建->onStart→receive→onStop。其中onStart在接收任務消息前調用,receive和receiveAndReply分別用來接收另外一個RpcEndpoint(也能夠是自己)send和ask過來的消息。框架
RpcEndpointRef是對遠程RpcEndpoint的一個引用。當咱們須要向一個具體的RpcEndpoint發送消息時,通常咱們須要獲取到該RpcEndpoint的引用,而後經過該應用發送消息。異步
表示遠程的RpcEndpointRef的地址,Host + Port。分佈式
RpcEnv爲RpcEndpoint提供處理消息的環境。RpcEnv負責RpcEndpoint整個生命週期的管理,包括:註冊endpoint,endpoint之間消息的路由,以及中止endpoint。函數
Rpc實現相關類之間的關係圖以下(圖片來自[2]):oop
核心要點以下:
1,核心的RpcEnv是一個特質(trait),它主要提供了中止,註冊,獲取endpoint等方法的定義,而NettyRpcEnv提供了該特質的一個具體實現。
2,經過工廠RpcEnvFactory來產生一個RpcEnv,而NettyRpcEnvFactory用來生成NettyRpcEnv的一個對象。
3,當咱們調用RpcEnv中的setupEndpoint來註冊一個endpoint到rpcEnv的時候,在NettyRpcEnv內部,會將該endpoint的名稱與其本省的映射關係,rpcEndpoint與rpcEndpointRef之間映射關係保存在dispatcher對應的成員變量中。
接下來,咱們看一個具體的案例:在standalone模式中,worker會定時發心跳消息(SendHeartbeat)給master,那心跳消息是怎麼從worker發送到master的呢,master又是怎麼接收消息的?
1,在worker中,forwordMessageScheduler線程會定時每隔心跳超時時間的四分之一時間向本身發送SendHeartbeat消息,在worker的receive函數中,咱們看到一旦接收到SendHeartbeat消息,worker會調用sendToMaster函數,將Heartbeat消息(包含worker Id和當前worker的rpcEndpoint引用)發送給master。
2,在worker的sendToMaster函數中,經過masterRef.send(message)將消息發送出去。那這個調用背後又作了什麼事情呢?NettryRpcEnv中send的實現以下:
能夠看到,當前發送地址(nettyEnv.address),目標的master地址(this)和發送的消息(SendHeartbeat)被封裝成了RequestMessage消息,若是是遠程rpc調用的話,最終send將調用postToOutbox函數,而且此時消息會被序列化成Byte流。
3,在postToOutbox函數中,消息將通過OutboxMessage中的sendWith方法(client.send(content)),最終經過TransportClient的send方法(client.send(content)),而在TransportClient中將消息進一步封裝,而後發送給master。
4, 在master端TransportRequestHandler的handle方法中,因爲心跳信息在worker端被分裝成了OneWayMessage,因此在該handle方法中,將調用processOneWayMessage進行處理。
5,processOneWayMessage函數將調用rpcHandler的實現類NettyRpcEnv中的receive方法。在該方法中,首先經過internalRecieve將消息解包成RequestMessage。而後該消息經過dispatcher的分發給對應的endpoint。
6,那消息是怎麼分發的呢?在Dispatcher的postMessage方法中,能夠看到,首先根據對應的endpoint的EndpointData信息(主要是該endpoint及其應用以及其信箱(inbox)),而後將消息塞到給endpoint(此例中的master)的信箱中,最後將消息塞到recievers的阻塞隊列中。
7,那隊列中的消息是怎麼被消費的呢?在Dispatcher中有一個線程池threadpool在MessageLoop類的run方法中,將receivers中的對象取出來,交由信箱的process方法去處理。若是沒有收到任何消息,將會阻塞在take處。
8,在inbox的proces方法中,首先取出消息,而後根據消息的類型(此例中是oneWayMessage),最終將調用endpoint的receiver方法進行處理(也就是master中的receive方法)。至此,整個一次rpc調用的流程結束。
本文主要對rpc的歷史,初始實現思想以及一次rpc的具體流程作了一個較爲深刻的分析。此外,對spark rpc實現涉及的一部分類也作了一個歸納性說明。這也是一個開始,解下來,會陸續對spark的一些內部原理作較爲深刻的分析。
[1] https://wongxingjun.github.io/2016/12/08/Spark-RPC%E8%A7%A3%E8%AF%BB/
[2] http://shiyanjun.cn/archives/1545.html