Storm源碼閱讀總結(1) -- Client Nimbus Supervisor

Client

客戶端提交做業

NimbusClient: RPC 客戶端, 向RPC服務端即Nimbus Server發起RPC調用.node

App經過StormSubmitter提交計算拓撲做業submitTopology:
首先提交jar包, 會向Nimbus服務器發起beginFileUpload, 申請到要上傳的路徑後, 調用uploadChunk開始將jar包上傳到nimbus服務器
文件傳輸完畢, 調用finishFileUpload結束上傳jar包過程. 最後會調用NimbusClient的submitTopology算法

Nimbus啓動初始化

Nimbus啓動時, 建立匿名的INimbus內部類.
初始時會根據系統中的supervisors建立可用的WorkSlot空閒槽位. --> allSlotsAvailableForScheduling服務器

在一個機器上一個supervisor能夠配置多個端口, 每一個端口都會對應一個工做者線程, 即建立對應數量的WorkSlot.
Nimbus要分配任務給Supervisor上的Worker進行工做, 而每一個Supervisor會有多個worker.架構

Thrift RPC步驟

編寫thrift文件
自動生成接口相關的類, 好比Iface, Processor, Client內部類
自定義Handler實現Iface業務邏輯, 對應thrift文件中的方法
服務端使用ServerTransport和Handler建立的Processor建立出Server並啓動服務器
客戶端使用Transport鏈接服務端, 使用Protocol協議構造出Client代理類, 而後調用接口的方法, 完成RPC調用負載均衡

Nimbus

Thrift RPC Server

啓動Nimbus會啓動它的thrift服務. Nimbus會做爲Thrift RPC協議的服務端, 處理客戶端發起的RPC調用請求.
nimbus的thrift服務的實現類定義在storm.thrift文件中, 對應的實現方法是service-handler[服務處理器].dom

Nimbus與ZooKeeper

Nimbus啓動時會建立調度器, 它爲集羣中的須要調度的計算拓撲分配任務. 新的任務能夠經過cluster.getAssignments()獲取.異步

nimbus在啓動時會鏈接ZooKeeper, 並在zookeeper中建立節點
以便在做業運行過程當中將Storm的全部的狀態信息都是保存在Zookeeper裏面.分佈式

nimbus經過在zookeeper上面寫狀態信息來分配任務,
supervisor,task經過從zookeeper中讀狀態來領取任務,
同時supervisor, task也會定義發送心跳信息到zookeeper,
使得nimbus能夠監控整個storm集羣的狀態, 從而能夠重啓一些掛掉的task函數

注意Nimbus和Supervisor之間沒有直接交互, 狀態都是保存在Zookeeper上ui

問題1: storm的jar包是上傳到zookeeper上,仍是nimbus, supervisor上?
答: jar包上傳到nimbus上, 在zookeeper的assignments中只是記錄了topology做業在nimbus上的代碼目錄.

watcher和callback

storm使用curator framework建立districtbuted-cluster-state[分佈式的集羣狀態].
建立客戶端時指定watcher函數, 當節點狀態發生改變時, 會觸發curator上註冊的監聽器, 回調watcher方法.

註冊的watcher使用callbacks. ClusterState自定義的register實現會將回調實現傳入, 最終觸發回調的調用.
callback和watcher都是function, 都經過回調的方式.

Storm的集羣狀態StormClusterState包括任務的分配assignments, supervisors, workers, hearbeat, errors等信息

Topology執行流程

定義在nimbus的service-handler的submitTopology裏, 步驟包括:
上傳topology代碼
運行topology前的校驗
在nimbus上創建topology的本地目錄: setup-storm-code
創建Zookeeper heartbeats: setup-heartbeats!
啓動storm: start-storm, 在zk上寫入StormBase信息,會記錄component和任務並行度的關係. 用於後面任務的分配.
分配任務: mk-assignments

Topology Supervisor Worker Executor Task
1. topology 物理上由一個nimbus機器和多臺supervisor機器組成

  1. supervisor 能夠配置多個slots. 每一個slots佔用一個端口, 配置在storm.yaml的supervisor.slots.ports
    一個worker對應supervisor的一個端口. 所以一個supervisor配置了幾個slots, 就對應有幾個worker.

  2. topology 邏輯上由 spout 和 bolt 組成. spout和bolt統稱爲component
    spout和bolt均可以配置並行度, 這個並行度的數量就是component的executors的數量.

component若是沒有設置task的數量, 默認一個executor運行一個task.
若是設置了task的數量(task數量要大於並行度的值), 則一個executor能夠運行多個task.
固然由於executor是針對具體component的(即指定的spout或bolt), 因此executor裏運行的多個task都是同一種component.

  1. executors 是topology中全部component每一個task起始和結束編號的序列: ([start-task end-task]), 可是沒有對應的component信息.
    executors->component 是一個Map: {[start-task end-task] component-id}, 給上面的executors添加了component-id信息.
    topology->executors 也是一個Map: {storm-id [start-task end-task]}, 把executors做爲topology-id的value

假設builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(10);
則表示RandomSentenceSpout這個component有5個executors, 設置了10個任務tasks.
那麼這個component的executors = ([1 2] [3 4] [5 6] [7 8] [9 10]).
對應的executors->component = {[1 2] "spout" [3 4] "spout" [5 6] "spout" [7 8] "spout" [9 10] "spout"}
注意: executors和executors->component的數據應該是包括了topology全部的component!
這裏由於只列出一個component,因此只給出部分數據.

  1. executor是屬於邏輯上的, 而任務是要運行在物理機器上的. 因此就涉及到邏輯上的executor運行在物理上的supervisor的問題.
    而咱們知道supervisor經過slots端口配置能夠分紅多個worker. worker由node-id+port組成. node-id實際上就是supervisor-id.
    所以一個worker能夠運行多個executor. 每一個executor是運行在node-id + port上的.
    executor->node+port 的結構是: {executor [node port]}. 表示每一個executor運行在哪一個supervisor的哪一個端口上.
    經過這種方式能夠將同一個component的多個executor分佈在多個機器上執行
    topology->executor->node+port 的結構: {topology-id -> {executor [node port]}}

任務的分配

產生executor->node+port關係, 將executor分配到哪一個node的哪一個slot上

mk-assignments的第一步的流程: 準備階段讀取全部active topology的信息
1. storm-cluster-state.active-storms 獲取集羣中全部活動的topology-id. 對每一個topology-id作以下處理:
2. read-topology-details 根據topology-id讀取每一個topology的詳細信息, 並組合成{topology-id TopologyDetails}
2.1 storm-cluster-state.storm-base 讀取/storm/storms/topology-id節點的數據爲StormBase對象,得到保存在其中的numWorkers
2.2 讀取storm-conf和storm-topology造成topology-conf和topology對象
2.3 executor->component 的映射關係: {ExecutorDetails component-id}
2.3.1 compute-executor->component 返回: {[start-task end-task] component-id}
2.3.1.1 component->executors 是component-id和executors並行度的數量的映射
2.3.1.2 storm-task-info 給每一個executor添加編號
2.3.1.3 compute-executors 列出全部executor的[start-task end-task]序列
2.3.1.4 通過join等一些列操做, 造成最後的{[start-task end-task] component-id}
2.3.2 將上面的[start-task end-task]封裝成ExecutorDetails, 造成最終的{ExecutorDetails component-id}
2.4 將上面的storm-id, topology-conf, topology, numWorkers, executor->component構形成TopologyDetails
3. 將全部的{topology-id TopologyDetails} 封裝成Topologies.

分配新任務

計算拓撲和compoent的並行度: topology->executors和topology->alive-executors
找出Supervisor中dead的WorkerSlot: supervisor->dead-ports, 用於獲得Supervisor的信息SupervisorDetail
爲alive-executors生成SchedulerAssignment: topology->scheduler-assignment
用supervisors信息和topology->scheduler-assignment會生成Cluster
Nimbus的調度器開始調度拓撲做業, 使用集羣信息topologies和cluster開始調度做業
分配的任務會寫到zk的assignments節點中, 下次調度時會根據已經分配的任務找出須要新分配的任務

計算拓撲和集羣信息

Topologies包含當前集羣裏面運行的全部Topology的信息:StormTopology對象,配置信息,
以及從task到組件(bolt, spout)id的映射信息(executor->component-id)。

Cluster對象則包含了當前集羣的全部狀態信息:全部的supervisor信息,
系統全部Topology的task分配信息(topology->scheduler-assignment, executor->slot),

任務調度

nimbus的任務分配算法:
在slot充沛的狀況下,可以保證全部topology的task被均勻的分配到整個機器的全部機器上
在slot不足的狀況下,它會把topology的全部的task分配到僅有的slot上去,這時候其實不是理想狀態,因此。。
在nimbus發現有多餘slot的時候,它會從新分配topology的task分配到空餘的slot上去以達到理想狀態。
在沒有slot的時候,它什麼也不作

Nimbus的邊城世界

nimbus中最重要的方法是mk-assignments分配任務. 它的調用有幾個地方:

service-handler的定時線程schedule-recurring會定時調用.
service-handler的匿名內部類Nimbus$Iface的submitTopology第一次提交拓撲做業時
do-rebalance進行負載均衡時, 是由state-transitions來斷定nimbus的狀態達到rebalance狀態時調用的.

第一次提交的topology做業, 就會馬上調用mk-assignments由nimbus分配任務.
可是做業一提交併不必定可以分配到任務. 好比集羣的計算資源很是緊張沒有可用slot的狀況下.
所以須要一個定時器定時調用mk-assignments方法, 在計算資源可用的狀況下爲沒有分配的topology分配任務.
也有一種多是初次提交做業後, nimbus只爲這個topology分配了一部分任務, 所以在下一輪迴時還要繼續爲這個做業分配任務.
這種狀況常見於: 一個很長的topology流計算做業, 前面的blot任務還沒完成時, 若是爲後面的bolt分配任務,
後面的blot任務就一直佔用這計算資源, 與其這樣, 還不如集中精力把計算資源都分配給前面的blot task.
或者是集羣的資源比較緊張, 只夠分配topology的一部分任務, 剩餘的任務因爲沒有可用的空閒槽位須要等到下次才能申請到資源.

mk-assignments中最重要的方法當屬compute-new-topology->executor->node+port
計算新的topology到executor, 再到executor所屬的supervisor node+port的映射關係.

用戶編寫的topology做業是由自定義的spout和blot組成, 它們統稱爲component.
能夠爲component定義並行度, 這就是topology的executors. executors其實是自定義任務的封裝.
集羣的supervisor可配置多個port, 對應的是集羣中的計算資源WorkerSlot.
任務是要運行在WorkerSlot上的. 因此分配任務就是要將executor分配到指定的WorkerSlot上.

因爲上面集中做業的任務分配的不肯定性, 爲topology分配完任務後, 須要記錄已經分配了哪些任務,
這樣下次的任務分配就不會爲已經分配的任務再次申請計算資源了. nimbus將任務的分配寫入到zk的assignments中.

任務分配後要進行調度纔算真正的執行. 調度器的工做調用cluster.assign傳入
做業topology, 計算資源worker-slot和表示任務的executors.
經過topology獲得做業的SchedulerAssignment, 將slot和executors傳給SchedulerAssignment的assign.
雖然SchedulerAssignmentImpl只是在內存中記錄了ExecutorDetails和WorkerSlot的映射關係.
可是這對於任務的運行而言已經足夠了, 由於任務編號記錄在ExecutorDetails中, 任務執行的節點記錄在WorkerSlot裏.
接下來任務就能夠真正跑在集羣的計算資源裏了.

Supervisor

物理上的Supervisor配置了多個端口, 對於集羣而言工做節點就是計算資源. 一個端口是一個WorkerSlot.
Nimbus負責任務的調度分配, 將任務分配信息Assignment寫到ZooKeeper中. Supervisor會負責讀取ZK中的任務分配信息.

mk-synchronize-supervisor和synchronize-processes是zk的assignment節點發生變化觸發執行的回調函數.
當有任務寫到ZK中, Supervisor會調用回調函數同步topology代碼到本地,
因爲Supervisor負責啓動Worker進程,也會同步Worker信息.
同步topology和worker信息的事件會放到一個隊列線程裏異步地執行.

同步supervisor會從nimbus下載topology代碼到本地目錄, 並持久化到LocalState狀態信息裏.
同步worker會找出新分配的worker-id, 負責啓動Worker進程.

Nimbus是集羣的總管, 只有一臺. 而Supervisor監督者有多個, 對於Master-Slave的架構, Slave要按期發送心跳信息給Master. 一個監督者也有多個Worker, 因此Worker也要發送心跳信息給Supervisor. Supervisor和Worker的心跳信息都保存在ZK節點上. 可是注意supervisor從nimbus下載的topology信息和已經處理完成的worker信息是保存在supervisor的本地目錄中.

相關文章
相關標籤/搜索