The supervisor is a key component of a storm cluster; it is mainly responsible for managing the "worker nodes". The supervisor communicates with zookeeper and, through zookeeper's "watch mechanism", learns whether there are new tasks to claim or tasks that have been reassigned. We can start the supervisor by running bin/storm supervisor >/dev/null 2>&1 &. bin/storm is a python script that defines a supervisor function:
The supervisor function
def supervisor(klass="backtype.storm.daemon.supervisor"):
    """Syntax: [storm supervisor]
    Launches the supervisor daemon. This command should be run
    under supervision with a tool like daemontools or monit.
    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
        "-Dlogfile.name=supervisor.log",
        "-Dlog4j.configuration=storm.log.properties",
    ]
    exec_storm_class(klass,
                     jvmtype="-server",
                     extrajars=cppaths,
                     jvmopts=jvmopts)
The klass parameter defaults to backtype.storm.daemon.supervisor, which names a Java class. STORM_DIR holds storm's installation directory. The cppaths list holds the path of the log4j configuration file and the path of storm's configuration file storm.yaml. jvmopts holds the arguments passed to the JVM, including the log4j configuration file path, the storm.yaml path, the log4j log file name, and the log4j configuration file name. The logic of exec_storm_class is fairly simple; its implementation is as follows:
The exec_storm_class function
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args)  # replaces the current process and never returns
get_config_opts() obtains the default JVM configuration, confvalue("java.library.path", extrajars) obtains the load path of JZMQ, the native library used by storm, and get_classpath(extrajars) obtains the full paths of all dependency jars. The function then assembles a java -cp command and runs the main method of klass. Since klass defaults to backtype.storm.daemon.supervisor, exec_storm_class ultimately invokes the main method of the backtype.storm.daemon.supervisor class.
The backtype.storm.daemon.supervisor class is defined in supervisor.clj as follows:
The backtype.storm.daemon.supervisor class
(ns backtype.storm.daemon.supervisor
  (:import [backtype.storm.scheduler ISupervisor])
  (:use [backtype.storm bootstrap])
  (:use [backtype.storm.daemon common])
  (:require [backtype.storm.daemon [worker :as worker]])
  (:gen-class
    :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))

(bootstrap)

;; ... ...
;; other functions
;; ... ...

(defn -main []
  (-launch (standalone-supervisor)))
:gen-class directs Clojure to generate the Java class backtype.storm.daemon.supervisor and declares a static method launch that takes an instance implementing the backtype.storm.scheduler.ISupervisor interface as its argument. The argument passed to launch is produced by the standalone-supervisor function, defined below, which returns an instance implementing the ISupervisor interface.
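To make the :gen-class machinery concrete, here is a minimal, hypothetical sketch of the same pattern (the namespace and method are invented for illustration and are not part of storm):

;; hypothetical namespace illustrating :gen-class with a static method
(ns example.daemon
  (:gen-class
    :methods [^{:static true} [launch [String] void]]))

;; the clojure function backing a generated method carries a "-" prefix
(defn -launch [msg]
  (println "launch called with" msg))

(defn -main []
  (-launch "hello"))

After AOT compilation, Java code can invoke the static method as example.daemon.launch("hello"), which mirrors how the generated backtype.storm.daemon.supervisor class exposes launch to Java callers.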
The standalone-supervisor function
;; This function returns an instance implementing the ISupervisor interface.
(defn standalone-supervisor []
  (let [conf-atom (atom nil)
        id-atom (atom nil)]
    (reify ISupervisor
      ;; prepare creates a disk-backed K/V database, a LocalState object; see the LocalState class
      (prepare [this conf local-dir]
        ;; conf-atom is an atom bound to the cluster configuration
        (reset! conf-atom conf)
        ;; state is bound to a LocalState object; local-dir is the database's root directory on disk.
        ;; The database is really just a HashMap serialized to disk under local-dir.
        (let [state (LocalState. local-dir)
              ;; LS-ID is the string "supervisor-id", defined in common.clj. If state already holds
              ;; this supervisor's id, curr-id is bound to it; otherwise curr-id is bound to a freshly
              ;; generated 32-character uuid
              curr-id (if-let [id (.get state LS-ID)]
                        id
                        ;; call the uuid function to generate a 32-character id
                        (generate-supervisor-id))]
          ;; call state's put function to store the supervisor's id
          (.put state LS-ID curr-id)
          ;; id-atom is an atom bound to this supervisor's id
          (reset! id-atom curr-id)))
      ;; always returns true
      (confirmAssigned [this port]
        true)
      ;; reads all of the supervisor's ports from the storm configuration; because clojure's map
      ;; returns a lazy sequence, doall is needed to realize it fully
      (getMetadata [this]
        (doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))
      ;; returns the supervisor's id
      (getSupervisorId [this]
        @id-atom)
      ;; returns the supervisor's assignment id, which equals its id
      (getAssignmentId [this]
        @id-atom)
      ;; killedWorker is a no-op
      (killedWorker [this port])
      ;; assigned is a no-op
      (assigned [this ports]))))
LocalState is a Java class (see LocalState.java) holding a VersionedStore object (see VersionedStore.java). Because both classes are implemented in Java and are fairly simple, they are not analyzed in detail here.
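As a minimal sketch of the LocalState interaction performed by prepare above (the directory is illustrative; LS-ID is the string "supervisor-id" and generate-supervisor-id comes from the surrounding code):

;; read the stored id if present, otherwise generate one, then write it back
(let [state   (LocalState. "/tmp/storm-local/supervisor/isupervisor")
      curr-id (if-let [id (.get state LS-ID)]
                id
                (generate-supervisor-id))]
  (.put state LS-ID curr-id)
  curr-id)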
The mk-supervisor function is defined as follows:
The mk-supervisor function
;; conf is bound to the cluster configuration; isupervisor is bound to the ISupervisor instance
;; returned by standalone-supervisor
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
  ;; log a startup message
  (log-message "Starting Supervisor with conf " conf)
  ;; supervisor-isupervisor-dir calls supervisor-local-dir, which reads storm's local directory from the
  ;; configuration, creates {storm.local.dir}/supervisor on the supervisor, and returns that directory.
  ;; supervisor-isupervisor-dir then returns the string "{storm.local.dir}/supervisor/isupervisor", used
  ;; as the root directory of a LocalState object that stores only the supervisor's id.
  ;; Calling isupervisor's prepare method creates that LocalState object, generates the supervisor's id,
  ;; and stores it in the LocalState object.
  (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
  ;; supervisor-tmp-dir creates the {storm.local.dir}/supervisor/tmp directory on the supervisor;
  ;; FileUtils' cleanDirectory method empties it
  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
  ;; supervisor is bound to the supervisor's metadata; see supervisor-data below
  (let [supervisor (supervisor-data conf shared-context isupervisor)
        ;; event-manager and processes-event-manager are each bound to an EventManager instance, and
        ;; managers to a collection holding both. For event-manager see the article
        ;; "storm事件管理器EventManager源碼分析-event.clj"
        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
        ;; partial builds a "partial function": some arguments of a given function are pre-bound,
        ;; yielding a new function (see the small example after this function). sync-processes is bound
        ;; to that new function; see the sync-processes definition below
        sync-processes (partial sync-processes supervisor)
        ;; synchronize-supervisor is bound to a function that, whenever assignments change, syncs topology
        ;; code from nimbus to this machine and checks worker states, ensuring every assigned worker is valid
        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
        ;; heartbeat-fn is bound to an anonymous function that calls the StormClusterState instance's
        ;; supervisor-heartbeat! function to write this supervisor's heartbeat (a SupervisorInfo instance)
        ;; to the zookeeper node "/supervisors/{supervisor-id}"
        heartbeat-fn (fn [] (.supervisor-heartbeat!
                              ;; the StormClusterState instance
                              (:storm-cluster-state supervisor)
                              ;; the supervisor-id
                              (:supervisor-id supervisor)
                              ;; build the SupervisorInfo instance, i.e. this supervisor's heartbeat
                              (SupervisorInfo. (current-time-secs)
                                               ;; hostname
                                               (:my-hostname supervisor)
                                               ;; assignment-id, which equals the supervisor-id
                                               (:assignment-id supervisor)
                                               ;; all ports currently in use across the cluster (used ports)
                                               (keys @(:curr-assignment supervisor))
                                               ;; all available ports on this supervisor, i.e. the ports
                                               ;; listed in the storm configuration
                                               (.getMetadata isupervisor)
                                               (conf SUPERVISOR-SCHEDULER-META)
                                               ;; supervisor uptime
                                               ((:uptime supervisor)))))]
    ;; invoke heartbeat-fn once to write the supervisor's heartbeat to zookeeper
    (heartbeat-fn)
    ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
    ;; schedule-recurring from timer.clj adds heartbeat-fn to this supervisor's timer as a recurring task
    ;; ("report the supervisor's heartbeat to zookeeper"); for details on storm's timer see
    ;; "storm定時器timer源碼分析-timer.clj"
    (schedule-recurring (:timer supervisor)
                        0
                        ;; how often to report the heartbeat
                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                        heartbeat-fn)
    ;; When supervisor.enable is true (the default, and it never changes, so this always runs), the
    ;; function bound to synchronize-supervisor (returned by mk-synchronize-supervisor) is added to
    ;; event-manager every 10s, so the supervisor can still pick up assignment changes even if
    ;; zookeeper's "watcher mechanism" misbehaves. Likewise the function bound to sync-processes is
    ;; added to processes-event-manager every SUPERVISOR-MONITOR-FREQUENCY-SECS seconds, so the
    ;; supervisor can still manage workers even if zookeeper's "watcher mechanism" misbehaves.
    (when (conf SUPERVISOR-ENABLE)
      ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
      ;; to date even if callbacks don't all work exactly right
      (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
      (schedule-recurring (:timer supervisor)
                          0
                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
                          (fn [] (.add processes-event-manager sync-processes))))
    (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
    ;; return an instance implementing the Shutdownable interface and the SupervisorDaemon and
    ;; DaemonCommon protocols
    (reify
      Shutdownable
      ;; shutting down the supervisor means releasing the resources it owns
      (shutdown [this]
        (log-message "Shutting down supervisor " (:supervisor-id supervisor))
        (reset! (:active supervisor) false)
        (cancel-timer (:timer supervisor))
        (.shutdown event-manager)
        (.shutdown processes-event-manager)
        (.disconnect (:storm-cluster-state supervisor)))
      SupervisorDaemon
      ;; returns the cluster configuration
      (get-conf [this]
        conf)
      ;; returns the supervisor-id
      (get-id [this]
        (:supervisor-id supervisor))
      ;; as the name suggests, shuts down all workers
      (shutdown-all-workers [this]
        (let [ids (my-worker-ids conf)]
          (doseq [id ids]
            (shutdown-worker supervisor id))))
      DaemonCommon
      (waiting? [this]
        (or (not @(:active supervisor))
            (and
              ;; is the timer thread sleeping?
              (timer-waiting? (:timer supervisor))
              ;; call the waiting? function of event-manager and processes-event-manager to check whether
              ;; their event threads are sleeping; the memfn macro generates code that lets a java method
              ;; be used as a clojure function
              (every? (memfn waiting?) managers)))))))
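The comment on partial above promised a small example; in plain clojure:

;; pre-bind the first argument of +, producing a new one-argument function
(def add10 (partial + 10))
(add10 5) ;; => 15

;; (partial sync-processes supervisor) works the same way: it pre-binds the supervisor
;; metadata, turning the one-argument sync-processes into a zero-argument function that
;; the event manager can run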
The supervisor-data function, defined below, returns a map containing the supervisor's metadata (a small access example follows the definition).
The supervisor-data function
(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
  {;; the cluster configuration
   :conf conf
   ;; shared-context is nil when the supervisor is launched
   :shared-context shared-context
   ;; the ISupervisor instance
   :isupervisor isupervisor
   ;; whether the supervisor is active (it is by default)
   :active (atom true)
   ;; the supervisor's start time
   :uptime (uptime-computer)
   ;; worker thread ids
   :worker-thread-pids-atom (atom {})
   ;; the StormClusterState object
   :storm-cluster-state (cluster/mk-storm-cluster-state conf)
   ;; the supervisor's LocalState object, rooted at "{storm.local.dir}/supervisor/localstate"
   :local-state (supervisor-state conf)
   ;; the supervisor's id
   :supervisor-id (.getSupervisorId isupervisor)
   ;; the supervisor's assignment id, identical to the supervisor id
   :assignment-id (.getAssignmentId isupervisor)
   ;; the supervisor's hostname: if the conf map contains "storm.local.hostname" that value is used,
   ;; otherwise the hostname comes from InetAddress.getLocalHost().getCanonicalHostName()
   :my-hostname (if (contains? conf STORM-LOCAL-HOSTNAME)
                  (conf STORM-LOCAL-HOSTNAME)
                  (local-hostname))
   ;; the cluster-wide assignments reported with each heartbeat
   :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
   ;; a storm timer; kill-fn is invoked when the timer-thread throws an exception
   :timer (mk-timer :kill-fn (fn [t]
                               (log-error t "Error when processing event")
                               (exit-process! 20 "Error when processing an event")))
   ;; a map holding versioned assignment info
   :assignment-versions (atom {})})
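The rest of the supervisor code reads this metadata map with keyword lookups and derefs its atoms; a tiny illustration with made-up values:

(def supervisor {:conf {"storm.local.dir" "/tmp/storm"} :active (atom true)})
(:conf supervisor)    ;; => {"storm.local.dir" "/tmp/storm"}
@(:active supervisor) ;; => true
(reset! (:active supervisor) false)
@(:active supervisor) ;; => false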
The sync-processes function is defined as follows:
The sync-processes function
;; sync-processes manages workers: it handles unhealthy or dead workers and creates new ones.
;; supervisor holds the supervisor's metadata.
(defn sync-processes [supervisor]
  ;; conf is bound to the storm configuration map
  (let [conf (:conf supervisor)
        ;; local-state is bound to the supervisor's LocalState instance
        ^LocalState local-state (:local-state supervisor)
        ;; read the local assignments, a map of port->LocalAssignment, from the supervisor's LocalState
        ;; instance; a LocalAssignment wraps a storm-id and the executors assigned to it
        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
        ;; the current time
        now (current-time-secs)
        ;; allocated is bound to a map of worker-id->[state heartbeat]; see read-allocated-workers below
        allocated (read-allocated-workers supervisor assigned-executors now)
        ;; keep the entries of allocated whose state equals :valid, binding the result to keepers
        keepers (filter-val (fn [[state _]] (= state :valid))
                            allocated)
        ;; keep-ports is bound to the ports appearing in the keepers' heartbeats
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
        ;; reassign-executors is bound to the entries of assigned-executors whose port is NOT in
        ;; keep-ports; in other words the processes backing those assigned executors have died and the
        ;; executors must be reassigned
        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
        ;; new-worker-ids is bound to a port->worker-id map holding the worker-ids whose processes must
        ;; be relaunched
        new-worker-ids (into {} (for [port (keys reassign-executors)]
                                  [port (uuid)]))]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;    - read pids, kill -9 and individually remove file
    ;;    - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
    ;; 6. wait for workers launch
    (log-debug "Syncing processes")
    (log-debug "Assigned executors: " assigned-executors)
    (log-debug "Allocated: " allocated)
    ;; allocated maps worker-id->[state heartbeat]; id is the worker-id, state the worker state,
    ;; heartbeat the worker's heartbeat
    (doseq [[id [state heartbeat]] allocated]
      ;; shut down any worker whose state is not :valid
      (when (not= :valid state)
        (log-message
          "Shutting down and clearing state for id " id
          ". Current supervisor time: " now
          ". State: " state
          ", Heartbeat: " (pr-str heartbeat))
        ;; shutdown-worker kills the process; see its definition below
        (shutdown-worker supervisor id)))
    ;; new-worker-ids holds the worker-ids to relaunch; create the local directory
    ;; "{storm.local.dir}/workers/{worker-id}" for each of them
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id)))
    ;; store the merged map back into local-state under LS-APPROVED-WORKERS
    (.put local-state LS-APPROVED-WORKERS
          ;; invert new-worker-ids from port->worker-id to worker-id->port and merge it with
          ;; local-state's LS-APPROVED-WORKERS (see the small zipmap example after this function)
          (merge
            ;; select-keys keeps the entries of local-state's LS-APPROVED-WORKERS whose key is in keepers
            (select-keys (.get local-state LS-APPROVED-WORKERS) (keys keepers))
            ;; zipmap builds the worker-id->port map from new-worker-ids
            (zipmap (vals new-worker-ids) (keys new-worker-ids))))
    ;; wait-for-workers-launch waits until all workers have started; see its definition below
    (wait-for-workers-launch
      conf
      ;; assignment is bound to the executors assigned to that port
      (dofor [[port assignment] reassign-executors]
        ;; id is the worker-id corresponding to the port
        (let [id (new-worker-ids port)]
          (log-message "Launching worker with assignment "
                       (pr-str assignment)
                       " for this supervisor "
                       (:supervisor-id supervisor)
                       " on port " port
                       " with id " id)
          ;; launch-worker starts the worker; worker startup will be analyzed in a later article
          (launch-worker supervisor (:storm-id assignment) port id)
          id)))))
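The zipmap inversion used when writing LS-APPROVED-WORKERS is easiest to see on a made-up map:

(def new-worker-ids {6700 "w-1", 6701 "w-2"}) ;; port->worker-id
(zipmap (vals new-worker-ids) (keys new-worker-ids))
;; => {"w-1" 6700, "w-2" 6701}                ;; worker-id->port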
The read-allocated-workers function is defined as follows:
The read-allocated-workers function
;; returns a map of worker-id->[state heartbeat]; if a worker's heartbeat is nil the worker is "dead"
(defn read-allocated-workers
  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
  ;; supervisor holds the supervisor metadata, assigned-executors the supervisor's assignments
  ;; (a port->LocalAssignment map), now the current time
  [supervisor assigned-executors now]
  ;; the cluster configuration
  (let [conf (:conf supervisor)
        ;; the supervisor's LocalState instance
        ^LocalState local-state (:local-state supervisor)
        ;; id->heartbeat maps each worker process running on this supervisor to its heartbeat
        id->heartbeat (read-worker-heartbeats conf)
        ;; approved-ids is bound to the set of worker-ids stored in the supervisor's LocalState instance
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
    ;; build the worker-id->[state hb] map
    (into {} (dofor [[id hb] id->heartbeat]
               ;; cond is equivalent to nested if...else (see the distilled sketch after this function)
               (let [state (cond
                             ;; if the heartbeat is nil, state is the :not-started keyword
                             (not hb)
                             :not-started
                             ;; if approved-ids does not contain the id, or matches-an-assignment?
                             ;; returns false, state is the :disallowed keyword
                             (or (not (contains? approved-ids id))
                                 ;; matches-an-assignment? decides whether the worker is assigned by
                                 ;; comparing the storm-id and executor ids in the heartbeat with those
                                 ;; in the assignment
                                 (not (matches-an-assignment? hb assigned-executors)))
                             :disallowed
                             ;; if (current time - last heartbeat time) > heartbeat timeout, state is
                             ;; the :timed-out keyword
                             (> (- now (:time-secs hb))
                                (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                             :timed-out
                             ;; otherwise state is the :valid keyword
                             true
                             :valid)]
                 (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
                 [id [state hb]])))))
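The cond above reduces to a small decision function; a distilled, hypothetical sketch (argument names invented):

(defn classify-worker [hb approved? matches? now timeout-secs]
  (cond
    (not hb)                                 :not-started
    (or (not approved?) (not matches?))      :disallowed
    (> (- now (:time-secs hb)) timeout-secs) :timed-out
    true                                     :valid))

(classify-worker nil true true 100 30)             ;; => :not-started
(classify-worker {:time-secs 10} true true 100 30) ;; => :timed-out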
The read-worker-heartbeats function is defined as follows:
The read-worker-heartbeats function
;; builds a map from each worker process on this supervisor to its heartbeat
(defn read-worker-heartbeats
  "Returns map from worker id to heartbeat"
  [conf]
  ;; ids is bound to the worker-ids of the processes on this supervisor
  (let [ids (my-worker-ids conf)]
    ;; build the worker-id->heartbeat map
    (into {} (dofor [id ids]
               ;; read-worker-heartbeat reads the heartbeat of the given worker-id from
               ;; "{storm.local.dir}/workers/{worker-id}/heartbeats" on the supervisor
               [id (read-worker-heartbeat conf id)]))))
The my-worker-ids function is defined as follows:
The my-worker-ids function
;; returns the worker-ids of the processes running on this supervisor
(defn my-worker-ids [conf]
  ;; worker-root returns the supervisor's local directory "{storm.local.dir}/workers";
  ;; read-dir-contents lists every file name under that directory, i.e. the worker-ids of all
  ;; processes currently running on this supervisor
  (read-dir-contents (worker-root conf)))
The matches-an-assignment? function is defined as follows:
The matches-an-assignment? function
;; worker-heartbeat is the heartbeat; assigned-executors is the supervisor's assignments,
;; a port->LocalAssignment map
(defn matches-an-assignment? [worker-heartbeat assigned-executors]
  ;; read the port occupied by the process from worker-heartbeat, then look up the corresponding
  ;; LocalAssignment in assigned-executors
  (let [local-assignment (assigned-executors (:port worker-heartbeat))]
    ;; returns true if local-assignment is non-nil, the storm-id in the heartbeat equals the storm-id
    ;; in the assignment, and the executor ids in the heartbeat equal those in the assignment;
    ;; otherwise returns false (see the small example after this function)
    (and local-assignment
         (= (:storm-id worker-heartbeat) (:storm-id local-assignment))
         ;; Constants/SYSTEM_EXECUTOR_ID is the executor id of the "system bolt"; besides the spouts
         ;; and bolts we define, a topology also contains some "system bolts"
         (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
            (set (:executors local-assignment))))))
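The executor comparison can be illustrated with plain sets (the executor ids are made up; the vector [-1 -1] stands in for Constants/SYSTEM_EXECUTOR_ID):

(= (disj (set [[1 3] [-1 -1]]) [-1 -1]) ;; heartbeat executors minus the system executor
   (set [[1 3]]))                       ;; executors in the local assignment
;; => true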
The shutdown-worker function is defined as follows:
The shutdown-worker function
;; kills a worker process; supervisor holds the supervisor metadata, id is the worker-id
(defn shutdown-worker [supervisor id]
  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
  ;; conf is bound to the cluster configuration
  (let [conf (:conf supervisor)
        ;; note: when the cluster runs in "distributed mode", the supervisor's
        ;; "{storm.local.dir}/workers/{worker-id}/pids" directory stores the ids of the jvm processes
        ;; actually backing the worker. Read those process ids from that directory; worker-id is the id
        ;; we assigned to the process, while the pids directory holds the worker's real jvm process ids
        pids (read-dir-contents (worker-pids-root conf id))
        ;; note: when the cluster runs in "local mode", the map under :worker-thread-pids-atom in the
        ;; supervisor metadata stores worker-id->thread-id entries. Look up the worker's entry in that
        ;; worker-id->jvm-process-id map; thread-pid is in effect bound to the worker's jvm process id
        thread-pid (@(:worker-thread-pids-atom supervisor) id)]
    ;; when thread-pid is non-nil, kill that process
    (when thread-pid
      ;; kill the process via the kill-process function in backtype.storm.process-simulator
      (psim/kill-process thread-pid))
    ;; iterate over pids and kill each process
    (doseq [pid pids]
      ;; kill-process-with-sig-term in backtype.storm.util calls send-signal-to-process, whose
      ;; implementation simply executes the system command "kill -15 pid".
      ;; note: when a worker process is created it registers a shutdown callback; killing the worker
      ;; with "kill -15 pid" triggers that callback, which is installed in worker.clj's mk-worker
      (kill-process-with-sig-term pid))
    ;; if pids is non-empty, sleep one second so the "cleanup" shutdown callbacks can finish
    (if-not (empty? pids) (sleep-secs 1)) ;; allow 1 second for execution of cleanup threads on worker.
    ;; processes that "kill -15 pid" failed to stop are killed via force-kill-process, which simply
    ;; runs "kill -9 pid"
    (doseq [pid pids]
      (force-kill-process pid)
      (try
        ;; delete "{storm.local.dir}/workers/{worker-id}/pids/{pid}"
        (rmpath (worker-pid-path conf id pid))
        (catch Exception e)))
    ;; on windows, the supervisor may still holds the lock on the worker directory
    ;; try-cleanup-worker removes the local directories; see its definition below
    (try-cleanup-worker conf id))
  (log-message "Shut down " (:supervisor-id supervisor) ":" id))
The try-cleanup-worker function is defined as follows:
The try-cleanup-worker function
;; removes the worker's local directories
(defn try-cleanup-worker [conf id]
  (try
    ;; delete the "{storm.local.dir}/workers/{worker-id}/heartbeats" directory
    (rmr (worker-heartbeats-root conf id))
    ;; this avoids a race condition with worker or subprocess writing pid around same time
    ;; delete the "{storm.local.dir}/workers/{worker-id}/pids" directory
    (rmpath (worker-pids-root conf id))
    ;; delete the "{storm.local.dir}/workers/{worker-id}" directory
    (rmpath (worker-root conf id))
    (catch RuntimeException e
      (log-warn-error e "Failed to cleanup worker " id ". Will retry later"))
    (catch java.io.FileNotFoundException e (log-message (.getMessage e)))
    (catch java.io.IOException e (log-message (.getMessage e)))))
The wait-for-workers-launch function is defined as follows:
The wait-for-workers-launch function
;; waits until all workers have started
(defn- wait-for-workers-launch [conf ids]
  (let [start-time (current-time-secs)]
    (doseq [id ids]
      ;; delegate to wait-for-worker-launch
      (wait-for-worker-launch conf id start-time))))
The wait-for-worker-launch function is defined as follows:
The wait-for-worker-launch function
;; waits for a single worker to start; a worker counts as started once it has written at least one
;; heartbeat within the configured startup timeout (the polling skeleton is sketched after this function)
(defn- wait-for-worker-launch [conf id start-time]
  (let [state (worker-state conf id)]
    (loop []
      (let [hb (.get state LS-WORKER-HEARTBEAT)]
        (when (and
                (not hb)
                (< (- (current-time-secs) start-time)
                   (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)))
          (log-message id " still hasn't started")
          (Time/sleep 500)
          (recur))))
    (when-not (.get state LS-WORKER-HEARTBEAT)
      (log-message "Worker " id " failed to start"))))
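The polling pattern above, reduced to its skeleton; worker-started? and elapsed-secs are hypothetical helpers passed in purely for illustration:

(defn wait-until-started [worker-started? elapsed-secs timeout-secs]
  (loop []
    (when (and (not (worker-started?))
               (< (elapsed-secs) timeout-secs))
      (Thread/sleep 500)
      (recur))))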
The mk-synchronize-supervisor function is defined as follows:
The mk-synchronize-supervisor function
;; mk-synchronize-supervisor returns a function named "this"
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    ;; conf is bound to the cluster configuration
    (let [conf (:conf supervisor)
          ;; storm-cluster-state is bound to the StormClusterState object
          storm-cluster-state (:storm-cluster-state supervisor)
          ;; isupervisor is bound to the instance implementing the ISupervisor interface
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ;; local-state is bound to the LocalState instance
          ^LocalState local-state (:local-state supervisor)
          ;; sync-callback is bound to an anonymous function that adds the "this" function defined above
          ;; to event-manager, so "this" runs on a separate thread. On every run, sync-callback must be
          ;; re-registered with zookeeper as a callback so it keeps firing; it is invoked whenever the
          ;; zookeeper child node "/assignments" changes
          sync-callback (fn [& ignored] (.add event-manager this))
          ;; assignment-versions is bound to the versioned assignments, a topology-id->assignment map
          assignment-versions @(:assignment-versions supervisor)
          ;; assignments-snapshot is bound to a topology-id->AssignmentInfo map and versions to the
          ;; versioned assignments. The assignments-snapshot function reads the assignments from
          ;; zookeeper's "/assignments" child node (a snapshot of the cluster's current assignments)
          ;; and attaches the callback to that node; see its definition below
          {assignments-snapshot :assignments versions :versions}
            (assignments-snapshot storm-cluster-state sync-callback assignment-versions)
          ;; read-storm-code-locations returns a map from topology-id to that topology's code directory
          ;; on nimbus
          storm-code-map (read-storm-code-locations assignments-snapshot)
          ;; read-downloaded-storm-ids reads, from the supervisor's local "{storm.local.dir}/stormdist"
          ;; directory, the topology-ids whose code jars have already been downloaded
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          ;; all-assignment is bound to all assignments for this supervisor, a port->LocalAssignment map
          all-assignment (read-assignments assignments-snapshot (:assignment-id supervisor))
          ;; validate each key (port) of all-assignment with isupervisor's confirmAssigned, keeping the
          ;; ones that pass in new-assignment. isupervisor was created in standalone-supervisor, whose
          ;; confirmAssigned simply returns true, so new-assignment equals all-assignment
          new-assignment (->> all-assignment
                              (filter-key #(.confirmAssigned isupervisor %)))
          ;; assigned-storm-ids is bound to the set of topology-ids assigned to this supervisor
          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
          ;; existing-assignment is bound to the assignments already present on this supervisor
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
      (log-debug "Synchronizing supervisor")
      (log-debug "Storm code map: " storm-code-map)
      (log-debug "Downloaded storm ids: " downloaded-storm-ids)
      (log-debug "All assignment: " all-assignment)
      (log-debug "New assignment: " new-assignment)
      ;; download code first
      ;; This might take awhile
      ;;   - should this be done separately from usual monitoring?
      ;; should we only download when topology is assigned to this supervisor?
      ;; storm-code-map maps every assigned topology-id in the cluster to its code jar directory on nimbus
      (doseq [[storm-id master-code-dir] storm-code-map]
        ;; if downloaded-storm-ids does not contain the storm-id but assigned-storm-ids does (i.e. the
        ;; storm-id must run on this supervisor but its code jar has not yet been downloaded from the
        ;; nimbus server), call download-storm-code to download it
        (when (and (not (downloaded-storm-ids storm-id))
                   (assigned-storm-ids storm-id))
          (log-message "Downloading code for storm id " storm-id " from " master-code-dir)
          ;; download the storm-id's code jar, the serialized topology object, and the runtime
          ;; configuration from the nimbus server into "{storm.local.dir}/supervisor/stormdist/{storm-id}/"
          (download-storm-code conf storm-id master-code-dir)
          (log-message "Finished downloading code for storm id " storm-id " from " master-code-dir)))
      (log-debug "Writing new assignment " (pr-str new-assignment))
      ;; the difference between existing-assignment and new-assignment is the set of assignments that no
      ;; longer need to run on this supervisor, so their workers must be shut down
      ;; (see the set/difference example after this function)
      (doseq [p (set/difference (set (keys existing-assignment))
                                (set (keys new-assignment)))]
        ;; in storm 0.9.2 killedWorker is a no-op, so nothing happens here
        (.killedWorker isupervisor (int p)))
      ;; assigned is a no-op as well
      (.assigned isupervisor (keys new-assignment))
      ;; store the latest assignments, new-assignment, into the local-state database
      (.put local-state LS-LOCAL-ASSIGNMENTS new-assignment)
      ;; store the versioned assignments, versions, into the supervisor's :assignment-versions cache
      (swap! (:assignment-versions supervisor) versions)
      ;; reset the supervisor's cached :curr-assignment to new-assignment, i.e. the cluster's latest
      ;; assignments
      (reset! (:curr-assignment supervisor) new-assignment)
      ;; remove any downloaded code that's no longer assigned or active
      ;; important that this happens after setting the local assignment so that
      ;; synchronize-supervisor doesn't try to launch workers for which the
      ;; resources don't exist
      ;; if the supervisor host runs "Windows_NT", call shutdown-disallowed-workers to shut down workers
      ;; whose state is :disallowed
      (if on-windows? (shutdown-disallowed-workers supervisor))
      ;; iterate over downloaded-storm-ids, the ids of the topologies whose jars etc. have been downloaded
      (doseq [storm-id downloaded-storm-ids]
        ;; if storm-id is not in assigned-storm-ids (the topologies that must currently run on this
        ;; supervisor), recursively delete "{storm.local.dir}/supervisor/stormdist/{storm-id}"
        (when-not (assigned-storm-ids storm-id)
          (log-message "Removing code for storm id " storm-id)
          (try
            (rmr (supervisor-stormdist-root conf storm-id))
            (catch Exception e (log-message (.getMessage e))))))
      ;; add sync-processes to the processes-event-manager event manager so it runs on a separate
      ;; thread; sync-processes is fairly slow, so it needs its own thread
      (.add processes-event-manager sync-processes))))
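The set/difference step referenced above, on made-up ports:

(require '[clojure.set :as set])
;; ports present in the existing assignment but absent from the new one lose their workers
(set/difference (set [6700 6701 6702]) (set [6701 6702]))
;; => #{6700}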
The assignments-snapshot function is defined as follows:
The assignments-snapshot function
;; assignments-snapshot reads the assignments from zookeeper's "/assignments" child node and attaches
;; the callback to it; assignment-versions is bound to the supervisor's locally cached versioned
;; assignments
(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
  ;; storm-ids is bound to the set of assigned topology-ids, read as the children of /assignments. If
  ;; callback is non-nil it is stored as assignments-callback and a "node watch" is set on /assignments,
  ;; letting the supervisor notice new or deleted assignments
  (let [storm-ids (.assignments storm-cluster-state callback)]
    ;; new-assignments is bound to the latest assignments (see the distilled pipeline example after
    ;; this function)
    (let [new-assignments
          (->>
            ;; sid is bound to a topology-id
            (dofor [sid storm-ids]
              ;; recorded-version is bound to the version of sid's assignment cached on this supervisor
              (let [recorded-version (:version (get assignment-versions sid))]
                ;; assignment-version is bound to the data and version of the zookeeper node
                ;; "/assignments/{sid}", registering the callback
                (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
                  ;; if the cached version equals the version read from zookeeper, return a map of sid
                  ;; to the cached assignment; otherwise re-read the versioned assignment from the
                  ;; zookeeper node "/assignments/{sid}" and register the callback, letting the
                  ;; supervisor notice when an existing assignment is reassigned
                  (if (= assignment-version recorded-version)
                    {sid (get assignment-versions sid)}
                    {sid (.assignment-info-with-version storm-cluster-state sid callback)})
                  ;; if reading the assignment from zookeeper failed, the value is {sid nil}
                  {sid nil})))
            ;; merge the dofor results into one map of the form:
            ;; {sid_1 {:data data_1 :version version_1}, sid_2 {:data data_2 :version version_2}, ...}
            (apply merge)
            ;; keep only the entries with non-nil values
            (filter-val not-nil?))]
      ;; the returned map has the form:
      ;; {:assignments {sid_1 data_1, sid_2 data_2, ..., sid_n data_n},
      ;;  :versions {sid_1 {:data data_1 :version version_1}, ..., sid_n {:data data_n :version version_n}}}
      ;; each data_x is an AssignmentInfo object holding the code directory on nimbus, the start time
      ;; of every task, and the mapping of each task to a machine and port
      {:assignments (into {} (for [[k v] new-assignments]
                               [k (:data v)]))
       :versions new-assignments})))
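The ->> pipeline above can be replayed on toy data; plain filter/into stands in for storm's filter-val helper:

(->> [{"topo-1" {:data :a :version 1}} {"topo-2" nil}]
     (apply merge)
     (filter (fn [[_ v]] (not (nil? v))))
     (into {}))
;; => {"topo-1" {:data :a, :version 1}}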
The read-assignments function is defined as follows:
The read-assignments function
(defn- read-assignments
  "Returns map from port to struct containing :storm-id and :executors"
  ;; assignments-snapshot is a topology-id->AssignmentInfo map; assignment-id is the supervisor-id
  [assignments-snapshot assignment-id]
  ;; merge the results of read-my-executors, checking whether more than one topology is assigned to
  ;; the same port and throwing if so. The check is quite elegant: merge-with only calls the anonymous
  ;; function (fn [& ignored] ...) when the results share a port, and that function throws
  ;; (see the small example after this function)
  (->> (dofor [sid (keys assignments-snapshot)]
         (read-my-executors assignments-snapshot sid assignment-id))
       (apply merge-with
              (fn [& ignored]
                (throw-runtime "Should not have multiple topologies assigned to one port")))))
The read-my-executors function is defined as follows:
The read-my-executors function
;; assignments-snapshot is a topology-id->AssignmentInfo map, assignment-id is the supervisor-id,
;; storm-id is a topology-id
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
  (let [assignment (get assignments-snapshot storm-id)
        ;; my-executors is bound to the executors assigned to this supervisor, i.e. the entries of the
        ;; executor->node+port map whose node equals this supervisor
        my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
                             (:executor->node+port assignment))
        ;; port-executors is bound to a port->executor-ids map; merge-with concat concatenates the
        ;; values of identical keys (see the small example after this function)
        port-executors (apply merge-with
                              concat
                              (for [[executor [_ port]] my-executors]
                                {port [executor]}))]
    ;; return a port->LocalAssignment map; a LocalAssignment has two fields, a topology-id and a
    ;; collection of executor ids
    (into {} (for [[port executors] port-executors]
               ;; need to cast to int b/c it might be a long (due to how yaml parses things)
               ;; doall is to avoid serialization/deserialization problems with lazy seqs
               [(Integer. port) (LocalAssignment. storm-id (doall executors))]))))
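And the merge-with concat step, again on made-up executors:

(apply merge-with concat [{6700 [[1 1]]} {6700 [[2 2]]} {6701 [[3 3]]}])
;; => {6700 ([1 1] [2 2]), 6701 [[3 3]]}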
The download-storm-code function is defined as follows:
The download-storm-code function
(defmulti download-storm-code cluster-mode)
download-storm-code is a "multimethod": the return value of the cluster-mode function decides which implementation is invoked. cluster-mode may return the keyword :distributed or :local; when it returns :distributed, the implementation below is called (a minimal multimethod sketch follows it).
(defmethod download-storm-code
  :distributed [conf storm-id master-code-dir]
  ;; master-code-dir is bound to the path of storm-id's code jar on the nimbus server
  ;; Downloading to permanent location is atomic
  ;; tmproot is bound to the supervisor-local path "{storm.local.dir}/supervisor/tmp/{uuid}", which
  ;; temporarily holds the code jar downloaded from nimbus
  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
        ;; stormroot is bound to the path of storm-id's code jar on the supervisor,
        ;; "{storm.local.dir}/supervisor/stormdist/{storm-id}"
        stormroot (supervisor-stormdist-root conf storm-id)]
    ;; create the temporary directory tmproot
    (FileUtils/forceMkdir (File. tmproot))
    ;; download "{storm.local.dir}/nimbus/stormdist/{storm-id}/stormjar.jar" from the nimbus server
    ;; into the supervisor's tmproot directory; stormjar.jar holds all of the topology's code
    (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
    ;; download "{storm.local.dir}/nimbus/stormdist/{storm-id}/stormcode.ser" from the nimbus server
    ;; into tmproot; stormcode.ser is the serialized topology object
    (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
    ;; download "{storm.local.dir}/nimbus/stormdist/{storm-id}/stormconf.ser" from the nimbus server
    ;; into tmproot; stormconf.ser holds the configuration for running the topology
    (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
    ;; RESOURCES-SUBDIR is the string "resources"; extract-dir-from-jar unpacks the jar, extracting the
    ;; files whose paths start with "resources" into "{tmproot}/resources/..."
    (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
    ;; move the files from tmproot into stormroot, so "{storm.local.dir}/supervisor/stormdist/{storm-id}/"
    ;; ends up containing the resources directory plus stormjar.jar, stormcode.ser, and stormconf.ser
    (FileUtils/moveDirectory (File. tmproot) (File. stormroot))))
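The multimethod sketch promised above; the :mode key and the return strings are invented, but the dispatch shape mirrors download-storm-code's dispatch on cluster-mode:

(defmulti greet (fn [conf] (:mode conf)))
(defmethod greet :distributed [conf] "running distributed")
(defmethod greet :local [conf] "running locally")

(greet {:mode :local}) ;; => "running locally"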
The extract-dir-from-jar function is defined as follows:
The extract-dir-from-jar function
;; jarpath is the jar's path, dir is "resources", destdir is the "{tmproot}" path
(defn extract-dir-from-jar [jarpath dir destdir]
  (try-cause
    ;; use the ZipFile class to unpack the jar; jarpath is rebound to the ZipFile object
    (with-open [jarpath (ZipFile. jarpath)]
      ;; .entries returns an enumeration; enumeration-seq turns it into a seq of the file's ZIP entries
      (let [entries (enumeration-seq (.entries jarpath))]
        ;; iterate over the entries whose paths start with "resources"
        (doseq [file (filter (fn [entry]
                               (and (not (.isDirectory entry))
                                    (.startsWith (.getName entry) dir)))
                             entries)]
          ;; create the file's full parent path under the destination directory
          (.mkdirs (.getParentFile (File. destdir (.getName file))))
          ;; copy the file to "{tmproot}/{its path inside the archive}"
          (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
            (io/copy (.getInputStream jarpath file) out)))))
    (catch IOException e
      (log-message "Could not extract " dir " from " jarpath))))
This is the complete flow of starting a supervisor. Most of the startup work happens in mk-supervisor, so when reading this part of the source, start from that function and then work through the functions it calls, following the control flow into each one.