twitter storm源碼走讀之2 -- tuple消息發送場景分析

歡迎轉載,轉載請註明出處源自徽滬一郎。本文嘗試分析tuple發送時的具體細節,本博的另外一篇文章《bolt消息傳遞路徑之源碼解讀》主要從消息接收方面來闡述問題,兩篇文章互爲補充。node

worker進程內消息接收與處理全景圖

先上幅圖簡要勾勒出worker進程接收到tuple消息以後的處理全過程api

IConnection的創建與使用

話說在mk-threads :bolt函數的實現中有這麼一段代碼,其主要功能是實現tuple的emit功能app

bolt-emit (fn [stream anchors values task]
          (let [out-tasks (if task
                  (tasks-fn task stream values)
                (tasks-fn stream values))]
        (fast-list-iter [t out-tasks]
                (let [anchors-to-ids (HashMap.)]
                  (fast-list-iter [^TupleImpl a anchors]
                          (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
                            (when (pos? (count root-ids))
                              (let [edge-id (MessageId/generateId rand)]
                            (.updateAckVal a edge-id)
                            (fast-list-iter [root-id root-ids]
                                    (put-xor! anchors-to-ids root-id edge-id))
                            ))))
                  (transfer-fn t
                           (TupleImpl. worker-context
                               values
                               task-id
                               stream
                               (MessageId/makeId anchors-to-ids)))))
        (or out-tasks [])))

 

加亮爲藍色的部分實現的功能是另外發送tuple,那麼transfer-fn函數的定義在哪呢?見mk-threads的let部分,能見到下述一行代碼socket

:transfer-fn (mk-executor-transfer-fn batch-transfer->worker)

在繼續往下看每一個函數實現以前,先肯定一下這節代碼閱讀的目的。storm在線程之間使用disruptor進行通信,在進程之間進行消息通信使用的是zeromq或netty, 因此須要從transfer-fn追蹤到使用zeromq或netty api的位置。ide

再看mk-executor-transfer-fn函數實現函數

(defn mk-executor-transfer-fn [batch-transfer->worker]
  (fn this
      ([task tuple block? ^List overflow-buffer]
       (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
       (.add overflow-buffer [task tuple])
     (try-cause
      (disruptor/publish batch-transfer->worker [task tuple] block?)
      (catch InsufficientCapacityException e
         (if overflow-buffer
             (.add overflow-buffer [task tuple])
           (throw e))
         ))))
      ([task tuple overflow-buffer]
       (this task tuple (nil? overflow-buffer) overflow-buffer))
      ([task tuple]
       (this task tuple nil)
       )))

disruptor/publish表示將消息從本線程發送出去,至於誰是該消息的接收者,請繼續往下看。oop

worker進程中,有一個receiver-thread是用來專門接收來自外部進程的消息,那麼與之相對的是有一個transfer-thread用來將本進程的消息發送給外部進程。因此剛纔的disruptor/publish發送出來的消息應該被transfer-thread接收到。ui

在transfer-thread中,能找到這行下述一行代碼this

transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)

對於接收到來自本進程中其它線程發送過來的消息利用transfer-tuples進行處理,transfer-tuples使用mk-transfer-tuples-handler來建立,因此須要看看mk-transfer-tuples-handler可否與zeromq或netty聯繫上呢?url

(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
            drainer (ArrayList.)
            node+port->socket (:cached-node+port->socket worker)
            task->node+port (:cached-task->node+port worker)
            endpoint-socket-lock (:endpoint-socket-lock worker)
            ]
    (disruptor/clojure-handler
     (fn [packets _ batch-end?]
     (.addAll drainer packets)
     (when batch-end?
       (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                        task->node+port @task->node+port]
              ;; consider doing some automatic batching here (would need to not be serialized at this point to remo
              ;; try using multipart messages ... first sort the tuples by the target node (without changing the lo
              17
              (fast-list-iter [[task ser-tuple] drainer]
                      ;; TODO: consider write a batch of tuples here to every target worker
                      ;; group by node+port, do multipart send
                      (let [node-port (get task->node+port task)]
                        (when node-port
                          (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
                        ))))
       (.clear drainer))))))

上述代碼中出現了與zeromq可能有聯繫的部分了即加亮爲紅色的一行。

那憑什麼說加亮的IConnection一行與zeromq有關係的,這話得慢慢提及,須要從配置文件開始。

在storm.yaml中有這麼一行配置項,即

storm.messaging.transport: "backtype.storm.messaging.zmq"

這個配置項與worker中的mqcontext相對應,因此在worker中以mqcontext爲線索,就可以一步步找到IConnection的實現。connections在函數mk-refresh-connections中創建

refresh-connections (mk-refresh-connections worker)

mk-refresh-connection函數中與mq-context相關聯的一部分代碼以下所示

(swap! (:cached-node+port->socket worker)
       #(HashMap. (merge (into {} %1) %2))
       (into {}
         (dofor [endpoint-str new-connections
                  :let [[node port] (string->endpoint endpoint-str)]]
             [endpoint-str
               (.connect ^IContext (:mq-context worker)
            storm-id
            ((:node->host assignment) node)
            port)
               ]
            )))

注意加亮部分,利用mq-conext中connect函數來建立IConnection. 當打開zmq.clj時候,就能驗證咱們的猜想。

(^IConnection connect [this ^String storm-id ^String host ^int port]
          (require 'backtype.storm.messaging.zmq)
          (-> context
          (mq/socket mq/push)
          (mq/set-hwm hwm)
          (mq/set-linger linger-ms)
          (mq/connect (get-connect-zmq-url local? host port))
          mk-connection))

代碼走到這裏,IConnection何時創建起來的謎底就揭開了,消息是如何從bolt或spout線程傳遞到transfer-thread,再由zeromq將tuple發送給下跳的路徑打通了。

tuple的分發策略 grouping

從一個bolt中產生的tuple能夠有多個bolt接收,到底發送給哪個bolt呢?這牽扯到分發策略問題,其實在twitter storm中有兩個層面的分發策略問題,一個是對於task level的,在講topology submit的時候已經涉及到。另外一個就是如今要討論的針對tuple level的分發。

再次將視線拉回到bolt-emit中,此次將目光集中在變量t的前先後後。

  (let [out-tasks (if task
(tasks-fn task stream values)
(tasks-fn stream values))]
(fast-list-iter [t out-tasks]
(let [anchors-to-ids (HashMap.)]
(fast-list-iter [^TupleImpl a anchors]
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
(when (pos? (count root-ids))
(let [edge-id (MessageId/generateId rand)]
(.updateAckVal a edge-id)
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))))
(transfer-fn t
(TupleImpl. worker-context
values
task-id
stream
(MessageId/makeId anchors-to-ids)))))

上述代碼顯示t從out-tasks來,而out-tasks是tasks-fn的返回值

    tasks-fn (:tasks-fn task-data)

一談tasks-fn,原來從未涉及的文件task.clj此次被掛上了,task-data與由task/mk-task建立。將中間環節跳過,調用關係以下所列。

  • mk-task
  • mk-task-data
  • mk-tasks-fn

tasks-fn中會使用到grouping,處理代碼以下

fn ([^Integer out-task-id ^String stream ^List values]
          (when debug?
            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
          (let [target-component (.getComponentId worker-context out-task-id)
                component->grouping (get stream->component->grouper stream)
               grouping (get component->grouping target-component)
                out-task-id (if grouping out-task-id)]
            (when (and (not-nil? grouping) (not= :direct grouping))
              (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
            (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
            (when (emit-sampler)
              (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
              (stats/emitted-tuple! executor-stats stream)
              (if out-task-id
                (stats/transferred-tuples! executor-stats stream 1)
                (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
            (if out-task-id [out-task-id])
            ))

而每一個topology中的grouping策略又是如何被executor知道的呢,這從另外一端executor-data提及。

在mk-executor-data中有下面一行代碼 

:stream->component->grouper (outbound-components worker-context component-id)

outbound-components的定義以下

(defn outbound-components
  "Returns map of stream id to component id to grouper"
  [^WorkerTopologyContext worker-context component-id]
  (->> (.getTargets worker-context component-id)
       clojurify-structure
       (map (fn [[stream-id component->grouping]]
        [stream-id
         (outbound-groupings
          worker-context
          component-id
          stream-id
          (.getComponentOutputFields worker-context component-id stream-id)
          component->grouping)]))
       (into {})
       (HashMap.)))
相關文章
相關標籤/搜索