Spark's built-in RPC communication mechanism and the RpcEnv infrastructure - Spark in Production

This blog series distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and hands-on production guidance; please keep following the series. Copyright notice: this Spark source-code reading and production practice series belongs to the author (秦凱新, Qin Kaixin). Reposting is prohibited; you are welcome to learn from it.

Spark in Production and Advanced Tuning series

1. Spark's built-in RPC communication mechanism

TransportContext holds the methods that create TransportClient and TransportServer, yet it sits at the very bottom of the RPC communication stack. Why?

Because its RpcHandler member is abstract and carries no concrete message handling, and TransportContext's job is limited to creating the TransportClient (client side) and the TransportServer (server side). The class-level comment explains it as follows:

Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
 setup Netty Channel pipelines with a
 {@link org.apache.spark.network.server.TransportChannelHandler}.

So TransportContext can only serve as the lowest-level communication foundation. The layer above it is the higher-level NettyRpcEnv wrapper, which holds a TransportContext reference and passes a NettyRpcHandler instance into it, so that Netty's communication callbacks are handled there. A TransportContext code fragment follows:

/**
 * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
 * channel. As each TransportChannelHandler contains a TransportClient, this enables server
 * processes to send messages back to the client on an existing channel.
 */
public class TransportContext {
  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private final MessageEncoder encoder;
  private final MessageDecoder decoder;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

1.1 TransportChannelHandler: the unified message handler for client and server

When configuring the handlers on Netty's pipeline, both TransportClient and TransportServer install a TransportChannelHandler to perform unified message receiving. Why? To give all messages a single entry point: TransportChannelHandler dispatches to different handlers according to the message type, as the following fragment shows:

public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else if (request instanceof ResponseMessage) {
    responseHandler.handle((ResponseMessage) request);
  } else {
    ctx.fireChannelRead(request);
  }
}

The TransportContext fragment that initializes the pipeline:

public TransportChannelHandler initializePipeline(
    SocketChannel channel,
    RpcHandler channelRpcHandler) {
  try {
    TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
    channel.pipeline()
      .addLast("encoder", ENCODER)
      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
      .addLast("decoder", DECODER)
      .addLast("idleStateHandler",
        new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
      .addLast("handler", channelHandler);
    return channelHandler;
  } catch (RuntimeException e) {
    logger.error("Error while initializing Netty pipeline", e);
    throw e;
  }
}

The unified receive handler TransportChannelHandler used by both client and server is produced by createChannelHandler(channel, channelRpcHandler); that is what unifies Netty message receiving. The fragment is:

/**
 * Creates the server- and client-side handler which is used to handle both RequestMessages and
 * ResponseMessages. The channel is expected to have been successfully created, though certain
 * properties (such as the remoteAddress()) may not be available yet.
 */
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
  TransportClient client = new TransportClient(channel, responseHandler);
  TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
    rpcHandler, conf.maxChunksBeingTransferred());
  return new TransportChannelHandler(client, responseHandler, requestHandler,
    conf.connectionTimeoutMs(), closeIdleConnections);
}

Note that TransportClient owns a TransportResponseHandler, while TransportServer's side uses a TransportRequestHandler. When a message arrives, TransportChannelHandler first selects the right handler based on the message type, and it also manages the Netty channel lifecycle events:

  • exceptionCaught
  • channelActive
  • channelInactive
  • channelRead
  • userEventTriggered

1.2 TransportClient handles ResponseMessage

Whenever the client sends a message (all outgoing client messages are Request messages), it caches a callback in one of these maps:

private final Map<Long, RpcResponseCallback> outstandingRpcs;

private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches;

so that the response coming back on the same channel can be matched to its callback.
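To make the caching concrete, here is a small, self-contained Scala sketch of the pattern TransportResponseHandler follows; the class and method names below are illustrative, not Spark's actual API:

import java.util.concurrent.ConcurrentHashMap

// Illustrative model (not Spark source): callbacks are cached by request id when a
// request goes out, and completed/removed when the matching response comes back.
trait RpcResponseCallback {
  def onSuccess(response: Array[Byte]): Unit
  def onFailure(e: Throwable): Unit
}

class ResponseTracker {
  private val outstandingRpcs = new ConcurrentHashMap[Long, RpcResponseCallback]()

  // Called right before a request with this id is written to the channel.
  def addRpcRequest(requestId: Long, callback: RpcResponseCallback): Unit =
    outstandingRpcs.put(requestId, callback)

  // Called when a success or failure response for this id is read from the channel.
  def handleResponse(requestId: Long, result: Either[Throwable, Array[Byte]]): Unit = {
    val callback = outstandingRpcs.remove(requestId)
    if (callback != null) {
      result.fold(callback.onFailure, callback.onSuccess)
    }
  }
}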

1.3 TransportServer handles RequestMessage

Message types the server receives (all Request messages):

  • ChunkFetchRequest
  • RpcRequest
  • OneWayMessage
  • StreamRequest

Response types the server sends back (all Response messages):

  • ChunkFetchSuccess
  • ChunkFetchFailure
  • RpcResponse
  • RpcFailure

2. The Spark RpcEnv infrastructure

2.1 NettyRpcEnv, the upper layer

NettyRpcEnv is the upper layer: it holds a TransportContext reference and passes a NettyRpcHandler instance into it, so that Netty's communication callbacks are handled there. Its main collaborators are:

  • Dispatcher
  • TransportContext
  • TransportClientFactory
  • TransportServer
  • TransportConf

2.2 RpcEndpoint and RpcEndpointRef

  • RpcEndpoint is the server side: it defines how an endpoint handles the messages it receives
  • RpcEndpointRef is the client-side handle used to send (or ask) messages to that endpoint (see the sketch below)
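To make the two roles concrete, here is a hedged sketch (RpcEndpoint, RpcEnv and RpcEndpointRef are private[spark] APIs, so this only compiles inside Spark's own packages; askSync is the Spark 2.2+ name, older releases use askWithRetry):

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

case class Echo(text: String)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // `ask` requests coming from an RpcEndpointRef arrive here; reply via the RpcCallContext.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(text) => context.reply(s"echo: $text")
  }
}

def demo(rpcEnv: RpcEnv): Unit = {
  // "Server" side: registering the endpoint stores it in the Dispatcher and returns its ref.
  val ref: RpcEndpointRef = rpcEnv.setupEndpoint("echo-endpoint", new EchoEndpoint(rpcEnv))
  // "Client" side: the ref is what callers use to send or ask.
  val reply: String = ref.askSync[String](Echo("hi"))
  println(reply)
}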

2.3 Dispatcher, Inbox, and Outbox

  • An endpoint has one Dispatcher, one Inbox, and multiple Outboxes
  1. RpcEndpoint: an RPC endpoint. Spark treats every node (Client/Master/Worker) as an RPC endpoint; each implements the RpcEndpoint interface and defines its own messages and its own business logic. When it needs to send (or ask), it goes through the Dispatcher.
  2. RpcEnv: the RPC context, i.e. the runtime environment that every RPC endpoint depends on.
  3. Dispatcher: the message dispatcher. Messages an RPC endpoint wants to send, and messages received from remote RPC peers, are routed to the matching inbox/outbox: if the receiver is the endpoint itself the message goes into the Inbox, otherwise it goes into an Outbox (see the routing sketch after this list).
  4. Inbox: the inbox for command messages; each local endpoint has one. Every time the Dispatcher puts a message into an Inbox it also adds the corresponding EndpointData to its internal receiver queue, and when the Dispatcher is created it starts dedicated threads that poll this queue and consume the inbox messages.
  5. Outbox: the outbox for command messages; each remote endpoint has one. Once a message is put into an Outbox it is sent out through a TransportClient right away. Enqueuing and sending happen on the same thread, mainly because remote messages come in two kinds, RpcOutboxMessage and OneWayOutboxMessage, and messages that expect a reply are sent directly so that their results can be handled.
  6. TransportClient: the Netty communication client; based on the receiver information of an Outbox message it contacts the corresponding remote TransportServer.
  7. TransportServer: the Netty communication server; each RPC endpoint has one. After receiving a remote message it calls the Dispatcher to route the message to the matching inbox/outbox.
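The local-versus-remote decision described in item 3 is made when a message is sent through NettyRpcEnv. Slightly abridged (and version-dependent), the one-way send path looks like this:

// Abridged from NettyRpcEnv.send: a message addressed to this RpcEnv goes straight
// to the Dispatcher (and thus into the target endpoint's Inbox); a message addressed
// to a remote RpcEnv is serialized and queued in the Outbox for that address.
private[netty] def send(message: RequestMessage): Unit = {
  val remoteAddr = message.receiver.address
  if (remoteAddr == address) {
    // Message to a local RPC endpoint.
    dispatcher.postOneWayMessage(message)
  } else {
    // Message to a remote RPC endpoint.
    postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
  }
}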

The core of Spark's Endpoint design is the Inbox/Outbox pair. The key points of the Inbox are:

  1. The internal processing flow is broken down into message commands (InboxMessage) that are stored in the Inbox.
  2. Once the Dispatcher has started, it runs threads named "dispatcher-event-loop" that scan the Inbox for pending InboxMessages and invoke the Endpoint according to each InboxMessage's type.
  3. By default an OnStart InboxMessage is put into the Inbox, and the Endpoint does its extra startup work in response to it; everything the endpoint does afterwards derives from handling OnStart, so OnStart can be seen as the origin of all further communication.
  • Note: an endpoint has one Dispatcher, one Inbox, and multiple Outboxes; the Inbox sits inside the Dispatcher, within EndpointData:

    /**
     * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
     */
    private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

      private class EndpointData(
          val name: String,
          val endpoint: RpcEndpoint,
          val ref: NettyRpcEndpointRef) {
        val inbox = new Inbox(ref, endpoint)
      }

      private val endpoints = new ConcurrentHashMap[String, EndpointData]
      private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

      // Track the receivers whose inboxes may contain messages.
      private val receivers = new LinkedBlockingQueue[EndpointData]

  • Note: an endpoint has one Dispatcher, one Inbox, and multiple Outboxes; the Outboxes live inside NettyRpcEnv:

    private[netty] class NettyRpcEnv(
        val conf: SparkConf,
        javaSerializerInstance: JavaSerializerInstance,
        host: String,
        securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

      private val dispatcher: Dispatcher = new Dispatcher(this)

      private val streamManager = new NettyStreamManager(this)

      private val transportContext = new TransportContext(transportConf,
        new NettyRpcHandler(dispatcher, this, streamManager))

      /**
       * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
       * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
       */
      private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

2.4 Posting messages through the Dispatcher

The Dispatcher fragment below contains the core message-posting logic: posting a message to an endpoint means putting that endpoint's EndpointData into the Dispatcher's receivers queue and putting the message itself into the endpoint's Inbox. This is the high-level wrapper that endpoints such as Master and Worker use when they send messages, via the Dispatcher inside NettyRpcEnv. The Dispatcher also runs threads called MessageLoop that process the queued messages promptly (see the sketch after the code below).

/**
 * Posts a message to a specific endpoint.
 *
 * @param endpointName name of the endpoint.
 * @param message the message to post
 * @param callbackIfStopped callback function if the endpoint is stopped.
 */
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val error = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else if (data == null) {
      Some(new SparkException(s"Could not find $endpointName."))
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      None
    }
  }
  // Invoke the callback outside the synchronized block if the endpoint was stopped or missing.
  error.foreach(callbackIfStopped)
}
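The receivers.offer(data) call above is what wakes a MessageLoop. Abridged from the Dispatcher source (details vary slightly across Spark versions), each MessageLoop thread takes an EndpointData off the receivers queue and drives its Inbox:

/** Message loop used for dispatching messages (abridged). */
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it and exit too.
            receivers.offer(PoisonPill)
            return
          }
          // Drain and handle this endpoint's pending inbox messages.
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case _: InterruptedException => // exit the loop
    }
  }
}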

Note: by default the first message is OnStart. Why? Look here:

Do you see the new EndpointData(name, endpoint, endpointRef) below?

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}

Note that EndpointData contains an Inbox, so OnStart is enqueued when the Inbox is initialized:

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}

OnStart appears when the Inbox is initialized. Note that each endpoint has exactly one Inbox, for example the Master node.
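Abridged from Inbox.scala, the constructor enqueues OnStart before any other message can arrive, which is why OnStart is always the first message an endpoint processes:

private[netty] class Inbox(val endpointRef: NettyRpcEndpointRef, val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  protected val messages = new java.util.LinkedList[InboxMessage]()

  // OnStart should be the first message to process
  inbox.synchronized {
    messages.add(OnStart)
  }
}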

2.5 Two message flows: an endpoint (Master) posting a message to its own local Inbox, and an endpoint (Master) receiving a remote message through TransportRequestHandler, which then drops it into the Inbox

2.5.1 The endpoint (Master) posts a message to its own local Inbox

- endpoint(Master) -> NettyRpcEnv -> Dispatcher -> postMessage -> MessageLoop (Dispatcher) -> Inbox -> process -> endpoint.receiveAndReply

In other words: the endpoint uses its own RpcEnv to post a message into its own Inbox, the Dispatcher then processes the message, and the endpoint's own receiveAndReply method (or receive, for one-way sends) is invoked. A concrete example follows.
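As a hedged illustration, simplified from the standalone Master source (exact field and constant names may differ across Spark versions): in onStart the Master schedules a message to itself; because self.send is a one-way send, it lands in the Master's own Inbox and is handled in receive:

// Simplified from Master.onStart: `self` is the Master's own RpcEndpointRef, so the
// message travels NettyRpcEnv -> Dispatcher -> the Master's local Inbox.
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(CheckForWorkerTimeOut)
  }
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

// ...and the Master handles it locally:
override def receive: PartialFunction[Any, Unit] = {
  case CheckForWorkerTimeOut =>
    timeOutDeadWorkers()
  // other cases elided
}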

  • A word on when MessageLoop starts: see the Dispatcher fragment below. Because the thread pool is a member value, the loops start as soon as the Dispatcher is instantiated:

    private val threadpool: ThreadPoolExecutor = {
      val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
        math.max(2, Runtime.getRuntime.availableProcessors()))
      val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
      for (i <- 0 until numThreads) {
        pool.execute(new MessageLoop)
      }
      pool
    }
  • Next: when is NettyRpcEnv initialized, and when is the Dispatcher initialized?

When the Master initializes its RpcEnv it calls NettyRpcEnvFactory().create(config), which constructs the NettyRpcEnv; the NettyRpcEnv's Dispatcher member is then initialized, the Dispatcher's threadpool member starts the MessageLoops, and message processing begins, one link leading to the next. This is how the Master endpoint initializes its RpcEnv.

In NettyRpcEnv.scala, NettyRpcEnvFactory's create method looks roughly like this (abridged; details vary across Spark versions):
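A hedged reconstruction, not verbatim source; parameter names on RpcEnvConfig and startServer differ between Spark releases:

// Abridged sketch of NettyRpcEnvFactory.create: constructing NettyRpcEnv also constructs
// the Dispatcher (whose MessageLoops start immediately); in non-client mode the
// TransportServer is then bound via startServer.
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.host, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}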

Inside it, nettyEnv.startServer is called; its key lines are shown below. It in turn calls the lower-level transportContext.createServer to create the server and initialize the Netty pipeline:

server = transportContext.createServer(host, port, bootstraps)
dispatcher.registerRpcEndpoint(
  RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
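The RpcEndpointVerifier registered here is itself just another RpcEndpoint; abridged, it answers existence checks so that a remote client can verify an endpoint name before obtaining a ref to it:

// Abridged from RpcEndpointVerifier: replies whether a named endpoint is registered
// in this RpcEnv's Dispatcher.
private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}

private[netty] object RpcEndpointVerifier {
  val NAME = "endpoint-verifier"

  /** A message used to ask the remote RpcEndpointVerifier whether an RpcEndpoint exists. */
  case class CheckExistence(name: String)
}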

From then on, the endpoint simply keeps posting messages into its own Inbox through the same postMessage method shown earlier.

2.5.2 The endpoint (Master) receives a remote message through TransportRequestHandler, which drops it into the Inbox

- endpointRef(Worker) -> TransportChannelHandler -> channelRead0 -> TransportRequestHandler -> handle -> processRpcRequest -> NettyRpcHandler (inside NettyRpcEnv) -> receive -> internalReceive -> dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) (connection notification) -> dispatcher.postRemoteMessage(messageToDispatch, callback) (posts the remote message into the Inbox) -> postMessage -> Inbox -> process
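The NettyRpcHandler step in this chain is small; abridged, it deserializes the incoming bytes into a RequestMessage and posts it, together with the reply callback, to the Dispatcher:

// Abridged from NettyRpcHandler.receive: turn the raw bytes into a RequestMessage and
// hand it (plus the callback used to send the reply) to the Dispatcher.
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}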

以下圖展現了整個消息接收到inbox的流程:

下圖展現了 TransportChannelHandler接收消息:

@Override
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else {
    responseHandler.handle((ResponseMessage) request);
  }
}

TransportRequestHandler then matches the request type and processes it:
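TransportRequestHandler is a Java class; its handle method, paraphrased here in Scala for brevity (not the actual source), simply matches on the concrete request type and routes each one to its own process method:

// Scala paraphrase of TransportRequestHandler.handle: each concrete RequestMessage
// type is routed to a dedicated process method.
def handle(request: RequestMessage): Unit = request match {
  case r: ChunkFetchRequest => processFetchRequest(r)   // replies with ChunkFetchSuccess / ChunkFetchFailure
  case r: RpcRequest        => processRpcRequest(r)     // calls rpcHandler.receive, replies with RpcResponse / RpcFailure
  case r: OneWayMessage     => processOneWayMessage(r)  // fire-and-forget, no response
  case r: StreamRequest     => processStreamRequest(r)  // replies with StreamResponse / StreamFailure
  case other                => throw new IllegalArgumentException("Unknown request type: " + other)
}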

The message finally reaches the Inbox's process method, where it is actually handled by the endpoint's receiveAndReply(context) method:

/**
 * Process stored messages.
 */
def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
  inbox.synchronized {
    if (!enableConcurrent && numActiveThreads != 0) {
      return
    }
    message = messages.poll()
    if (message != null) {
      numActiveThreads += 1
    } else {
      return
    }
  }
  while (true) {
    safelyCall(endpoint) {
      message match {
        case RpcMessage(_sender, content, context) =>
          try {
            endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })
          } catch {
            case NonFatal(e) =>
              context.sendFailure(e)
              // Throw the exception -- this exception will be caught by the safelyCall function.
              // The endpoint's onError function will be called.
              throw e
          }

        case OneWayMessage(_sender, content) =>
          endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
            throw new SparkException(s"Unsupported message $message from ${_sender}")
          })

        case OnStart =>
          endpoint.onStart()
          if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
            inbox.synchronized {
              if (!stopped) {
                enableConcurrent = true
              }
            }
          }

        case OnStop =>
          val activeThreads = inbox.synchronized { inbox.numActiveThreads }
          assert(activeThreads == 1,
            s"There should be only a single active thread but found $activeThreads threads.")
          dispatcher.removeRpcEndpointRef(endpoint)
          endpoint.onStop()
          assert(isEmpty, "OnStop should be the last message")

        case RemoteProcessConnected(remoteAddress) =>
          endpoint.onConnected(remoteAddress)

        case RemoteProcessDisconnected(remoteAddress) =>
          endpoint.onDisconnected(remoteAddress)

        case RemoteProcessConnectionError(cause, remoteAddress) =>
          endpoint.onNetworkError(cause, remoteAddress)
      }
    }

    inbox.synchronized {
      // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
      // every time.
      if (!enableConcurrent && numActiveThreads != 1) {
        // If we are not the only one worker, exit
        numActiveThreads -= 1
        return
      }
      message = messages.poll()
      if (message == null) {
        numActiveThreads -= 1
        return
      }
    }
  }
}

3. Conclusion

This post took nearly two days of digging into how Spark's RPC works, which was no small effort; the key question is, did it make sense to you? Comments are welcome.

秦凱新
