spark 源碼分析之六--Spark RPC剖析之Dispatcher和Inbox、Outbox剖析

在上篇 spark 源碼分析之五 -- Spark內置RPC機制剖析之一建立NettyRPCEnv 中,涉及到了Diapatcher 內容,未作過多的剖析。本篇來剖析一下它的工做原理。html

Dispatcher 是消息的分發器,負責將消息分發給適合的 endpoint安全

其實這個類仍是比較簡單的,先來看它的類圖:多線程

咱們從成員變量入手分析整個類的內部構造和機理:app

1. endpoints是一個 ConcurrentMap[String, EndpointData], 負責存儲 endpoint name 和 EndpointData 的映射關係。其中,EndpointData又包含了 endpoint name, RpcEndpoint 以及 NettyRpcEndpointRef 的引用以及Inbox 對象(包含了RpcEndpoint 以及 NettyRpcEndpointRef 的引用)。oop

2. endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] 包含了 RpcEndpoint 和 RpcEndpointRef 的映射關係。源碼分析

3. receivers 是一個 LinkedBlockingQueue[EndpointData] 消息阻塞隊列,用於存放 EndpointData 對象。它主要用於追蹤 那些可能會包含須要處理消息receiver(即EndpointData)。在post消息到Dispatcher 時,通常會先post 到 EndpointData 的 Inbox 中, 而後,再將 EndpointData對象放入 receivers 中,源碼以下:post

// Posts a message to a specific endpoint.
private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      // 1. 先根據endpoint name從路由中找到data
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        // 2. 將待消費的消息發送到 inbox中
        data.inbox.post(message)
        // 3. 將 data 放到待消費的receiver 中
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

4. stopped 標誌 Dispatcher 是否已經中止了
5. threadpool 是 ThreadPoolExecutor 對象, 其中的 線程的 core 數量的計算以下:
val availableCores = if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors() val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads", math.max(2, availableCores))
獲取到線程數以後, 會初始化 一個固定的線程池,用來執行 MessageLoop 任務,MessageLoop 是一個Runnable 對象。它會不停地從 receiver 堵塞隊列中, 把放入的 EndpointData對象取出來,而且去調用其inbox成員變量的 process 方法。
6. PoisonPill 是一個空的EndpointData對象,起了一個標誌位的做用,若是想要中止 Diapatcher ,會把PoisonPill 餵給 receiver 吃,當threadpool 執行 MessageLoop 任務時, 吃到了毒藥,立刻退出,線程也就死掉了。PoisonPill命名很形象,關閉線程池的方式也是優雅的,是值得咱們在工做中去學習和應用的。學習

從上面的成員變量分析部分能夠知道,數據經過  postMessage  方法將 InboxMessage 數據 post 到 EndpointData的Inbox對象中,並將待處理的EndpointData 對象放入到 receivers 中,線程池會不斷從這個隊列中拿數據,分發數據。this

引出Inbox

其實,data 就包含了 RpcEndpoint 和 RpcEndpointRef 對象,本能夠在Dispatcher 中就能夠調用 endpoint 的方法去處理。爲何還要設計出來一個 Inbox 層次的抽象呢?
下面咱們就趁熱剖析一下 Inbox 這個對象。spa

Inbox剖析

Inbox 的官方解釋:
An inbox that stores messages for an RpcEndpoint and posts messages to it thread-safely.
其實就是它爲RpcEndpoint 對象保存了消息,而且將消息 post給 RpcEndpoint,同時保證了線程的安全性。

類圖以下:

 

跟 put 和 get 語義類似的有兩個方法, 分別是post 和 process。其實這兩個方法都是給 Dispatcher 對象調用的。post 將數據 存放到 堵塞消息隊列隊尾, pocess 則堵塞式 從消息隊列中取出數據來,並處理之。

這兩個關鍵方法源碼以下:

  1 def post(message: InboxMessage): Unit = inbox.synchronized {
  2     if (stopped) {
  3       // We already put "OnStop" into "messages", so we should drop further messages
  4       onDrop(message)
  5     } else {
  6       messages.add(message)
  7       false
  8     }
  9 }
 10 
 11 
 12 /**
 13    * Calls action closure, and calls the endpoint's onError function in the case of exceptions.
 14    */
 15   private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
 16     try action catch {
 17       case NonFatal(e) =>
 18         try endpoint.onError(e) catch {
 19           case NonFatal(ee) =>
 20             if (stopped) {
 21               logDebug("Ignoring error", ee)
 22             } else {
 23               logError("Ignoring error", ee)
 24             }
 25         }
 26     }
 27 }
 28 
 29 /**
 30    * Process stored messages.
 31    */
 32   def process(dispatcher: Dispatcher): Unit = {
 33     var message: InboxMessage = null
 34     inbox.synchronized {
 35       if (!enableConcurrent && numActiveThreads != 0) {
 36         return
 37       }
 38       message = messages.poll()
 39       if (message != null) {
 40         numActiveThreads += 1
 41       } else {
 42         return
 43       }
 44     }
 45     while (true) {
 46       safelyCall(endpoint) {
 47         message match {
 48           case RpcMessage(_sender, content, context) =>
 49             try {
 50               endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
 51                 throw new SparkException(s"Unsupported message $message from ${_sender}")
 52               })
 53             } catch {
 54               case e: Throwable =>
 55                 context.sendFailure(e)
 56                 // Throw the exception -- this exception will be caught by the safelyCall function.
 57                 // The endpoint's onError function will be called.
 58                 throw e
 59             }
 60 
 61           case OneWayMessage(_sender, content) =>
 62             endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
 63               throw new SparkException(s"Unsupported message $message from ${_sender}")
 64             })
 65 
 66           case OnStart =>
 67             endpoint.onStart()
 68             if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
 69               inbox.synchronized {
 70                 if (!stopped) {
 71                   enableConcurrent = true
 72                 }
 73               }
 74             }
 75 
 76           case OnStop =>
 77             val activeThreads = inbox.synchronized { inbox.numActiveThreads }
 78             assert(activeThreads == 1,
 79               s"There should be only a single active thread but found $activeThreads threads.")
 80             dispatcher.removeRpcEndpointRef(endpoint)
 81             endpoint.onStop()
 82             assert(isEmpty, "OnStop should be the last message")
 83 
 84           case RemoteProcessConnected(remoteAddress) =>
 85             endpoint.onConnected(remoteAddress)
 86 
 87           case RemoteProcessDisconnected(remoteAddress) =>
 88             endpoint.onDisconnected(remoteAddress)
 89 
 90           case RemoteProcessConnectionError(cause, remoteAddress) =>
 91             endpoint.onNetworkError(cause, remoteAddress)
 92         }
 93       }
 94 
 95       inbox.synchronized {
 96         // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
 97         // every time.
 98         if (!enableConcurrent && numActiveThreads != 1) {
 99           // If we are not the only one worker, exit
100           numActiveThreads -= 1
101           return
102         }
103         message = messages.poll()
104         if (message == null) {
105           numActiveThreads -= 1
106           return
107         }
108       }
109     }
110 }

其中,InboxMessage 繼承關係以下:

這些InboxMessage子類型在process 方法源碼中有體現。其中OneWayMessage和RpcMessage 都是自帶消息content 的,其餘的幾種都是消息事件,自己不帶任何除事件類型信息以外的信息。

在process 處理過程當中,考慮到了 一次性批量處理消息問題、多線程安全問題、異常拋出問題,多消息分支處理問題等等。

此時能夠回答上面咱們的疑問了,抽象出來 Inbox 的緣由在於,Diapatcher 的職責變得單一,只須要把數據分發就能夠了。具體分發數據要如何處理的問題留給了 Inbox,Inbox 把關注點放在了 如何處理這些消息上。考慮並解決了 一次性批量處理消息問題、多線程安全問題、異常拋出問題,多消息分支處理問題等等問題。

Outbox

下面看一下Outbox, 它的內部構造和Inbox很相似,再也不剖析。

OutboxMessage的繼承關係以下:

其中,OneWayOutboxMessage 的行爲是特定的。源碼以下:

它沒有回調方法。

RpcOutboxMessage 的回調則是經過構造方法傳進來的。其源碼以下:

 RpcOutboxMessage 是有回調的,回調方法經過構造方法指定,內部onFailure和onSuccess是模板方法。

相關文章
相關標籤/搜索