Spark-RPC理解

基本架構

Akka Actor式RPC架構

  1. Spark採用的是AkkaActor架構實現RPC,可是實際使用過程爲了兼容不一樣節點之間的文件下載,採用Netty來實現Actor功能。
  2. Spark RPC由三部分組成:
  • RpcEnv RPC的執行上下文,等同於ActorSystem,用於管理RpcEndpoint和RpcEndpointRef
  • RpcEndpoint RPC通訊實體的抽象,等同於Actor,用於接收客戶端發送來的請求,方法主要有receive,onConnected, onDisconnnected, onStart, onStop, onError等
  • RpcEndpointRef RPC通訊實體的引用,等同於ActorRef,在客戶端被調用,用來向服務端請求,主要方法是ask和askWithRetry網絡

    核心組件

    Dispatcher

  • InboxMessage: 外部發送過來的消息(onStart, onStop, RPCMessage, OneWayMessage...)
  • EndpointData: 包裝(RpcEndpoint, NettyRpcEndpointRef, Inbox(InboxMessge隊列))
  • MessageLoop: 經過線程池調度,讀取阻塞隊列中是否有消息,有的話就直接讀取,不然阻塞
  • Inbox來源(消息來源):
  • [x] - 註冊RpcEndpoint(會生成OnStart消息)
  • [x] - 去註冊RpcEndpoint(會生成onStop消息)
  • [x] - postMessage, 投遞消息給指定的RpcEndpoint
  • [x] - 中止Dispatcher架構

    TransportClientFactory

    RPC客戶端的工廠類,用於批量生成TransportClient
  • ClientPool,ClientFactory內部經過<sokectAddress, ClientPool> 創建套接字(Socket網絡鏈接)與ClientPool(TransportClient)的關聯,同時經過object與TransportClient創建1V1的鎖關聯關係;即對於一個socket,會有多個TransportClient與其關聯,spark經過每個TransportClient使用不一樣的lock(object),來進行並行,本質仍是一個利用線程池(鏈接緩衝池)的思想
    其類型定義爲
class TransClientFactory {
    ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
}
class ClientPool {
    TransportClient[] clients;
    Object[] locks;
}
  • TransportClient
    包含5種發送消息的方法: fetchChunk, stream, sendRPC, sendRPCSyns, send
  • TransportClientBootstrap 由TransportContext傳入,啓動加載(如 SAAL和加密認證之類的啓動操做)框架

    TransportContext

    經過createClientFactory建立TransportClientFactory,間接經過createClient建立TransportClient; 經過createServer建立TransServer實例
  • TransportConf 配置穩健加載
  • RpcHandler,是一個abstract類,實現類爲NettyRpcHandler,internalReceive負責將ByteBuffer轉換成RequestMessage; postMessage用於投遞消息, 而後交由對應的RPCEndpoint處理
val msgDispatch = internalReceive(client, message)
dispatcher.postMessage(msgDispatch, callback)
  • NettyStreamMessage: 提供文件服務能力socket

    NettyRPCEnv

  • timeoutScheduler 超時請求的調度器,使用的ScheduleredExcutorService
  • clientConnectExecutor
  • outboxes: 在send()時在messages中add消息,而後調用drainOutbox()循環遍歷發送messages中全部消息;drainOutbox()在沒有client時會調用launchuConnectTask()建立TransportClient
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
class Outbox {
    nettyEnv;   //所在環境
    address;    //遠端NettyRpcEnv地址
    messages;   //向外發送的消息列表
    client; // TransportClient
    connectFuture; //鏈接任務的Future引用
    stopped; //是否中止
    draining;   //Outbox正有線程處理消息
}
  • RPC客戶端發送請求流程
  1. 調用NettyRpcEndpointRef的send/ask方法向RpcEndpoint發送消息;

a) 若是是同一節點,直接使用Dispatcher的postLocalMessage和postOneWayMessage,直接將消息放入EndpointData的Inbox中;oop

b) 若是發送方在遠處,將消息封裝成OutboxMessage,放入遠端RpcEndpoint對應的Outbox的messages列表中;post

  1. Outbox的drainOutbox循環從messages獲取OutboxMessage,調用TransportClient向遠端發送消息;
  2. 與遠端的TransportServer創建鏈接以後,經Netty管道,NettyRpcHandler處理,投遞到遠端的Dispatcher的EndpointData的Inbox中進行處理fetch

    TransportServer

    -TransportRequestHandler:主要是handle()方法,該方法根據request的類型,調用不一樣的 processXX()方法進行處理
processFetchRequest 處理獲取塊請求
processRPCRequest 處理RPC請求
processStreamRequest 處理Stream請求
processOneWayMessage 處理無需回覆的請求

RPC服務端實現加密

  • TransportServerspa

    要點總結

  1. Spark RPC是用Netty實現了數據流傳輸,以及Actor這種RPC框架的,其中NettyRpcEnv至關於ActorySysm, RpcEndpoint至關於Actor(遠端的服務,或者說接口,註冊在服務端), RpcEndpointRef至關於ActorRef(服務引用,在客戶端使用),雙方通訊經過Message這個載體;
  2. 客戶端發送消息時,經過<address, Outbox[messages, client]>這種結構,向address不斷地發送消息;
  3. 服務端經過NettyRpcHandler進行消息的receive,轉換成InboxMessage,放入Dispatcher中,Dispatcher使用messageLoop循環遍歷Inbox,取出InboxMessage,根據消息路由,調用相應方法進行處理,即路由功能
相關文章
相關標籤/搜索