Livy基於netty構建了一個RPC通訊層。本篇咱們來探究一下Livy的RPC層的實現細節。讀者應當具有netty編程的基礎知識。web
RPC相關的代碼主要在rsc目錄和org.apache.livy.rsc包中。apache
Kryo
是一種對象序列化和反序列化工具。通訊雙方須要互相發送消息,livy選擇了Kryo做爲消息的編解碼器,並在netty框架中實現編碼和解碼接口:編程
class KryoMessageCodec extends ByteToMessageCodec<Object> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...} @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...} }
當請求消息到來時,netty首先會調用decode對消息進行解碼;當消息要發送到對端的最後關頭,netty會調用encode對消息進行編碼。緩存
livy的rpc通訊支持基於sasl的認證。因此在livy的rpc實現中,有一個叫SaslHandler
的SimpleChannelInboundHandler
。在正式通訊前,客戶端和服務端須要通過一次認證的過程。這裏不羅列代碼,可是將認證的過程作一個分析。回顧一下第三篇中核心架構細節部分的時序圖,一個session的建立過程爲:livyServer啓動一個RpcServer1
和一個SparkSubmit(提交driver)。這時有個細節是,livyServer會生成一個clientId
,記錄在內存中,並把clientId經過配置文件傳給driver。driver啓動後要鏈接RpcServer1
,就要帶上這個clientId。livy經過SaslMessage
消息來封裝clientId
:session
static class SaslMessage { final String clientId; final byte[] payload; SaslMessage() { this(null, null); } SaslMessage(byte[] payload) { this(null, payload); } SaslMessage(String clientId, byte[] payload) { this.clientId = clientId; this.payload = payload; } }
driver會先發送SaslMessage給RpcServer1
,livyServer收到後,從本身內存中尋找是否存在SaslMessage.clientId
,若是存在就算認證經過了。driver接下來才得以進一步發送其餘消息。架構
因此,一個rpc信道的創建分爲未認證階段和認證完成階段。livy是基於netty實現的通訊層,咱們知道netty是經過添加pipeline的方式添加處理環節的。在服務端完成bind,或者客戶端完成connect後的pipeline是這樣的:框架
客戶端經過發送hello
發起"認證"(認證的邏輯上面提到了)。認證完成後,SaslHandler
會從pipeline中移除
,並添加新的業務handler
,稱爲RpcDispatcher
。RpcDispatcher
根據功能不一樣有不一樣的實現。下面的代碼片斷中,SaslHandler
將自身從netty的pipeline中移除:異步
abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> { ... @Override protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg) throws Exception { LOG.debug("Handling SASL challenge message..."); ... // If negotiation is complete, remove this handler from the pipeline, and register it with // the Kryo instance to handle encryption if needed. ctx.channel().pipeline().remove(this); ... } ... }
下面的代碼片斷,在netty中添加須要的RpcDispatcher
:ide
void setDispatcher(RpcDispatcher dispatcher) { Utils.checkNotNull(dispatcher); Utils.checkState(this.dispatcher == null, "Dispatcher already set."); this.dispatcher = dispatcher; channel.pipeline().addLast("dispatcher", dispatcher); dispatcher.registerRpc(channel, this); }
RpcDispatcher
顧名思義是一種處理請求的分發器,負責把請求分發給合適的處理函數處理。在livy中只要是從鏈路中收到的消息都由RpcDispatcher
分發和處理。函數
消息分爲CALL
,REPLY
,ERROR
三類,從源碼的MessageHeader
看得出來:
static enum MessageType { CALL, REPLY, ERROR; } static class MessageHeader { final long id; final MessageType type; MessageHeader() { this(-1, null); } MessageHeader(long id, MessageType type) { this.id = id; this.type = type; } }
MessageHeader中包含請求id和請求type。
發起RPC請求一方,會將請求暫存在rpcCalls
緩存中,應答方會返回REPLY
或者ERROR
。請求方的RpcDispatcher
此時處理REPLY
,ERROR
的時候,從rpcCalls
中找到匹配的Promise
,並激活。下面的流程展現了這個過程:
上述利用Promise實現了一種典型的異步請求框架
對於CALL
類型的消息,RpcDispatcher
採用反射的方式,實現真正的分發動做,與許多web框架的作法十分類似。
在第五篇"解釋器的實現"中,提到的ReplDriver
就是一種RpcDispatcher
,回顧一下其中的handle方法:
def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = { ... } def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = { ... } def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = { ... } def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = { ... }
綜上所述,經過反射,RpcDispatcher
將消息分發給對應的handle
方法處理。
在livy中包含以下幾種RpcDispatcher
:
RSCDriver
,處理通用Job
類消息。在driver側使用ReplDriver
,繼承自RSCDriver
,處理ReplJob
類消息,在driver側使用RegistrationHandler
,只處理RemoteDriverAddress
消息。是livyServer在啓動driver後,爲了可以接收driver反向發送過來的RemoteDriverAddress
。本篇從源碼角度,剖析了livy中rpc通訊的關鍵部分。livy採用kryo作編解碼;在通訊初期採用sasl進行認證和握手;完成認證後,採用反射實現了一套請求分發機制。此外,livy大量採用netty框架提供的Promise,提供了一種異步RPC機制,也值得學習和借鑑。