Livy探究(六) -- RPC的實現

Livy基於netty構建了一個RPC通訊層。本篇咱們來探究一下Livy的RPC層的實現細節。讀者應當具有netty編程的基礎知識。web

RPC相關的代碼主要在rsc目錄和org.apache.livy.rsc包中。apache

KryoMessageCodec

Kryo是一種對象序列化和反序列化工具。通訊雙方須要互相發送消息,livy選擇了Kryo做爲消息的編解碼器,並在netty框架中實現編碼和解碼接口:編程

class KryoMessageCodec extends ByteToMessageCodec<Object> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...}
    
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...}
}

當請求消息到來時,netty首先會調用decode對消息進行解碼;當消息要發送到對端的最後關頭,netty會調用encode對消息進行編碼。緩存

SaslHandler

livy的rpc通訊支持基於sasl的認證。因此在livy的rpc實現中,有一個叫SaslHandlerSimpleChannelInboundHandler。在正式通訊前,客戶端和服務端須要通過一次認證的過程。這裏不羅列代碼,可是將認證的過程作一個分析。回顧一下第三篇中核心架構細節部分的時序圖,一個session的建立過程爲:livyServer啓動一個RpcServer1和一個SparkSubmit(提交driver)。這時有個細節是,livyServer會生成一個clientId,記錄在內存中,並把clientId經過配置文件傳給driver。driver啓動後要鏈接RpcServer1,就要帶上這個clientId。livy經過SaslMessage消息來封裝clientIdsession

static class SaslMessage {
  final String clientId;
  final byte[] payload;
  SaslMessage() {
    this(null, null);
  }
  SaslMessage(byte[] payload) {
    this(null, payload);
  }
  SaslMessage(String clientId, byte[] payload) {
    this.clientId = clientId;
    this.payload = payload;
  }
}

driver會先發送SaslMessage給RpcServer1,livyServer收到後,從本身內存中尋找是否存在SaslMessage.clientId,若是存在就算認證經過了。driver接下來才得以進一步發送其餘消息。架構

因此,一個rpc信道的創建分爲未認證階段和認證完成階段。livy是基於netty實現的通訊層,咱們知道netty是經過添加pipeline的方式添加處理環節的。在服務端完成bind,或者客戶端完成connect後的pipeline是這樣的:框架

image.png

客戶端經過發送hello發起"認證"(認證的邏輯上面提到了)。認證完成後,SaslHandler會從pipeline中移除,並添加新的業務handler,稱爲RpcDispatcherRpcDispatcher根據功能不一樣有不一樣的實現。下面的代碼片斷中,SaslHandler將自身從netty的pipeline中移除:異步

abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> {
...
    @Override
    protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg)
    throws Exception {
        LOG.debug("Handling SASL challenge message...");
        ...
        // If negotiation is complete, remove this handler from the pipeline, and register it with
        // the Kryo instance to handle encryption if needed. 
        ctx.channel().pipeline().remove(this);
        ...
    }
...
}

下面的代碼片斷,在netty中添加須要的RpcDispatcheride

void setDispatcher(RpcDispatcher dispatcher) {
  Utils.checkNotNull(dispatcher);
  Utils.checkState(this.dispatcher == null, "Dispatcher already set.");
  this.dispatcher = dispatcher;
  channel.pipeline().addLast("dispatcher", dispatcher);
  dispatcher.registerRpc(channel, this);
}

RpcDispatcher

RpcDispatcher顧名思義是一種處理請求的分發器,負責把請求分發給合適的處理函數處理。在livy中只要是從鏈路中收到的消息都由RpcDispatcher分發和處理。函數

消息分爲CALLREPLYERROR三類,從源碼的MessageHeader看得出來:

static enum MessageType {
  CALL,
  REPLY,
  ERROR;
}
static class MessageHeader {
  final long id;
 final MessageType type;
 MessageHeader() {
    this(-1, null);
 }
  MessageHeader(long id, MessageType type) {
    this.id = id;
 this.type = type;
 }
}

MessageHeader中包含請求id和請求type。

發起RPC請求一方,會將請求暫存在rpcCalls緩存中,應答方會返回REPLY或者ERROR。請求方的RpcDispatcher此時處理REPLYERROR的時候,從rpcCalls中找到匹配的Promise,並激活。下面的流程展現了這個過程:

image.png

上述利用Promise實現了一種典型的異步請求框架

對於CALL類型的消息,RpcDispatcher採用反射的方式,實現真正的分發動做,與許多web框架的作法十分類似。

image.png

在第五篇"解釋器的實現"中,提到的ReplDriver就是一種RpcDispatcher,回顧一下其中的handle方法:

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = {
    ...
}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = {
    ...
}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = {
    ...
}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = {
    ...
}

綜上所述,經過反射,RpcDispatcher將消息分發給對應的handle方法處理。

在livy中包含以下幾種RpcDispatcher

  • RSCDriver,處理通用Job類消息。在driver側使用
  • ReplDriver,繼承自RSCDriver,處理ReplJob類消息,在driver側使用
  • RegistrationHandler,只處理RemoteDriverAddress消息。是livyServer在啓動driver後,爲了可以接收driver反向發送過來的RemoteDriverAddress

總結

本篇從源碼角度,剖析了livy中rpc通訊的關鍵部分。livy採用kryo作編解碼;在通訊初期採用sasl進行認證和握手;完成認證後,採用反射實現了一套請求分發機制。此外,livy大量採用netty框架提供的Promise,提供了一種異步RPC機制,也值得學習和借鑑。

相關文章
相關標籤/搜索