在上篇 spark 源碼分析之五 -- Spark內置RPC機制剖析之一建立NettyRPCEnv 中,涉及到了Diapatcher 內容,未作過多的剖析。本篇來剖析一下它的工做原理。html
Dispatcher 是消息的分發器,負責將消息分發給適合的 endpoint安全
其實這個類仍是比較簡單的,先來看它的類圖:多線程
咱們從成員變量入手分析整個類的內部構造和機理:app
1. endpoints是一個 ConcurrentMap[String, EndpointData], 負責存儲 endpoint name 和 EndpointData 的映射關係。其中,EndpointData又包含了 endpoint name, RpcEndpoint 以及 NettyRpcEndpointRef 的引用以及Inbox 對象(包含了RpcEndpoint 以及 NettyRpcEndpointRef 的引用)。oop
2. endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] 包含了 RpcEndpoint 和 RpcEndpointRef 的映射關係。源碼分析
3. receivers 是一個 LinkedBlockingQueue[EndpointData] 消息阻塞隊列,用於存放 EndpointData 對象。它主要用於追蹤 那些可能會包含須要處理消息receiver(即EndpointData)。在post消息到Dispatcher 時,通常會先post 到 EndpointData 的 Inbox 中, 而後,再將 EndpointData對象放入 receivers 中,源碼以下:post
// Posts a message to a specific endpoint. private def postMessage( endpointName: String, message: InboxMessage, callbackIfStopped: (Exception) => Unit): Unit = { val error = synchronized { // 1. 先根據endpoint name從路由中找到data val data = endpoints.get(endpointName) if (stopped) { Some(new RpcEnvStoppedException()) } else if (data == null) { Some(new SparkException(s"Could not find $endpointName.")) } else { // 2. 將待消費的消息發送到 inbox中 data.inbox.post(message) // 3. 將 data 放到待消費的receiver 中 receivers.offer(data) None } } // We don't need to call `onStop` in the `synchronized` block error.foreach(callbackIfStopped) }
4. stopped 標誌 Dispatcher 是否已經中止了
5. threadpool 是 ThreadPoolExecutor 對象, 其中的 線程的 core 數量的計算以下:
val availableCores = if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads", math.max(2, availableCores))
獲取到線程數以後, 會初始化 一個固定的線程池,用來執行 MessageLoop 任務,MessageLoop 是一個Runnable 對象。它會不停地從 receiver 堵塞隊列中, 把放入的 EndpointData對象取出來,而且去調用其inbox成員變量的 process 方法。
6. PoisonPill 是一個空的EndpointData對象,起了一個標誌位的做用,若是想要中止 Diapatcher ,會把PoisonPill 餵給 receiver 吃,當threadpool 執行 MessageLoop 任務時, 吃到了毒藥,立刻退出,線程也就死掉了。PoisonPill命名很形象,關閉線程池的方式也是優雅的,是值得咱們在工做中去學習和應用的。學習
從上面的成員變量分析部分能夠知道,數據經過 postMessage 方法將 InboxMessage 數據 post 到 EndpointData的Inbox對象中,並將待處理的EndpointData 對象放入到 receivers 中,線程池會不斷從這個隊列中拿數據,分發數據。this
其實,data 就包含了 RpcEndpoint 和 RpcEndpointRef 對象,本能夠在Dispatcher 中就能夠調用 endpoint 的方法去處理。爲何還要設計出來一個 Inbox 層次的抽象呢?
下面咱們就趁熱剖析一下 Inbox 這個對象。spa
Inbox 的官方解釋:
An inbox that stores messages for an RpcEndpoint and posts messages to it thread-safely.
其實就是它爲RpcEndpoint 對象保存了消息,而且將消息 post給 RpcEndpoint,同時保證了線程的安全性。
類圖以下:
跟 put 和 get 語義類似的有兩個方法, 分別是post 和 process。其實這兩個方法都是給 Dispatcher 對象調用的。post 將數據 存放到 堵塞消息隊列隊尾, pocess 則堵塞式 從消息隊列中取出數據來,並處理之。
這兩個關鍵方法源碼以下:
1 def post(message: InboxMessage): Unit = inbox.synchronized { 2 if (stopped) { 3 // We already put "OnStop" into "messages", so we should drop further messages 4 onDrop(message) 5 } else { 6 messages.add(message) 7 false 8 } 9 } 10 11 12 /** 13 * Calls action closure, and calls the endpoint's onError function in the case of exceptions. 14 */ 15 private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = { 16 try action catch { 17 case NonFatal(e) => 18 try endpoint.onError(e) catch { 19 case NonFatal(ee) => 20 if (stopped) { 21 logDebug("Ignoring error", ee) 22 } else { 23 logError("Ignoring error", ee) 24 } 25 } 26 } 27 } 28 29 /** 30 * Process stored messages. 31 */ 32 def process(dispatcher: Dispatcher): Unit = { 33 var message: InboxMessage = null 34 inbox.synchronized { 35 if (!enableConcurrent && numActiveThreads != 0) { 36 return 37 } 38 message = messages.poll() 39 if (message != null) { 40 numActiveThreads += 1 41 } else { 42 return 43 } 44 } 45 while (true) { 46 safelyCall(endpoint) { 47 message match { 48 case RpcMessage(_sender, content, context) => 49 try { 50 endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg => 51 throw new SparkException(s"Unsupported message $message from ${_sender}") 52 }) 53 } catch { 54 case e: Throwable => 55 context.sendFailure(e) 56 // Throw the exception -- this exception will be caught by the safelyCall function. 57 // The endpoint's onError function will be called. 58 throw e 59 } 60 61 case OneWayMessage(_sender, content) => 62 endpoint.receive.applyOrElse[Any, Unit](content, { msg => 63 throw new SparkException(s"Unsupported message $message from ${_sender}") 64 }) 65 66 case OnStart => 67 endpoint.onStart() 68 if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) { 69 inbox.synchronized { 70 if (!stopped) { 71 enableConcurrent = true 72 } 73 } 74 } 75 76 case OnStop => 77 val activeThreads = inbox.synchronized { inbox.numActiveThreads } 78 assert(activeThreads == 1, 79 s"There should be only a single active thread but found $activeThreads threads.") 80 dispatcher.removeRpcEndpointRef(endpoint) 81 endpoint.onStop() 82 assert(isEmpty, "OnStop should be the last message") 83 84 case RemoteProcessConnected(remoteAddress) => 85 endpoint.onConnected(remoteAddress) 86 87 case RemoteProcessDisconnected(remoteAddress) => 88 endpoint.onDisconnected(remoteAddress) 89 90 case RemoteProcessConnectionError(cause, remoteAddress) => 91 endpoint.onNetworkError(cause, remoteAddress) 92 } 93 } 94 95 inbox.synchronized { 96 // "enableConcurrent" will be set to false after `onStop` is called, so we should check it 97 // every time. 98 if (!enableConcurrent && numActiveThreads != 1) { 99 // If we are not the only one worker, exit 100 numActiveThreads -= 1 101 return 102 } 103 message = messages.poll() 104 if (message == null) { 105 numActiveThreads -= 1 106 return 107 } 108 } 109 } 110 }
其中,InboxMessage 繼承關係以下:
這些InboxMessage子類型在process 方法源碼中有體現。其中OneWayMessage和RpcMessage 都是自帶消息content 的,其餘的幾種都是消息事件,自己不帶任何除事件類型信息以外的信息。
在process 處理過程當中,考慮到了 一次性批量處理消息問題、多線程安全問題、異常拋出問題,多消息分支處理問題等等。
此時能夠回答上面咱們的疑問了,抽象出來 Inbox 的緣由在於,Diapatcher 的職責變得單一,只須要把數據分發就能夠了。具體分發數據要如何處理的問題留給了 Inbox,Inbox 把關注點放在了 如何處理這些消息上。考慮並解決了 一次性批量處理消息問題、多線程安全問題、異常拋出問題,多消息分支處理問題等等問題。
下面看一下Outbox, 它的內部構造和Inbox很相似,再也不剖析。
OutboxMessage的繼承關係以下:
其中,OneWayOutboxMessage 的行爲是特定的。源碼以下:
它沒有回調方法。
RpcOutboxMessage 的回調則是經過構造方法傳進來的。其源碼以下:
RpcOutboxMessage 是有回調的,回調方法經過構造方法指定,內部onFailure和onSuccess是模板方法。