文檔對RpcEndpoint的解釋:
An end point for the RPC that defines what functions to trigger given a message. It is guaranteed that onStart, receive and onStop will be called in sequence. The life-cycle of an endpoint is: constructor -> onStart -> receive* -> onStop Note: receive can be called concurrently. If you want receive to be thread-safe, please use ThreadSafeRpcEndpoint If any error is thrown from one of RpcEndpoint methods except onError, onError will be invoked with the cause. If onError throws an error, RpcEnv will ignore it.安全
其子類繼承關係以下:網絡
其下面還有一個抽象子接口:ThreadSafeRpcEndpoint併發
文檔對ThreadSafeRpcEndpoint的解釋以下:
須要RpcEnv線程安全地向其發送消息的trait。線程安全意味着在經過相同的ThreadSafeRpcEndpoint處理一條消息完成後再處理下一個消息。換句話說,在處理下一條消息時,能夠看到對ThreadSafeRpcEndpoint的內部字段的更改,而且ThreadSafeRpcEndpoint中的字段不須要是volatile或等效的。可是,不能保證同一個線程將爲不一樣的消息執行相同的ThreadSafeRpcEndpoint。
即順序處理消息,不能同時併發處理。traint RpcEndpoint的方法以下:異步
對其變量和方法解釋以下:async
1. rpcEnv:RpcEndpoint 註冊的那個 RpcEnv 對象spa
2. self : RpcEndpoint 對應的 RpcEndpointRef。onStart 方法被調用的時候,RpcEndpointRef有效,onStop 調用後,self會是null,注意因爲在onStart以前,RpcEndpoint 尚未被註冊,尚未有效的RpcEndpointRef,因此不要在onStart以前調用 self 方法.net
3. receive :處理從RpcEndpointRef.send 或 RpcCallContext.reply 過來的消息,若是接收到一個未匹配的消息,會拋出 SparkException 而且發送給onError 方法線程
4. receiveAndReply:處理從RpcEndpointRef.ask發過來的消息,若是接收到一個未匹配的消息,會拋出 SparkException 而且發送給onError 方法3d
5. onError: 在消息處理過程當中,若是有異常都會調用此方法netty
6. onConnected:當remoteAddress 鏈接上當前節點時被調用
7. onDisconnected: 噹噹前節點丟失掉 remoteAddress 後被調用
8. onNetworkError:當鏈接當前節點和remoteAddress時,有網絡錯誤發生時被調用
9. onStart:在RpcEndpoint開始處理其餘消息以前被調用
10. onStop:當RpcEndpoint中止時被調用,self 將會是null,不能用於發送消息
11. stop: 中止RpcEndpoint
RpcEndPointRef:遠程的RpcEndpoint引用,RpcEndpointRef是線程安全的。
有一個跟RpcEndPoint 很像的類 -- RpcEndPointRef。先來看 RpcEndpointRef抽象類。下面咱們重點來看一下它內部構造。
首先看它的繼承結構:
它的父類是 RpcEndpointRef。先來剖析它的內部變量和方法的解釋:
有三個成員變量:
1. maxRetries: 最大嘗試鏈接次數。能夠經過 spark.rpc.numRetries 參數來指定,默認是 3 次。 該變量暫時沒有使用。
2. retryWaitMs:每次嘗試鏈接最大等待毫秒值。能夠經過 spark.rpc.retry.wait 參數來指定,默認是 3s。該變量暫時沒有使用。
3. defaultAskTimeout: spark 默認 ask 請求操做超時時間。 能夠經過 spark.rpc.askTimeout 或 spark.network.timeout參數來指定,默認是120s。
成員方法:
1. address : 抽象方法,返回 RpcEndpointRef的RpcAddress
2. name:抽象方法,返回 endpoint 的name
3. send: 抽象方法,Sends a one-way asynchronous message. Fire-and-forget semantics. 發送單向的異步消息,知足 即發即忘 語義。
4. ask:抽象方法。發送消息到相應的 RpcEndpoint.receiveAndReply , 並返回 Future 以在默認超時內接收返回值。它有兩個重載方法:其中沒有RpcTimeOut 的ask方法添加一個 defaultAskTimeout 參數繼續調用 有RpcTimeOut 的ask方法。
5. askSync:調用抽象方法ask。跟ask相似,有兩個重載方法:其中沒有RpcTimeOut 的askSync方法添加一個 defaultAskTimeout 參數繼續調用 有RpcTimeOut 的askSync方法。有RpcTimeOut 的askSync方法 會調用 ask 方法生成一個Future 對象,而後等待任務執行完畢後返回。
注意,這裏面其實就涉及到了模板方法模式。ask跟askSync都是設定好了,ask 要返回一個Future 對象,askSync則是 調用 ask 返回的Future 對象,而後等待 future 的 result 方法返回。
下面看RpcEndpointRef 的惟一實現類 - NettyRpcEndpointRef
RpcEndpointRef的NettyRpcEnv版本。此類的行爲取決於它的建立位置。在「擁有」RpcEndpoint的節點上,它是RpcEndpointAddress實例的簡單包裝器。在接收序列化版本引用的其餘計算機上,行爲會發生變化。實例將跟蹤發送引用的TransportClient,以便經過客戶端鏈接發送到端點的消息,而不須要打開新鏈接。此ref的RpcAddress能夠爲null;這意味着ref只能經過客戶端鏈接使用,由於託管端點的進程不會偵聽傳入鏈接。不該與第三方共享這些引用,由於它們將沒法向端點發送消息。
先來看 成員變量:
1. conf : 是一個SparkConf 實例
2. endpointAddress:是一個RpcEndpointAddress 實例,主要包含了 RpcAddress (host和port) 和 rpc endpoint name的信息
3. nettyEnv:是一個NettyRpcEnv實例
4. client: 是一個TransportClient實例,這個client 是不參與序列化的。
成員方法:
1. 實現並重寫了繼承自超類的ask方法, 以下:
2. 實現並重寫了繼承自超類的send方法,以下:
3. 關於序列化和反序列化的兩個方法:writeObject(序列化方法)和 readObject(反序列化方法),以下:
順便,咱們來看RequestMessage對象,代碼以下:
RequestMessage裏面的消息是sender 發給 receiver 的,RequestMessage主要負責sender RpcAddress, receiver RpcAddress,receiver rpcendpoint name以及 消息 content 的序列化。
總結: 本文主要剖析了 RpcEndpoint和RpcEntpointRef兩個類,順便,也介紹了支持序列化的 RequestMessage 類。
注:到目前爲止,Spark RPC組件尚未所有剖析完畢,預計還有三到四篇文章才能徹底剖析完, be patient 😊