spark 源碼分析之九--Spark RPC剖析之StreamManager和RpcHandler

StreamManager

StreamManager類說明

StreamManager 官方說明以下:apache

The StreamManager is used to fetch individual chunks from a stream. This is used in TransportRequestHandler in order to respond to fetchChunk() requests. Creation of the stream is outside the scope of the transport layer, but a given stream is guaranteed to be read by only one client connection, meaning that getChunk() for a particular stream will be called serially and that once the connection associated with the stream is closed, that stream will never be used again.
 
即StreamManager用於從流中獲取單個塊。這在TransportRequestHandler中用於響應fetchChunk()請求。流的建立超出了傳輸層的範圍,可是保證給定流只能由一個客戶端鏈接讀取,這意味着特定流的getChunk()將被串行調用,而且一旦與流關聯的鏈接關閉後,該流將永遠不會再次使用。

StreamManager兩個子類實現

它有兩個子類:

能夠看到它有兩個實現,一個實現是OneForOneStreamManager,也就是說的getChunk 方法被串行調用,一旦關閉再也不使用。其關鍵方法 getChunk 以下:數組

其中,sream 維護了 streamId 和 StreamState 的映射關係。app

StreamState 的類結構以下:socket

appId 是TransportClient 的 id 號;ide

associatedChannel 表示 與之關聯的 channel 對象;post

buffer 是 一個 迭代的 ManagedBuffer 對象,其中ManagedBuffer是一個不可變的byte數組的抽象;fetch

chunksBeingTransferred保存了正在傳輸的chunk number;spa

curChunk 保存了已經完成的chunk 的 下標索引。.net

 

另外一個實現是NettyStreamManager是用於支持管理器註冊資源的,主要被SparkContext 使用,它不支持getChunk 這個關鍵的方法。日誌

我的觀點,這個類不該該繼承StreamManager 了,由於它的關鍵功能 getChunk 都不能用了。

RpcHandler

NettyRpcHandler繼承了RpcHandler,並實現了 logging trait。

超類RpcHandler

其官方解釋以下:

Handler for sendRPC() messages sent by TransportClients.

即它是處理 TransportClient 發送的 rpc 消息的handler。也就是說,這是在server 端使用的

其類結構以下:

其關鍵方法解釋以下:
1. receiveStream 方法:接收單個RPC消息,其中包括要做爲流接收的數據。在此方法中拋出的任何異常將以字符串形式做爲標準RPC故障發送回客戶端。對於單個TransportClient(即通道),不會並行調用此方法和receive方法。從流中讀取數據時出錯(org.apache.spark.network.client.StreamCallback.onData(String,ByteBuffer))將致使整個通道失敗。在org.apache.spark.network.client.StreamCallback.onComplete(String)中對流進行「後處理」失敗將致使rpcFailure,但該通道將保持活動狀態。
2. receive 方法:接收單個RPC消息,其中包括要做爲流接收的數據。在此方法中拋出的任何異常將以字符串形式做爲標準RPC故障發送回客戶端。對於單個TransportClient(即通道),不會並行調用此方法和#receive。從流中讀取數據時出錯(org.apache.spark.network.client.StreamCallback.onData(String,ByteBuffer))將致使整個通道失敗。在org.apache.spark.network.client.StreamCallback.onComplete(String)中對流進行「後處理」失敗將致使rpcFailure,但該通道將保持活動狀態。
有兩個重載的 receive 方法,其關係以下:

ONE_WAY_CALLBACK 方法是一個默認的OneWayCallback 實現,主要是用於打印日誌track。 

NettyRpcHandler

子類NettyRpcHandler 的官方說明以下:

Dispatches incoming RPCs to registered endpoints. The handler keeps track of all client instances that communicate with it, so that the RpcEnv knows which TransportClient instance to use when sending RPCs to a client endpoint (i.e., one that is not listening for incoming connections, but rather needs to be contacted via the client socket). Events are sent on a per-connection basis, so if a client opens multiple connections to the RpcEnv, multiple connection / disconnection events will be created for that client (albeit with different RpcAddress information).

即,它是負責將傳入的RPC調度到已註冊的端點上的handler。它跟蹤與之通訊的全部客戶端實例,以便RpcEnv知道在將RPC發送到客戶端端點時使用哪一個TransportClient實例(即,一個不監聽傳入鏈接,但須要經過客戶端套接字)。事件是基於每一個鏈接發送的,所以若是客戶端打開與RpcEnv的多個鏈接,將爲該客戶端建立多個鏈接/斷開鏈接事件(儘管具備不一樣的RpcAddress信息)。

其關鍵方法以下:

首先它會根據傳進來的TransportClient的channel獲取到 remoteAddress 的信息,而後和ByteBuffer 類型的message 進一步封裝成RequestMessage而後將接收進來的事件post給Dispatcher對象,Dispatcher再作進一步分發。

相關文章
相關標籤/搜索