The supervisor starts workers by calling the sync-processes function; for a detailed analysis of sync-processes, see "Storm supervisor startup source code analysis (supervisor.clj)". A fragment of the sync-processes function follows:
sync-processes function code fragment
;; sync-processes manages workers, e.g. it handles abnormal or dead workers and creates new workers
;; supervisor is the supervisor's metadata
(defn sync-processes [supervisor]
  .
  .
  .
  ;; part of the code omitted
  .
  .
  .
  (wait-for-workers-launch
   conf
   (dofor [[port assignment] reassign-executors]
     (let [id (new-worker-ids port)]
       (log-message "Launching worker with assignment "
                    (pr-str assignment)
                    " for this supervisor "
                    (:supervisor-id supervisor)
                    " on port "
                    port
                    " with id "
                    id
                    )
       ;; launch-worker is responsible for starting the worker
       (launch-worker supervisor
                      (:storm-id assignment)
                      port
                      id)
       id)))
  ))
sync-processes calls the launch-worker function to start a worker. launch-worker is a "multimethod", defined as follows:
The defmulti and defmethod macros are often used together to define a multimethod. defmulti takes a method name and a dispatch function; the dispatch function's return value is used to select which implementation to invoke. defmethod takes the method name, a dispatch value, an argument list, and a body. The special dispatch value :default marks the fallback case: if no other dispatch value matches, that implementation is called. All defmethod implementations of the same name must take the same number of arguments. The arguments passed to the multimethod are passed to the dispatch function. The effect is similar to overloading in Java.
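A minimal, self-contained example of this mechanism (illustrative only, unrelated to the Storm code below):

;; the dispatch fn extracts :shape; its return value picks which defmethod runs
(defmulti area :shape)
(defmethod area :circle [{:keys [r]}] (* Math/PI r r))
(defmethod area :rect [{:keys [w h]}] (* w h))
;; :default is the fallback when no other dispatch value matches
(defmethod area :default [_] 0)

(area {:shape :rect :w 3 :h 4}) ;=> 12
(area {:shape :triangle})       ;=> 0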
The launch-worker function
(defmulti launch-worker
  (fn [supervisor & _] (cluster-mode (:conf supervisor))))
;; if the dispatch function returns the keyword :distributed, i.e. the Storm cluster runs in distributed mode, this method is invoked
(defmethod launch-worker
    ;; supervisor is the supervisor's metadata, storm-id identifies the topology the worker belongs to,
    ;; port is the port the worker occupies, and worker-id is a UUID string that identifies the worker
    :distributed [supervisor storm-id port worker-id]
    ;; conf is the cluster configuration
    (let [conf (:conf supervisor)
          ;; storm-home is the local Storm installation path
          storm-home (System/getProperty "storm.home")
          ;; storm-log-dir is the log directory
          storm-log-dir (or (System/getProperty "storm.log.dir") (str storm-home "/logs"))
          ;; stormroot is the supervisor local path "{storm.local.dir}/supervisor/stormdist/{storm-id}"
          stormroot (supervisor-stormdist-root conf storm-id)
          ;; jlp is the path of the native libraries needed at runtime; the jlp function builds it, see its definition below
          jlp (jlp stormroot conf)
          ;; stormjar is the path of the stormjar.jar file "{storm.local.dir}/supervisor/stormdist/{storm-id}/stormjar.jar"
          stormjar (supervisor-stormjar-path stormroot)
          ;; storm-conf is the merge of the cluster configuration and storm-id's configuration
          storm-conf (read-supervisor-storm-conf conf storm-id)
          ;; topo-classpath holds storm-id's classpath entries
          topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
                           [cp]
                           [])
          ;; add the paths named by stormjar and topo-classpath to the Java classpath
          classpath (-> (current-classpath)
                        (add-to-classpath [stormjar])
                        (add-to-classpath topo-classpath))
          ;; take the default JVM options the supervisor uses when launching workers from the cluster configuration
          worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
                             (substitute-childopts s worker-id storm-id port))
          ;; take the JVM options specified for this topology's workers from the topology configuration
          topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
                                  (substitute-childopts s worker-id storm-id port))
          ;; merge the native library path jlp into the topology environment as LD_LIBRARY_PATH, so the map bound to
          ;; topology-worker-environment contains all the library paths needed to launch this topology's workers
          topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)]
                                        (merge env {"LD_LIBRARY_PATH" jlp})
                                        {"LD_LIBRARY_PATH" jlp})
          ;; build this worker's log file name, worker-{port}.log
          logfilename (str "worker-" port ".log")
          ;; command is a "java -server xxxxxx -cp classpath classname arg_0 arg_1 ... arg_n" command,
          ;; where xxxxxx stands for the JVM options passed to the java command
          command (concat
                    [(java-cmd) "-server"]
                    worker-childopts
                    topo-worker-childopts
                    [(str "-Djava.library.path=" jlp)
                     (str "-Dlogfile.name=" logfilename)
                     (str "-Dstorm.home=" storm-home)
                     (str "-Dstorm.log.dir=" storm-log-dir)
                     (str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml")
                     (str "-Dstorm.id=" storm-id)
                     (str "-Dworker.id=" worker-id)
                     (str "-Dworker.port=" port)
                     "-cp"
                     classpath
                     "backtype.storm.daemon.worker"
                     storm-id
                     (:assignment-id supervisor)
                     port
                     worker-id])
          ;; drop empty values from the command sequence
          command (->> command (map str) (filter (complement empty?)))
          ;; build the string form of the command
          shell-cmd (->> command
                         (map #(str \' (clojure.string/escape % {\' "\\'"}) \'))
                         (clojure.string/join " "))]
      (log-message "Launching worker with command: " shell-cmd)
      ;; execute the command through the ProcessBuilder class, i.e. run java to invoke the main method of
      ;; backtype.storm.daemon.worker and create a new process; the arguments passed to main are storm-id,
      ;; supervisor-id (assignment-id), port and worker-id
      ;; for the main method of backtype.storm.daemon.worker, see its definition below
      (launch-process command :environment topology-worker-environment)
      ))
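launch-process itself lives in util.clj; the following is only a rough sketch of what it does with ProcessBuilder, under the assumption that the command is a sequence of strings and the environment is a string map (the real Storm implementation differs in details):

;; illustrative sketch: start a child process from a command seq and an environment map
(defn launch-process-sketch [command environment]
  (let [builder (ProcessBuilder. ^java.util.List (vec (map str command)))
        process-env (.environment builder)]
    ;; add/override environment variables such as LD_LIBRARY_PATH
    (doseq [[k v] environment]
      (.put process-env k v))
    ;; start the child JVM; its main class and arguments are part of command
    (.start builder)))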
;; if the dispatch function returns the keyword :local, i.e. the Storm cluster runs in local mode, this method is invoked
(defmethod launch-worker
    :local [supervisor storm-id port worker-id]
    (let [conf (:conf supervisor)
          pid (uuid)
          worker (worker/mk-worker conf
                                   (:shared-context supervisor)
                                   storm-id
                                   (:assignment-id supervisor)
                                   port
                                   worker-id)]
      (psim/register-process pid worker)
      (swap! (:worker-thread-pids-atom supervisor)
             assoc
             worker-id
             pid)
      ))
The jlp function is defined as follows:
;; stormroot is the supervisor local path "{storm.local.dir}/supervisor/stormdist/{storm-id}", conf is the cluster configuration
(defn jlp [stormroot conf]
  ;; resource-root is the supervisor local path "{storm.local.dir}/supervisor/stormdist/{storm-id}/resources"
  (let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
        ;; os is the operating system name of the supervisor host
        os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
        ;; arch is the OS architecture, e.g. "x86" or "i386"
        arch (System/getProperty "os.arch")
        ;; arch-resource-root is the path "{storm.local.dir}/supervisor/stormdist/{storm-id}/resources/{os}-{arch}"
        arch-resource-root (str resource-root File/separator os "-" arch)]
    ;; returns "{storm.local.dir}/supervisor/stormdist/{storm-id}/resources/{os}-{arch}:{storm.local.dir}/supervisor/stormdist/{storm-id}/resources:{java.library.path}"
    (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
The read-supervisor-storm-conf function is defined as follows:
;; reads the topology's runtime configuration from the supervisor local path "{storm.local.dir}/supervisor/stormdist/{storm-id}/stormconf.ser"
(defn read-supervisor-storm-conf [conf storm-id]
  ;; stormroot is the directory "{storm.local.dir}/supervisor/stormdist/{storm-id}"
  (let [stormroot (supervisor-stormdist-root conf storm-id)
        ;; conf-path is the file "{storm.local.dir}/supervisor/stormdist/{storm-id}/stormconf.ser"
        conf-path (supervisor-stormconf-path stormroot)
        ;; topology-path is the file "{storm.local.dir}/supervisor/stormdist/{storm-id}/stormcode.ser"
        topology-path (supervisor-stormcode-path stormroot)]
    ;; returns the merge of the cluster configuration and the topology configuration
    (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
    ))
The backtype.storm.daemon.worker class is defined in worker.clj; :gen-class turns it into a Java class. Its main method is as follows:
(defn -main [storm-id assignment-id port-str worker-id]
  ;; read the Storm cluster configuration
  (let [conf (read-storm-config)]
    ;; validate the configuration
    (validate-distributed-mode! conf)
    ;; call mk-worker; see the definition of mk-worker below
    (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)))
The mk-worker function:
;; conf is the cluster configuration, shared-mq-context is the shared mq context, storm-id is the topology id, assignment-id is the supervisor id
(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
  (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
               " and conf " conf)
  ;; if Storm is not running in "local mode" (i.e. it runs in "distributed mode"), redirect stdout/stderr to slf4j
  (if-not (local-mode? conf)
    (redirect-stdio-to-slf4j!))
  ;; because in local mode, its not a separate
  ;; process. supervisor will register it in this case
  ;; in distributed mode, create the file "{storm.local.dir}/workers/{worker-id}/pids/{process-pid}" on the supervisor host;
  ;; the main job of process-pid is to obtain the id of the JVM process.
  ;; note that worker-id is an identifier we assign to the process ourselves; when creating a process we cannot choose its
  ;; process id, the operating system assigns it, so we have to look up the actual process id and associate our worker-id with it
  (when (= :distributed (cluster-mode conf))
    (touch (worker-pid-path conf worker-id (process-pid))))
  ;; worker holds the process "metadata"; worker-data builds it, see its definition below
  (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id)
        ;; heartbeat-fn is an anonymous function that writes the worker's "local heartbeat"; this effectively defines
        ;; the heartbeat-fn function, see do-heartbeat below
        heartbeat-fn #(do-heartbeat worker)
        ;; do this here so that the worker process dies if this fails
        ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
        ;; call heartbeat-fn to store the worker heartbeat in the local LocalState object
        _ (heartbeat-fn)
        ;; define an atom named executors
        executors (atom nil)
        ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
        ;; to the supervisor
        ;; register heartbeat-fn with the heartbeat-timer, initial delay 0s, repeated every WORKER-HEARTBEAT-FREQUENCY-SECS
        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
        ;; register #(do-executor-heartbeats worker :executors @executors) with the executor-heartbeat-timer,
        ;; initial delay 0s, repeated every TASK-HEARTBEAT-FREQUENCY-SECS.
        ;; this syncs the worker heartbeat to zookeeper so that nimbus knows immediately that the worker has started;
        ;; see do-executor-heartbeats below
        _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
                              #(do-executor-heartbeats worker :executors @executors))
        ;; refresh the outbound connections; see mk-refresh-connections below
        refresh-connections (mk-refresh-connections worker)
        ;; call refresh-connections once directly to refresh the connections owned by this worker, without registering a callback with zookeeper
        _ (refresh-connections nil)
        ;; call refresh-storm-active to refresh this worker's cached active state of the topology it belongs to; see refresh-storm-active below
        _ (refresh-storm-active worker nil)
        ;; call mk-executor to build the executor objects and store them in executors; executor creation will be analyzed in a later article
        _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)))
        ;; start the worker's dedicated receive thread, which keeps moving data from the worker's listening port to the
        ;; receive queues of the tasks; receive-thread-shutdown is the function that shuts that thread down.
        ;; see launch-receive-thread below
        receive-thread-shutdown (launch-receive-thread worker)
        ;; define the event handler that processes data in the transfer queue; message processing will be analyzed in a later article
        transfer-tuples (mk-transfer-tuples-handler worker)
        ;; create the transfer-thread; message processing will be analyzed in a later article
        transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)
        ;; define the shutdown callback; when the worker process is shut down this function releases the resources it holds
        shutdown* (fn []
                    (log-message "Shutting down worker " storm-id " " assignment-id " " port)
                    ;; close the connections from this worker to the other workers
                    (doseq [[_ socket] @(:cached-node+port->socket worker)]
                      ;; this will do best effort flushing since the linger period
                      ;; was set on creation
                      (.close socket))
                    (log-message "Shutting down receive thread")
                    ;; call receive-thread-shutdown to shut down this worker's receive thread
                    (receive-thread-shutdown)
                    (log-message "Shut down receive thread")
                    (log-message "Terminating messaging context")
                    (log-message "Shutting down executors")
                    ;; shut down the executors owned by this worker
                    (doseq [executor @executors] (.shutdown executor))
                    (log-message "Shut down executors")
                    ;;this is fine because the only time this is shared is when it's a local context,
                    ;;in which case it's a noop
                    ;; terminate the backtype.storm.messaging.netty.Context instance owned by this worker
                    (.term ^IContext (:mq-context worker))
                    (log-message "Shutting down transfer thread")
                    ;; halt the transfer-queue
                    (disruptor/halt-with-interrupt! (:transfer-queue worker))
                    ;; interrupt the transfer-thread
                    (.interrupt transfer-thread)
                    ;; wait for the transfer-thread to finish
                    (.join transfer-thread)
                    (log-message "Shut down transfer thread")
                    ;; cancel the heartbeat-timer thread
                    (cancel-timer (:heartbeat-timer worker))
                    ;; cancel the refresh-connections-timer thread
                    (cancel-timer (:refresh-connections-timer worker))
                    ;; cancel the refresh-active-timer thread
                    (cancel-timer (:refresh-active-timer worker))
                    ;; cancel the executor-heartbeat-timer thread
                    (cancel-timer (:executor-heartbeat-timer worker))
                    ;; cancel the user-timer thread
                    (cancel-timer (:user-timer worker))
                    ;; close the thread pools owned by this worker
                    (close-resources worker)
                    ;; TODO: here need to invoke the "shutdown" method of WorkerHook
                    ;; call remove-worker-heartbeat! on the StormClusterState instance to delete the worker heartbeat from zookeeper
                    (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port)
                    (log-message "Disconnecting from storm cluster state context")
                    ;; close the zookeeper connections
                    (.disconnect (:storm-cluster-state worker))
                    (.close (:cluster-state worker))
                    (log-message "Shut down worker " storm-id " " assignment-id " " port))
        ;; ret implements the Shutdownable and DaemonCommon protocols
        ret (reify
              Shutdownable
              (shutdown [this]
                (shutdown*))
              DaemonCommon
              (waiting? [this]
                (and
                 (timer-waiting? (:heartbeat-timer worker))
                 (timer-waiting? (:refresh-connections-timer worker))
                 (timer-waiting? (:refresh-active-timer worker))
                 (timer-waiting? (:executor-heartbeat-timer worker))
                 (timer-waiting? (:user-timer worker))
                 ))
              )]
    ;; register refresh-connections with the refresh-connections-timer, repeated every TASK-REFRESH-POLL-SECS.
    ;; the zero-arity version of refresh-connections supplies a default callback and calls the one-arg version to refresh the
    ;; connections owned by the worker; the default callback simply re-registers the zero-arity refresh-connections with the
    ;; refresh-connections-timer. Whenever the assignment info in zookeeper changes, the one-arg refresh-connections runs;
    ;; running refresh-connections periodically as well guards against the zookeeper "watcher mechanism" failing
    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
    ;; register (partial refresh-storm-active worker) with the refresh-active-timer, repeated every TASK-REFRESH-POLL-SECS.
    ;; refresh-storm-active works exactly like refresh-connections
    (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
    (log-message "Worker has topology config " (:storm-conf worker))
    (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
    ;; return ret, which implements the Shutdownable and DaemonCommon protocols; through ret the worker process can be shut down
    ret
    ))
The worker-data function:
;; worker-data builds the process "metadata"
(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
  ;; create a ClusterState instance for this process
  (let [cluster-state (cluster/mk-distributed-cluster-state conf)
        ;; create a StormClusterState instance so the process can interact with zookeeper
        storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
        ;; read storm-id's configuration via read-supervisor-storm-conf; see its definition above
        storm-conf (read-supervisor-storm-conf conf storm-id)
        ;; executors is the set of executor ids assigned to this process, including the system executor id
        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
        ;; communication between executors inside one process goes through disruptor, so a disruptor queue named
        ;; "worker-transfer-queue" is created for this worker; disruptor will be covered in detail later.
        ;; note that the transfer-queue belongs to the worker, not to any executor
        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                  :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
        ;; mk-receive-queue-map creates a disruptor queue named "receive-queue{executor-id}" for every executor;
        ;; executor-receive-queue-map maps executor-id -> "disruptor receive queue".
        ;; note that executor-receive-queue-map is per executor, not per worker
        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
        ;; an executor may own several tasks, and the tasks of one executor share the same "disruptor receive queue";
        ;; convert the executor-id -> "disruptor receive queue" map into a task-id -> "disruptor receive queue" map.
        ;; e.g. if executor-receive-queue-map = {[1 2] receive-queue[1 2], [3 4] receive-queue[3 4]},
        ;; then receive-queue-map = {1 receive-queue[1 2], 2 receive-queue[1 2], 3 receive-queue[3 4], 4 receive-queue[3 4]}
        receive-queue-map (->> executor-receive-queue-map
                               (mapcat (fn [[e queue]]
                                         (for [t (executor-id->tasks e)]
                                           [t queue])))
                               (into {}))
        ;; read the serialized topology object from the supervisor local path "{storm.local.dir}/supervisor/stormdist/{storm-id}/stormcode.ser"
        topology (read-supervisor-topology conf storm-id)]
    ;; the recursive-map macro evaluates every value below and builds a new map from the keys and the results,
    ;; which becomes the worker's "metadata"; see the definition of recursive-map
    (recursive-map
     ;; the cluster configuration
     :conf conf
     ;; the transport-layer instance used for inter-worker messaging. Storm's transport layer is a pluggable component:
     ;; implementing the backtype.storm.messaging.IContext interface is enough to supply your own transport.
     ;; Storm 0.8.x used backtype.storm.messaging.zmq by default, but
     ;; 1. ZeroMQ is a native messaging library that depends heavily on the OS environment, and it uses off-heap memory
     ;;    that JVM memory-monitoring tools cannot observe, so there is a risk of off-heap memory leaks
     ;; 2. it is cumbersome to install
     ;; 3. ZeroMQ's stability varies greatly between versions, and only version 2.1.7 worked well with Storm.
     ;; Since Storm 0.9 the default transport is backtype.storm.messaging.netty.Context, because Netty has these advantages:
     ;; 1. platform independence: Netty is a pure-Java messaging library, which gives Storm better cross-platform behavior;
     ;;    being JVM-based also gives better control over messages, because Netty uses JVM heap memory rather than off-heap memory
     ;; 2. performance: Netty is roughly twice as fast as ZeroMQ
     ;; 3. security: it makes a future authentication/authorization mechanism between worker processes possible
     :mq-context (if mq-context
                   mq-context
                   (TransportFactory/makeContext storm-conf))
     ;; the topology (storm-id) this worker belongs to
     :storm-id storm-id
     ;; the supervisor-id this worker belongs to
     :assignment-id assignment-id
     ;; the port
     :port port
     ;; the worker-id we assigned to this process
     :worker-id worker-id
     ;; the ClusterState instance
     :cluster-state cluster-state
     ;; the StormClusterState instance, so the worker process can talk to zookeeper
     :storm-cluster-state storm-cluster-state
     ;; the topology's current active state, initially false
     :storm-active-atom (atom false)
     ;; the ids of the executors placed on this worker process
     :executors executors
     ;; the sorted ids of the tasks placed on this worker process
     :task-ids (->> receive-queue-map keys (map int) sort)
     ;; this topology's configuration
     :storm-conf storm-conf
     ;; the topology instance
     :topology topology
     ;; the topology instance with the acker, system bolt and metric bolt added
     :system-topology (system-topology! storm-conf topology)
     ;; a timer named "heartbeat-timer"
     :heartbeat-timer (mk-halting-timer "heartbeat-timer")
     ;; a timer named "refresh-connections-timer"
     :refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
     ;; a timer named "refresh-active-timer"
     :refresh-active-timer (mk-halting-timer "refresh-active-timer")
     ;; a timer named "executor-heartbeat-timer"
     :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
     ;; a timer named "user-timer"
     :user-timer (mk-halting-timer "user-timer")
     ;; a map of task id -> component name, e.g. {1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"};
     ;; see storm-task-info below
     :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
     ;; a map of "component name" -> "map of stream_id -> output Fields object"; see component->stream->fields below
     :component->stream->fields (component->stream->fields (:system-topology <>))
     ;; a map of "component name" -> sorted task-id collection, e.g. {"boltA" [1 2 3 4], "boltB" [5 6]}
     :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
     ;; a ReentrantReadWriteLock object
     :endpoint-socket-lock (mk-rw-lock)
     ;; an atom holding a node+port -> socket map
     :cached-node+port->socket (atom {})
     ;; an atom holding a task -> node+port map
     :cached-task->node+port (atom {})
     ;; this worker's transfer queue
     :transfer-queue transfer-queue
     ;; the executor receive-queue map
     :executor-receive-queue-map executor-receive-queue-map
     ;; a map of the executor's "start task id" -> executor receive queue; e.g. if
     ;; executor-receive-queue-map = {[1 2] receive-queue[1 2], [3 4] receive-queue[3 4]},
     ;; then short-executor-receive-queue-map = {1 receive-queue[1 2], 3 receive-queue[3 4]}
     :short-executor-receive-queue-map (map-key first executor-receive-queue-map)
     ;; a map of task_id -> the executor's "start task id"; e.g. if executors = #{[1 2] [3 4] [5 6]},
     ;; then task->short-executor = {1 1, 2 1, 3 3, 4 3, 5 5, 6 5}
     :task->short-executor (->> executors
                                (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
                                (into {})
                                (HashMap.))
     ;; a "suicide function" that can terminate this worker process
     :suicide-fn (mk-suicide-fn conf)
     ;; a function that computes how long this worker process has been running
     :uptime (uptime-computer)
     ;; create a thread pool for this worker process
     :default-shared-resources (mk-default-resources <>)
     ;; mk-user-resources is currently an empty implementation
     :user-shared-resources (mk-user-resources <>)
     ;; a function that receives messages and delivers each message to the receive queue of its target task;
     ;; see mk-transfer-local-fn below
     :transfer-local-fn (mk-transfer-local-fn <>)
     ;; the number of dedicated receive threads per worker process
     :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
     ;; a function that puts messages already processed by executors onto the worker's transfer-queue; see mk-transfer-fn below
     :transfer-fn (mk-transfer-fn <>)
     )))
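The symbol <> in the value expressions above refers to the map built so far. As a rough illustration of that behavior only (the real recursive-map macro lives in backtype.storm.util and is written differently), a sketch with the same observable effect:

;; illustrative only: each value form can read the entries already added through the symbol <>
(defmacro recursive-map-sketch [& kvs]
  (reduce (fn [acc [k v]]
            `(let [~'<> ~acc]
               (assoc ~'<> ~k ~v)))
          {}
          (partition 2 kvs)))

(recursive-map-sketch :conf {:port 6700}
                      :port (:port (:conf <>))
                      :logfile (str "worker-" (:port <>) ".log"))
;;=> {:conf {:port 6700}, :port 6700, :logfile "worker-6700.log"}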
The read-worker-executors function:
;; read-worker-executors reads the executors placed on this process
(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
  ;; assignment is the executor->node+port map; the assignment-info function of the StormClusterState instance reads
  ;; storm-id's AssignmentInfo from zookeeper. Assignment is defined as:
  ;; (defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
  (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
    ;; return the executor ids assigned to this process, including the system executor id
    (doall
     ;; concat the system executor id with the topology executor ids
     (concat
      ;; the system executor id, [-1 -1]
      [Constants/SYSTEM_EXECUTOR_ID]
      ;; take the executors assigned to this process from the assignment
      (mapcat (fn [[executor loc]]
                (if (= loc [assignment-id port])
                  [executor]
                  ))
              assignment)))))
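A small worked example of the mapcat step above, with a hypothetical assignment (node names and ports are made up): only executors located at this supervisor/port are kept, plus the system executor [-1 -1].

(let [assignment {[1 2] ["node1" 6700], [3 4] ["node2" 6700]}
      assignment-id "node1"
      port 6700]
  (concat [[-1 -1]]
          (mapcat (fn [[executor loc]]
                    (if (= loc [assignment-id port])
                      [executor]))
                  assignment)))
;;=> ([-1 -1] [1 2])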
The mk-receive-queue-map function:
;; mk-receive-queue-map creates a disruptor queue named "receive-queue{executor-id}" for every executor,
;; e.g. "receive-queue[1 3]", and returns an executor-id -> receive-queue map
(defn- mk-receive-queue-map [storm-conf executors]
  ;; executors is the collection of executor ids
  (->> executors
       ;; TODO: this depends on the type of executor
       ;; create a "disruptor receive queue" for every executor-id
       (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
                                                  (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                  :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))]))
       ;; return the executor-id -> receive-queue map
       (into {})
       ))
The storm-task-info function:
(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       ;; get the map of component name -> component object
       all-components
       ;; build a map of component name -> number of tasks, e.g. {"boltA" 4, "boltB" 2}
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       ;; sort the map by component name, returning a sequence such as (["boltA" 4] ["boltB" 2])
       (sort-by first)
       ;; mapcat is equivalent to applying concat to the result of (map (fn ...)); returns ("boltA" "boltA" "boltA" "boltA" "boltB" "boltB")
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       ;; {1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
       (into {})
       ))
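A worked example of the last three steps of that pipeline, using hypothetical task counts:

(->> [["boltA" 2] ["boltB" 1]]                              ;; sorted (component, task-count) pairs
     (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))     ;=> ("boltA" "boltA" "boltB")
     (map (fn [id comp] [id comp]) (iterate inc 1))         ;=> ([1 "boltA"] [2 "boltA"] [3 "boltB"])
     (into {}))
;;=> {1 "boltA", 2 "boltA", 3 "boltB"}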
The component->stream->fields function:
(defn component->stream->fields [^StormTopology topology]
  ;; ThriftTopologyUtils/getComponentIds returns the set of all component names in the topology, e.g. #{"boltA", "boltB", "boltC"}
  (->> (ThriftTopologyUtils/getComponentIds topology)
       ;; for every component, get its map of stream_id -> StreamInfo; see stream->fields below
       (map (fn [c] [c (stream->fields topology c)]))
       ;; build the map of "component name" -> "map of stream_id -> output Fields object"
       (into {})
       ;; convert it into a Java HashMap
       (HashMap.)))
The stream->fields function:
(defn- stream->fields [^StormTopology topology component]
  ;; get the ComponentCommon object of the named component
  (->> (ThriftTopologyUtils/getComponentCommon topology component)
       ;; get_streams returns the map of stream_id -> StreamInfo; one component can have several output streams
       .get_streams
       ;; s is the stream_id and info the StreamInfo object; get_output_fields returns the output fields as a List<String>,
       ;; which is then wrapped in a Fields object
       (map (fn [[s info]] [s (Fields. (.get_output_fields info))]))
       ;; build the stream_id -> Fields map
       (into {})
       ;; convert the Clojure map into a Java HashMap
       (HashMap.)))
The mk-transfer-local-fn function:
;; mk-transfer-local-fn returns an anonymous function that receives messages and delivers each message to the receive queue of its target task
(defn mk-transfer-local-fn [worker]
  ;; short-executor-receive-queue-map maps "start task id" -> executor receive queue, e.g. {1 receive-queue[1 2], 3 receive-queue[3 4]}
  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
        ;; task->short-executor maps task_id -> the executor's "start task id", e.g. {1 1, 2 1, 3 3, 4 3}
        task->short-executor (:task->short-executor worker)
        ;; task-getter is a function composed with comp
        task-getter (comp #(get task->short-executor %) fast-first)]
    ;; return an anonymous function; tuple-batch is an ArrayList whose elements are two-element arrays [task_id, message],
    ;; where task_id says which task handles the message and message is the message itself
    (fn [tuple-batch]
      ;; fast-group-by builds a map of "short executor id" -> list of messages that executor must handle
      (let [grouped (fast-group-by task-getter tuple-batch)]
        ;; the fast-map-iter macro iterates over the map; short-executor is the "short executor id", pairs are the [task_id, message] entries
        (fast-map-iter [[short-executor pairs] grouped]
          ;; get that executor's receive queue
          (let [q (short-executor-receive-queue-map short-executor)]
            ;; if q is not nil, call disruptor's publish to put the messages into the disruptor queue
            (if q
              (disruptor/publish q pairs)
              (log-warn "Received invalid messages for unknown tasks. Dropping... ")
              )))))))
The fast-group-by function:
;; fast-group-by builds a map of "short executor id" -> list of messages that executor must handle
(defn fast-group-by
  ;; afn is the task-getter function defined in mk-transfer-local-fn; alist is an ArrayList whose elements are
  ;; two-element arrays [task_id, message], where task_id says which task handles the message
  [afn alist]
  ;; create a HashMap named ret
  (let [ret (HashMap.)]
    ;; fast-list-iter is a macro that iterates over a list
    (fast-list-iter
     ;; e is each [task_id, message] entry
     [e alist]
     ;; call afn (the task-getter) to find the "short executor id" owning that task_id, so key is the "short executor id"
     (let [key (afn e)
           ;; take the ArrayList stored under key in ret, i.e. the list of messages that executor must handle
           ^List curr (get-with-default ret key (ArrayList.))]
       ;; add the [task_id, message] entry to that list
       (.add curr e)))
    ;; return ret
    ret))
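fast-group-by is essentially a mutable-HashMap optimization of the following idea; the same grouping expressed with clojure.core/group-by, using hypothetical values:

;; group [task_id message] pairs by the "short executor id" of their target task
(let [task->short-executor {1 1, 2 1, 3 3, 4 3}
      task-getter #(get task->short-executor (first %))]
  (group-by task-getter [[1 "m1"] [2 "m2"] [3 "m3"]]))
;;=> {1 [[1 "m1"] [2 "m2"]], 3 [[3 "m3"]]}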
The mk-transfer-fn function:
;; mk-transfer-fn puts messages already processed by executors onto the worker's transfer-queue
(defn mk-transfer-fn [worker]
  ;; local-tasks is the set of task ids placed on this worker process
  (let [local-tasks (-> worker :task-ids set)
        ;; local-transfer is the anonymous function returned by mk-transfer-local-fn
        local-transfer (:transfer-local-fn worker)
        ;; transfer-queue is this worker's transfer queue
        ^DisruptorQueue transfer-queue (:transfer-queue worker)
        ;; task->node+port is the task_id -> node+port map
        task->node+port (:cached-task->node+port worker)]
    ;; return an anonymous function; serializer is a Kryo serializer, tuple-batch is an ArrayList whose elements are
    ;; two-element arrays [task_id, message], where task_id is the target task of the message and message is the message itself
    (fn [^KryoTupleSerializer serializer tuple-batch]
      ;; local is an ArrayList
      (let [local (ArrayList.)
            ;; remoteMap is a HashMap
            remoteMap (HashMap.)]
        ;; iterate over tuple-batch
        (fast-list-iter [[task tuple :as pair] tuple-batch]
          ;; if the receiving task is a local task, i.e. it is also placed on this worker process, add the message to local
          (if (local-tasks task)
            (.add local pair)
            ;;Using java objects directly to avoid performance issues in java code
            ;; otherwise the receiving task lives in another worker process; node+port identifies the node and port of the worker running that task
            (let [node+port (get @task->node+port task)]
              ;; if remoteMap does not contain node+port yet, add it
              (when (not (.get remoteMap node+port))
                (.put remoteMap node+port (ArrayList.)))
              (let [remote (.get remoteMap node+port)]
                ;; build a TaskMessage from the task_id and the serialized tuple, then add it to the ArrayList
                (.add remote (TaskMessage. task (.serialize serializer tuple)))
                ))))
        ;; call local-transfer to deliver the messages handled by local tasks
        (local-transfer local)
        ;; call disruptor's publish to put remoteMap into the worker's transfer-queue; its keys are node+port and its
        ;; values are ArrayLists of messages to be handled by the worker at that node+port
        (disruptor/publish transfer-queue remoteMap)
        ))))
The do-heartbeat function:
(defn do-heartbeat [worker]
  ;; get the cluster configuration
  (let [conf (:conf worker)
        ;; create a WorkerHeartbeat object
        hb (WorkerHeartbeat.
            ;; the time of this heartbeat
            (current-time-secs)
            ;; the topology-id this worker belongs to
            (:storm-id worker)
            ;; the executor ids placed on this worker
            (:executors worker)
            ;; the port this worker occupies
            (:port worker))
        ;; create a LocalState object rooted at "{storm.local.dir}/workers/{worker-id}/heartbeats" to store the worker's
        ;; "local heartbeat"; a LocalState object gives access to a map that is serialized to disk
        state (worker-state conf (:worker-id worker))]
    (log-debug "Doing heartbeat " (pr-str hb))
    ;; do the local-file-system heartbeat.
    ;; store the worker heartbeat on disk through the LocalState object; the map key is the string "worker-heartbeat"
    ;; and the value is the heartbeat
    (.put state
          LS-WORKER-HEARTBEAT
          hb
          false
          )
    ;; call cleanup on the LocalState object, keeping only the most recent 60 heartbeats
    (.cleanup state 60)
    ; this is just in case supervisor is down so that disk doesn't fill up.
    ; it shouldn't take supervisor 120 seconds between listing dir and reading it
    ))
The do-executor-heartbeats function:
;; do-executor-heartbeats writes the worker heartbeat into the workerbeats node in zookeeper via worker-heartbeat!
(defnk do-executor-heartbeats [worker :executors nil]
  ;; stats is how we know what executors are assigned to this worker
  ;; stats maps executor object -> executor statistics. On the first call of do-executor-heartbeats, i.e. the first
  ;; heartbeat, executors is nil and the map looks like {executor_1 nil, executor_2 nil, ...}.
  ;; On later heartbeats, get-executor-id and render-stats are called on each executor object to build a map of
  ;; executor_id -> executor statistics, so stats differs between the first heartbeat and later ones; the computation
  ;; of executor statistics will be analyzed in a later article.
  (let [stats (if-not executors
                (into {} (map (fn [e] {e nil}) (:executors worker)))
                (->> executors
                     (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                     (apply merge)))
        ;; build the worker heartbeat
        zk-hb {:storm-id (:storm-id worker)
               ;; the executor statistics
               :executor-stats stats
               ;; how long the worker process has been running
               :uptime ((:uptime worker))
               ;; the heartbeat time
               :time-secs (current-time-secs)
               }]
    ;; do the zookeeper heartbeat
    ;; call worker-heartbeat! on the StormClusterState instance to sync the heartbeat zk-hb to the
    ;; "/workerbeats/{topology-id}/{supervisorId-port}/" node in zookeeper
    (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker)
                        zk-hb)
    ))
The mk-refresh-connections function:
;; mk-refresh-connections returns a function named this; we saw the same way of defining a function in
;; mk-synchronize-supervisor in "Storm supervisor startup source code analysis (supervisor.clj)", and it is used
;; because the function has to refer to itself inside its own body.
;; refresh-connections needs to run repeatedly: every time the assignment-info changes it must refresh once,
;; which is driven by zookeeper's "watcher mechanism"
(defn mk-refresh-connections [worker]
  ;; outbound-tasks is the set of all tasks that receive messages emitted by this worker; see worker-outbound-tasks below
  (let [outbound-tasks (worker-outbound-tasks worker)
        ;; conf is the worker configuration
        conf (:conf worker)
        ;; storm-cluster-state is the StormClusterState instance
        storm-cluster-state (:storm-cluster-state worker)
        ;; storm-id is the id of the topology this worker belongs to
        storm-id (:storm-id worker)]
    ;; return a function named this; it runs whenever the assignment-info changes, to refresh this worker's connections
    (fn this
      ;; zero-arity version: supplies a "default callback" and calls the one-arg version. The default callback schedules
      ;; the zero-arity this on the worker's refresh-connections-timer, so when the assignment-info changes, zookeeper's
      ;; "watcher mechanism" runs the callback and the refresh-connections-timer thread runs this. This guarantees that
      ;; whenever the assignment changes, the timer performs the refresh-connections work in the background
      ([]
       (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
      ;; one-arg version
      ([callback]
       ;; call assignment-version on the StormClusterState instance to get the current version of storm-id's assignment,
       ;; and register the callback with zookeeper
       (let [version (.assignment-version storm-cluster-state storm-id callback)
             ;; if the assignment version cached locally by the worker equals the version read from zookeeper,
             ;; storm-id's assignment has not changed and it is taken from the local cache
             assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
                          (:data (get @(:assignment-versions worker) storm-id))
                          ;; otherwise call assignment-info-with-version to re-read the versioned assignment from the
                          ;; "/assignments/{storm-id}" node in zookeeper and register the callback, so the worker can tell
                          ;; when an existing assignment has been reassigned
                          (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)]
                            ;; store the latest assignment in the worker's local cache
                            (swap! (:assignment-versions worker) assoc storm-id new-assignment)
                            (:data new-assignment)))
             ;; my-assignment maps "task that receives messages from this worker" -> [node port]
             my-assignment (-> assignment
                               ;; the executor_id -> [node port] map, e.g. {[1 1] [node1 port1], [4 4] [node1 port1],
                               ;; [2 2] [node2 port1], [5 5] [node2 port1], [3 3] [node3 port1], [6 6] [node3 port1]}
                               :executor->node+port
                               ;; the task_id -> [node port] map, e.g. {1 [node1 port1], 4 [node1 port1], 2 [node2 port1],
                               ;; 5 [node2 port1], 3 [node3 port1], 6 [node3 port1]}
                               to-task->node+port
                               ;; keep only the entries whose keys are in outbound-tasks; assuming outbound-tasks = #{4 5 6},
                               ;; this leaves {4 [node1 port1], 5 [node2 port1], 6 [node3 port1]}
                               (select-keys outbound-tasks)
                               ;; {4 "node1/port1", 5 "node2/port1", 6 "node3/port1"}
                               (#(map-val endpoint->string %)))
             ;; we dont need a connection for the local tasks anymore
             ;; drop the tasks placed on this worker process, since tasks in the same process need no socket connection.
             ;; assuming this worker runs at node1/port1, needed-assignment = {5 "node2/port1", 6 "node3/port1"}
             needed-assignment (->> my-assignment
                                    (filter-key (complement (-> worker :task-ids set))))
             ;; needed-connections is the set of "needed connections", e.g. #{"node2/port1", "node3/port1"}
             needed-connections (-> needed-assignment vals set)
             ;; needed-tasks is the set of tasks that need a connection, e.g. #{5, 6}
             needed-tasks (-> needed-assignment keys)
             ;; current-connections is the set of connections this worker has already established
             current-connections (set (keys @(:cached-node+port->socket worker)))
             ;; the difference needed-connections minus current-connections is the set of connections to create;
             ;; assuming current-connections = #{}, new-connections = #{"node2/port1", "node3/port1"}
             new-connections (set/difference needed-connections current-connections)
             ;; the difference current-connections minus needed-connections is the set of connections to remove
             remove-connections (set/difference current-connections needed-connections)]
         ;; merge the newly created connections into cached-node+port->socket
         (swap! (:cached-node+port->socket worker)
                #(HashMap. (merge (into {} %1) %2))
                ;; build an endpoint-str -> connection map, i.e. establish the new connections,
                ;; e.g. {"node2/port1" connect1, "node3/port1" connect2}
                (into {}
                      (dofor [endpoint-str new-connections
                              :let [[node port] (string->endpoint endpoint-str)]]
                        [endpoint-str
                         (.connect
                          ^IContext (:mq-context worker)
                          storm-id
                          ((:node->host assignment) node)
                          port)
                         ]
                        )))
         ;; store my-assignment in the worker's local cache cached-task->node+port
         (write-locked (:endpoint-socket-lock worker)
           (reset! (:cached-task->node+port worker)
                   (HashMap. my-assignment)))
         ;; close the connections that have to be removed
         (doseq [endpoint remove-connections]
           (.close (get @(:cached-node+port->socket worker) endpoint)))
         ;; remove those connections from the worker's local cache cached-node+port->socket; through the local caches
         ;; cached-task->node+port and cached-node+port->socket we can map a task to its socket
         (apply swap!
                (:cached-node+port->socket worker)
                #(HashMap. (apply dissoc (into {} %1) %&))
                remove-connections)
         ;; find the tasks for which no connection was established
         (let [missing-tasks (->> needed-tasks
                                  (filter (complement my-assignment)))]
           ;; if such tasks exist, write a warning to the log
           (when-not (empty? missing-tasks)
             (log-warn "Missing assignment for following tasks: " (pr-str missing-tasks))
             )))))))
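A small worked example of the connection diff computed above (the endpoints are hypothetical):

(require '[clojure.set :as set])

(let [needed-connections  #{"node2/port1" "node3/port1"}
      current-connections #{"node1/port1" "node2/port1"}]
  {:new-connections    (set/difference needed-connections current-connections)   ;=> #{"node3/port1"}
   :remove-connections (set/difference current-connections needed-connections)}) ;=> #{"node1/port1"}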
The worker-outbound-tasks function:
;; worker-outbound-tasks returns the task ids of the components that receive messages from this worker
(defn worker-outbound-tasks
  "Returns seq of task-ids that receive messages from this worker"
  [worker]
  ;; context is a backtype.storm.task.WorkerTopologyContext object; see worker-context below
  (let [context (worker-context worker)
        ;; call the anonymous fn (fn [task-id] ...) on the task_id of every task placed on this worker and concat the
        ;; results; components is the collection of receiving component ids
        components (mapcat
                    (fn [task-id]
                      ;; getComponentId returns the name of the component (spout/bolt) that owns task-id
                      (->> (.getComponentId context (int task-id))
                           ;; getTargets returns which components receive the output of componentId
                           (.getTargets context)
                           vals
                           ;; collect the receiving component ids
                           (map keys)
                           (apply concat)))
                    ;; the task_id collection of this worker process
                    (:task-ids worker))]
    (-> worker
        ;; the task id -> component name map, e.g. {1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
        :task->component
        ;; the result looks like {"boltA" [1 2 3 4], "boltB" [5 6]}
        reverse-map
        ;; keep only the entries whose keys are in components
        (select-keys components)
        vals
        flatten
        ;; the set of all task ids of the receiving components
        set )))
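A worked example of that final lookup, with a hypothetical layout; reverse-map is Storm's utility for inverting a map into value -> keys, approximated here with a plain reduce:

(let [task->component {1 "boltA", 2 "boltA", 5 "boltB", 6 "boltB"}
      components #{"boltB"}]                                          ;; components receiving this worker's output
  (->> task->component
       (reduce (fn [m [t c]] (assoc m c (conj (get m c []) t))) {})   ;; reverse-map equivalent: {"boltA" [1 2], "boltB" [5 6]}
       (#(select-keys % components))                                  ;; {"boltB" [5 6]}
       vals
       flatten
       set))
;;=> #{5 6}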
The worker-context function:
(defn worker-context [worker]
  ;; returns a backtype.storm.task.WorkerTopologyContext object
  (WorkerTopologyContext. (:system-topology worker)
                          (:storm-conf worker)
                          (:task->component worker)
                          (:component->sorted-tasks worker)
                          (:component->stream->fields worker)
                          (:storm-id worker)
                          (supervisor-storm-resources-path
                           (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
                          (worker-pids-root (:conf worker) (:worker-id worker))
                          (:port worker)
                          (:task-ids worker)
                          (:default-shared-resources worker)
                          (:user-shared-resources worker)
                          ))
The getTargets method:
// WorkerTopologyContext extends GeneralTopologyContext; getTargets is an instance method of GeneralTopologyContext
// whose job is to find out which components receive the output of componentId.
// The return value is a map of stream_id -> {receive_component_id -> Grouping}, where receive_component_id is the id of a receiving component
public Map<String, Map<String, Grouping>> getTargets(String componentId) {
    // create the result map, ret
    Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
    // get all component ids of the topology and iterate over them
    for(String otherComponentId: getComponentIds()) {
        // get the component's ComponentCommon object by its id, then get its inputs
        Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
        // iterate over the inputs; a GlobalStreamId has two fields, a stream id and the id of the component emitting that stream
        for(GlobalStreamId id: inputs.keySet()) {
            // if the component id of the input stream equals componentId, this component receives componentId's output, so add it to ret
            if(id.get_componentId().equals(componentId)) {
                Map<String, Grouping> curr = ret.get(id.get_streamId());
                if(curr==null) curr = new HashMap<String, Grouping>();
                curr.put(otherComponentId, inputs.get(id));
                ret.put(id.get_streamId(), curr);
            }
        }
    }
    return ret;
}
The refresh-storm-active function:
;; refresh-storm-active refreshes the given worker's cached active state of the topology it belongs to
(defn refresh-storm-active
  ;; no-callback version: calls the callback version with a default callback that re-schedules refresh-storm-active
  ;; on the refresh-active-timer
  ([worker]
   (refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
  ;; callback version
  ([worker callback]
   ;; call storm-base on the StormClusterState instance to read this topology's StormBase data from the
   ;; "/storms/{storm-id}" node in zookeeper and register the callback on that node.
   ;; when the node's data changes, callback runs, i.e. refresh-storm-active is added to the refresh-active-timer
   ;; and the refresh-active-timer thread runs it
   (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
     ;; update the worker's cached active state of the topology
     (reset!
      (:storm-active-atom worker)
      (= :active (-> base :status :type))
      ))
   ))
The launch-receive-thread function:
;; start the worker's dedicated receive thread(s)
(defn launch-receive-thread [worker]
  (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
  ;; see launch-receive-thread! below
  (msg-loader/launch-receive-thread!
   ;; the connection context; since 0.9 the default is netty, a backtype.storm.messaging.netty.Context instance
   (:mq-context worker)
   (:storm-id worker)
   ;; the number of receive threads
   (:receiver-thread-count worker)
   (:port worker)
   ;; the local transfer function transfer-local-fn, which delivers messages to the queues of the tasks placed on this worker
   (:transfer-local-fn worker)
   ;; the size of the worker's input queue
   (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
   :kill-fn (fn [t] (exit-process! 11))))
The launch-receive-thread! function:
;; launch-receive-thread! is defined in loader.clj and starts the receive threads of the given worker process
(defnk launch-receive-thread!
  [context storm-id receiver-thread-count port transfer-local-fn max-buffer-size
   :daemon true
   :kill-fn (fn [t] (System/exit 1))
   :priority Thread/NORM_PRIORITY]
  ;; max-buffer-size is the worker's maximum input queue size
  (let [max-buffer-size (int max-buffer-size)
        ;; call bind on backtype.storm.messaging.netty.Context to create a server-side connection;
        ;; socket is a backtype.storm.messaging.netty.Server instance
        socket (.bind ^IContext context storm-id port)
        ;; thread-count is the number of receive threads, default 1
        thread-count (if receiver-thread-count receiver-thread-count 1)
        ;; call mk-receive-threads to create the receive threads; vthreads holds their SmartThread instances, through
        ;; which we can start, join and interrupt the receive threads; see mk-receive-threads below
        vthreads (mk-receive-threads context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count)]
    ;; return an anonymous function whose job is to shut the receive threads down by sending an empty message to task_id=-1
    (fn []
      ;; open a connection to the local port
      (let [kill-socket (.connect ^IContext context storm-id "localhost" port)]
        (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
        ;; send an empty message to task_id=-1; when a receive thread receives a message it first checks whether the
        ;; message is addressed to task_id=-1 and, if so, shuts itself down
        (.send ^IConnection kill-socket -1 (byte-array []))
        ;; close the connection
        (.close ^IConnection kill-socket)
        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
        ;; wait for all receive threads to finish
        (for [thread-id (range thread-count)]
          (.join (vthreads thread-id)))
        (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
        ))))
The mk-receive-threads function:
;; mk-receive-threads calls mk-receive-thread in a loop to create the receive threads; see mk-receive-thread below
(defn- mk-receive-threads [context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count]
  (into [] (for [thread-id (range thread-count)]
             (mk-receive-thread context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id))))
The mk-receive-thread function:
(defn- mk-receive-thread [context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id]
  ;; async-loop takes a "function" or a "function factory" and creates a java thread that keeps calling that function
  ;; (or the function the factory produces) in a loop. async-loop returns an instance of the SmartThread protocol,
  ;; through which we can start, join and interrupt the receive thread
  (async-loop
   ;; this argument is a "function factory", i.e. a function that returns a function
   (fn []
     (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]")
     ;; the run method of the created java thread keeps calling this function in a loop
     (fn []
       ;; batched is an ArrayList
       (let [batched (ArrayList.)
             ;; the recv method of backtype.storm.messaging.netty.Server returns an Iterator<TaskMessage> over an
             ;; ArrayList<TaskMessage>; the message-processing flow will be analyzed in a later article
             ^Iterator iter (.recv ^IConnection socket 0 thread-id)
             closed (atom false)]
         ;; when iter is not nil, iterate over it
         (when iter
           (while (and (not @closed) (.hasNext iter))
             ;; packet is a TaskMessage; TaskMessage has two fields, task and message: task is the id of the task that
             ;; should handle the message and message is the message's byte array
             (let [packet (.next iter)
                   ;; task is the id of the task receiving the message
                   task (if packet (.task ^TaskMessage packet))
                   ;; message is the message's byte array
                   message (if packet (.message ^TaskMessage packet))]
               ;; if task = -1, shut the receive thread down
               (if (= task -1)
                 (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                     (.close socket)
                     (reset! closed true))
                 ;; otherwise add the [task message] pair to batched
                 (when packet (.add batched [task message]))))))
         ;; if the shutdown flag closed is still false, call transfer-local-fn to deliver the batch of received
         ;; messages to the receive queues of their tasks
         (when (not @closed)
           (do
             (if (> (.size batched) 0)
               (transfer-local-fn batched))
             ;; returning 0 means the function does not sleep after an iteration and runs again immediately
             0)))))
   ;; the argument is a "function factory"
   :factory? true
   ;; daemon is true, so the receive thread is a daemon thread
   :daemon daemon
   ;; the kill function
   :kill-fn kill-fn
   ;; the java thread priority
   :priority priority
   ;; the receive thread is named "worker-receiver-thread-" + thread-id
   :thread-name (str "worker-receiver-thread-" thread-id)))
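A minimal sketch of the "function factory" contract used by async-loop above (illustrative only; the real async-loop in backtype.storm.util also handles exceptions, the daemon flag, priority and thread naming):

;; the factory fn runs once on the new thread to set up state; the fn it returns is called in a loop.
;; returning 0 means "run again immediately", a positive number means "sleep that many seconds first",
;; and nil ends the loop
(defn run-factory-loop-sketch [factory-fn]
  (let [f (factory-fn)]
    (loop []
      (when-let [sleep-secs (f)]
        (when (pos? sleep-secs)
          (Thread/sleep (long (* 1000 sleep-secs))))
        (recur)))))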
This concludes the source code analysis of how the supervisor launches a worker. Launching a worker also touches executor-related code, which was not analyzed in detail here and will be covered in a later article; the message-queue-related code will likewise be analyzed in detail later.