spark 源碼分析之十--Spark RPC剖析之TransportResponseHandler、TransportRequestHandler和TransportChannelHandler剖析

spark 源碼分析之十--Spark RPC剖析之TransportResponseHandler、TransportRequestHandler和TransportChannelHandler剖析安全

TransportResponseHandler分析

先來看類說明:服務器

Handler that processes server responses, in response to requests issued from a [[TransportClient]]. It works by tracking the list of outstanding requests (and their callbacks). Concurrency: thread safe and can be called from multiple threads.網絡

即處理服務器響應的處理程序,以響應TransportClient發出的請求。它的工做原理是跟蹤未完成的請求(及其回調)列表。它是線程安全的。

其關鍵的成員字段做以下說明:ide

1. channel:與之綁定的SocketChannel對象源碼分析

2. outstandingFetches:是一個ConcurrentHashMap,主要保存StreamChunkId和ChunkReceivedCallback的映射關係。fetch

3. outstandingRpcs:是一個ConcurrentHashMap,主要保存 request id 和RpcResponseCallback的映射關係。this

4. streamCallbacks 是一個ConcurrentLinkedQueue隊列,保存了Pair<String, StreamCallback>,其中String是stream idspa

5. timeOfLastRequestNs:記錄了上次rpc 請求或 chunk fetching 的系統時間,以納秒計算.net

其關鍵方法 handle 以下:線程

 

TransportRequestHandler分析

類說明以下:

A handler that processes requests from clients and writes chunk data back. Each handler is attached to a single Netty channel, and keeps track of which streams have been fetched via this channel, in order to clean them up if the channel is terminated (see #channelUnregistered). The messages should have been processed by the pipeline setup by TransportServer.

它是一個handler,處理來自於client 的 請求,返回chunk 給 client。每個handler與一個netty channel 關聯,並追蹤那個chunk 已經被chennel獲取到了。其中消息應該已經被TransportServer創建起來的管道處理過了。

其成員變量說明以下:

1. channel: 是Channel對象,與之關聯的SocketChannel對象

2. reverseClient:是TransportClient對象,同一個channel 上的client,這樣,就能夠給消息的請求者通訊了

3. rpcHandler:是一個RpcHandler對象,處理全部的 RPC 消息

4. streamManager: 是一個StreamManager對象,返回一個流的 任意一部分chunk

5. maxChunksBeingTransferred: 正在傳輸的流的chunk 下標

其關鍵方法 handle 以下:

咱們只看一個分支做爲示例:

其調用了rpcHandler 的 receive 方法,該方法處理完畢後返回,若是成功,則返回RpcResponse對象,不然返回RpcResponse對象,因爲這個返回多是須要跨網絡傳輸的,因此,有進一步封裝了response 方法,以下:

即經過response 方法將server 端的請求結果返回給客戶端。 

TransportChannelHandler分析

類說明以下:

The single Transport-level Channel handler which is used for delegating requests to the TransportRequestHandler and responses to the TransportResponseHandler. All channels created in the transport layer are bidirectional. When the Client initiates a Netty Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server also gets a handle on the same Channel, so it may then begin to send RequestMessages to the Client. This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler, for the Client's responses to the Server's requests. This class also handles timeouts from a io.netty.handler.timeout.IdleStateHandler. We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not timeout if the client is continuously sending but getting no responses, for simplicity.

傳輸層的handler,負責委託請求給TransportRequestHandler,委託響應給TransportResponseHandler。

在傳輸層中建立的全部通道都是雙向的。當客戶端使用RequestMessage啓動Netty通道(由服務器的RequestHandler處理)時,服務器將生成ResponseMessage(由客戶端的ResponseHandler處理)。可是,服務器也會在同一個Channel上獲取句柄,所以它可能會開始向客戶端發送RequestMessages。這意味着客戶端還須要一個RequestHandler,而Server須要一個ResponseHandler,用於客戶端對服務器請求的響應。此類還處理來自io.netty.handler.timeout.IdleStateHandler的超時。若是存在未完成的提取或RPC請求可是至少在「requestTimeoutMs」上沒有通道上的流量,咱們認爲鏈接超時。請注意,這是雙工流量;若是客戶端不斷髮送可是沒有響應,咱們將不會超時。

關鍵方法channelRead以下:

該方法,負責將請求委託給TransportRequestHandler,將響應委託給TransportResponseHandler。

由於這個channel最終被添加到了channel上,因此消息從channel中傳輸(流出或流入)都會觸發這個方法,進而調用響應的方法。

即Spark RPC經過netty的channel發送請求,獲取響應。

相關文章
相關標籤/搜索