Spark 中的 RPC

Spark 是一個 通用的分佈式計算系統,既然是分佈式的,必然存在不少節點之間的通訊,那麼 Spark 不一樣組件之間就會經過 RPC(Remote Procedure Call)進行點對點通訊。java

Spark 的 RPC 主要在兩個模塊中:apache

1,spark-core 中,主要承載了更好的封裝 server 和 client 的做用,以及和 scala 語言的融合,它依賴 spark-network-common 模塊;框架

2,spark-network-common 中,該模塊是 java 語言寫的,最新版本是基於 Netty 開發的;async

Spark 早期版本中使用 Netty 通訊框架作大塊數據的傳輸,使用 Akka 用做 RPC 通訊。自 Spark2.0 以後已經把 Akka 框架玻璃出去了(詳見SPARK-5293),是由於不少用戶會使用 Akka 作消息傳遞,會與 Spark 內嵌的版本產生衝突。在 Spark2.0 以後,基於底層的 spark-network-commen 模塊實現了一個相似 Akka Actor 消息傳遞模式的 scala 模塊,封裝在 spark-core 中。分佈式

看一張 UML 圖,圖內展現了 Spark RPC 模塊內的類的關係,白色的是 spark-core 中的類,黃色的 spark-common 中的類:spa

整個 Spark 的 RPC 模塊大概有幾個主要的類構成:.net

1,RpcEndPonit 和 RpcCallContext,RpcEndPoint 是一個能夠相應請求的服務,相似於 Akka 中的 Actor。其中有 receive 方法用來接收客戶端發送過來的信息,也有 receiveAndReply 方法用來接收並應答,應答經過 RpcContext 回調。能夠看下面代碼:scala

def receive: PartialFunction[Any, Unit] = {
    case _ => throw new RpcException(self + " does not implement 'receive'")
}

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new RpcException(self + " won't reply anything"))
}
複製代碼

2,RpcEndpointRef,相似於 Akka 中的 ActorRef,是 RpcEndPoint 的引用,持有遠程 RpcEndPoint 的地址名稱等,提供了 send 方法和 ask 方法用於發送請求。能夠看看 RpcEndPoint 內部的成員變量和方法:code

/** * return the address for the [[RpcEndpointRef]] */
  def address: RpcAddress

  def name: String

  /** * Sends a one-way asynchronous message. Fire-and-forget semantics. */
  def send(message: Any): Unit

  /** * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to * receive the reply within the specified timeout. * * This method only sends the message once and never retries. */
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
複製代碼

3,RpcEnv 和 NettyRpcEnvcdn

RpcEnv 相似於 ActorSystem,服務端和客戶端均可以使用它來作通訊。

對於 server 端來講,RpcEnv 是 RpcEndpoint 的運行環境,負責 RpcEndPoint 的生命週期管理,解析 Tcp 層的數據包以及反序列化數據封裝成 RpcMessage,而後根據路由傳送到對應的 Endpoint;

對於 client 端來講,能夠經過 RpcEnv 獲取 RpcEndpoint 的引用,也就是 RpcEndpointRef,而後經過 RpcEndpointRef 與對應的 Endpoint 通訊。

RpcEnv 中有兩個最經常使用的方法:

// 註冊endpoint,必須指定名稱,客戶端路由就靠這個名稱來找endpoint
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef 

// 拿到一個endpoint的引用
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
複製代碼

NettyRpcEnv 是 spark-core 和 spark-network-common 的橋樑,內部 leverage 底層提供的通訊能力,同事包裝了一個類 Actor 的語義。

4,Dispatcher ,NettyRpcEnv 中包含 Dispatcher,主要針對服務端,幫助路由到指定的 RpcEndPoint,並調用起業務邏輯。

參考:

1,blog.csdn.net/justlpf/art…

2,zhuanlan.zhihu.com/p/28893155

相關文章
相關標籤/搜索