spark 源碼分析之十一--Spark RPC剖析之TransportClient、TransportServer剖析

TransportClient類說明

先來看,官方文檔給出的說明:html

Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow efficient transfer of a large amount of data, broken up into chunks with size ranging from hundreds of KB to a few MB. 
Note that while this client deals with the fetching of chunks from a stream (i.e., data plane), the actual setup of the streams is done outside the scope of the transport layer. The convenience method "sendRPC" is provided to enable control plane communication between the client and server to perform this setup. 
For example, a typical workflow might be: 
client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100 
client.fetchChunk(streamId = 100, chunkIndex = 0, callback) 
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
... 
client.sendRPC(new CloseStream(100))
Construct an instance of TransportClient using TransportClientFactory. A single TransportClient may be used for multiple streams, but any given stream must be restricted to a single client, in order to avoid out-of-order responses. 
NB: This class is used to make requests to the server, while TransportResponseHandler is responsible for handling responses from the server. Concurrency: thread safe and can be called from multiple threads.java


用於獲取預先協商的流的連續塊的客戶端。此API容許有效傳輸大量數據,分解爲大小從幾百KB到幾MB的chunk。
注意,雖然該客戶端處理從流(即,數據平面)獲取chunk,可是流的實際設置在傳輸層的範圍以外完成。提供便利方法「sendRPC」以使客戶端和服務器之間的控制平面通訊可以執行該設置。
例如,典型的工做流程多是:bootstrap

// 打開遠程文件
client.sendRPC(new OpenFile(「/ foo」)) - >返回StreamId = 100安全

// 打開獲取遠程文件chunk-0
client.fetchChunk(streamId = 100,chunkIndex = 0,callback)服務器

// 打開獲取遠程文件chunk-1 
client.fetchChunk(streamId = 100,chunkIndex = 1,callback)
.. .併發

// 關閉遠程文件
client.sendRPC(new CloseStream(100))
使用TransportClientFactory構造TransportClient的實例。異步

單個TransportClient能夠用於多個流,可是任何給定的流必須限制在單個客戶端,以免無序響應。
注意:此類用於向服務器發出請求,而TransportResponseHandler負責處理來自服務器的響應。併發:線程安全,能夠從多個線程調用。async

簡言之,能夠認爲TransportClient就是Spark Rpc 最底層的基礎客戶端類。主要用於向server端發送rpc 請求和從server 端獲取流的chunk塊。ide

 

下面看一下類的結構:源碼分析

它有兩個內部類:RpcChannelListener和StdChannelListener,這兩個類的繼承關係以下:

其公共父類GenericFutureListener 官方說明以下:

Listens to the result of a Future. The result of the asynchronous operation is notified once this listener is added by calling Future.addListener(GenericFutureListener).

即,監聽一個Future 對象的執行結果,經過Future.addListener(GenericFutureListener)的方法,添加監聽器來監聽這個異步任務的最終結果。當異步任務執行成功以後,會調用監聽器的 operationComplete 方法。在StdChannelListener 中,其operationComplete 方法其實就是添加了日誌打印運行軌跡的做用,添加了異常的處理方法 handleFailure,它是一個空實現,以下:

其子類RpcChannelListener的handleFailure實現以下:

這個handleFailure 方法充當着失敗處理轉發的做用。其調用了 RpcResponseCallback (經過構造方法傳入)的 onFailure 方法。

再來看一下TransportClient 的主要方法解釋:

1. fetchChunk : Requests a single chunk from the remote side, from the pre-negotiated streamId. Chunk indices go from 0 onwards. It is valid to request the same chunk multiple times, though some streams may not support this. Multiple fetchChunk requests may be outstanding simultaneously, and the chunks are guaranteed to be returned in the same order that they were requested, assuming only a single TransportClient is used to fetch the chunks.其源碼以下:

 

2. stream:Request to stream the data with the given stream ID from the remote end.其源碼以下:

 

3. sendRpc:Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked with the server's response or upon any failure.

4. uploadStream:Send data to the remote end as a stream. This differs from stream() in that this is a request to *send* data to the remote end, not to receive it from the remote.

 

5. sendRpcSync:Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to a specified timeout for a response.

 

6. send:Sends an opaque message to the RpcHandler on the server-side. No reply is expected for the message, and no delivery guarantees are made.

7. removeRpcRequest:Removes any state associated with the given RPC.主要是從handler 中把監聽的rpcRequest移除。

8. close:close the channel

9. timeOut: Mark this channel as having timed out.

能夠看出,其主要是一個比較底層的客戶端,主要用於發送底層數據的request,主要是數據層面的流中的chunk請求或者是控制層面的rpc請求,發送數據請求的方法中都有一個回調方法,回調方法是用於處理請求返回的結果。

TransportClient初始化

它是由TransportClientFactory 建立的。看TransportClientFactory 的核心方法: createClient(java.net.InetSocketAddress)的關鍵代碼以下:

 1 // 1. 添加一個 ChannelInitializer 的 handler
 2 bootstrap.handler(new ChannelInitializer<SocketChannel>() {
 3  @Override 4 public void initChannel(SocketChannel ch) { 5 TransportChannelHandler clientHandler = context.initializePipeline(ch); 6  clientRef.set(clientHandler.getClient()); 7  channelRef.set(ch); 8  } 9 }); 10 11 // Connect to the remote server 12 long preConnect = System.nanoTime(); 13 // 2. 鏈接到遠程的服務端,返回一個ChannelFuture 對象,調用其 await 方法等待其結果返回。 14 ChannelFuture cf = bootstrap.connect(address); 15 // 3. 等待channelFuture 對象其結果返回。 16 if (!cf.await(conf.connectionTimeoutMs())) { 17 throw new IOException( 18 String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs())); 19 } else if (cf.cause() != null) { 20 throw new IOException(String.format("Failed to connect to %s", address), cf.cause()); 21 }

在connect 方法中,初始化了handler。handler 被添加到ChannelPipiline以後,使用線程池來處理初始化操做,其調用了 DefaultChannelPipeline的callHandlerAdded0 方法,callHandlerAdded0調用了handler 的 handlerAdded 方法,handlerAdded內部調用了 initChannel 私有方法,initChannel又調用了保護抽象方法 initChannel,其會調用 ChannelInitializer自定義匿名子類的initChannel 方法。在這個 initChannel 方法中調用了TransportContext 的initializePipeline方法,在這個方法中實例化了 TransportClient對象。

咱們再來看一下TransportContext 的initializePipeline方法的核心方法createChannelHandler:

再來看 NettyRpcEnv 是如何初始化transportContext 的:

從上面能夠看到 rpcHandler 是NettyRpcHandler, 其依賴三個對象,Dispatcher 對象,nettyEnv 對象以及StreamManager 對象。

Dispatcher 對象已經有作說明,能夠看個人博客spark 源碼分析之六 -- Spark內置RPC機制剖析之二Dispatcher和Inbox剖析作進一步瞭解。

NettyEnv 對象就是NettyRpcEnv 對象。

NettyRpcHandler已經有作說明,能夠看個人博客spark 源碼分析之九 -- Spark內置RPC機制剖析之五StreamManager和NettyRpcHandler作進一步瞭解。

即channelRpcHandler 就是NettyRpcHandler實例。

關於TransportResponseHandler、TransportRequestHandler和TransportChannelHandler三個類的說明,能夠參照博客spark 源碼分析之十 -- Spark內置RPC機制剖析之六TransportResponseHandler、TransportRequestHandler和TransportChannelHandler剖析 作進一步瞭解。

 

TransportServer

官方說明:

Server for the efficient, low-level streaming service.

即:用於高效,低級別流媒體服務的服務器。

使用TransportContext createServer方法建立:

其構造方法源碼以下:

重點看其init方法:

ServerBootstrap是用於初始化Server的。跟TransportClientFactory建立TransportClient相似,也有ChannelInitializer的回調,跟Bootstrap相似。參照上面的剖析。

至此,TransClient和TransServer的剖析完畢。

相關文章
相關標籤/搜索