參考連接:https://blog.csdn.net/u013332124/article/details/79682782html
Storm 是一個分佈式的,可靠的,容錯的數據流處理系統。下面我將分別從storm的總體架構以及部分原理進行講解。java
storm中服務器節點分爲主節點和從節點,Nimbus爲主節點和Supervisor爲從節點。以及若干組件構成。下面爲對一些術語進行簡單的介紹:
Nimbus:主節點,是一個調度中心,負責分發任務
Supervisor:從節點,任務執行的地方
Worker:任務工做進程,一個Supervisor中能夠有多個Worker。
Executor:Worker進程在執行任務時,會啓動多個Executor線程
Topology:任務的抽象概念。因爲storm是流式計算的框架,它的數據流和拓撲圖很像,因此它的任務就叫topology。
Spout:從數據源獲取數據並進行分發。
Bolt:獲得Spout或者上一個Bolt的數據,而後進行處理後交給下一個Bolt處理。
Tuple:在storm中,一條數據能夠理解爲是一個Tuple。apache
Nimbus是調度中心,Supervisor是任務執行的地方。Supervisor上面有若干個Worker,每一個Worker都有本身的端口,Worker能夠理解爲一個進程。另外,每一個Worker中還能夠運行若干個線程。服務器
當客戶端向storm集羣提交一個Topology時,這裏的提交就是在集羣上經過命令storm jar xxx
啓動topology。若是咱們是在Supervisor節點上執行storm jar xxx
,那麼Supervisor會將jar包拷貝到Nimbus,以後Nimbus對Topology進行調度。架構
Nimbus會根據Topology所須要的Worker進行分配,將其分配到各個Supervisor的節點上執行。併發
如今假設咱們咱們有4個Supervisor節點,每一個Supervisor都配置4個Worker。這是咱們提交了一個Topology,須要4個Worker,那可能的分配狀況可能以下圖所示:
框架
啓動完Topology後,相關組件就開始運行起來了。在Storm中,Spout組件主要用來從數據源拉取數據,造成一個Tuple後轉交給Bolt處理。Bolt接受到Tuple處理完後,能夠選擇繼續交給下一個Bolt處理,也能夠選擇不往下傳。這樣數據以Tuple的形式一個接一個的往下執行,就造成了一個拓撲數據流。分佈式
storm數據在組件間的流向以下圖所示:
.net
在Storm中,Worker不是組件執行的最小單位。Executor纔是,Executor能夠理解爲是一個線程。咱們在建立topology的時候,能夠設置執行spout的線程數和bolt的線程數。線程
假設spout和bolt的線程數加起來設置了8個,而後設置了2個worker,那麼這8個線程可能就會隨機分配到2個worker中,可能一個worker3個,一個worker5個。也有可能各自分配4個。以下圖所示:
在實際應用中,Bolt組件的實例可能有多個,Tuple在流向Bolt時,選擇哪一個Bolt實例的策略就是grouping策略。
下面是Storm中的6種Grouping策略:
1. Shuffle Grouping: 隨機分組, 隨機派發stream裏面的tuple, 保證每一個bolt接收到的tuple數目相同。輪詢,平均分配。
2. Fields Grouping:按字段分組, 好比按userid來分組, 具備一樣userid的tuple會被分到相同的Bolts, 而不一樣的userid則會被分配到不一樣的Bolts。
3. All Grouping: 廣播發送, 對於每個tuple, 全部的Bolts都會收到。
4. Global Grouping: 全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
5. Non Grouping: 不分組, 這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是同樣的效果,不平均分配。
6. Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者舉鼎由消息接收者的哪一個task處理這個消息。 只有被聲明爲Direct Stream的消息流能夠聲明這種分組方法。並且這種消息tuple必須使用emitDirect方法來發射。消息處理者能夠經過TopologyContext來或者處理它的消息的taskid(OutputCollector.emit方法也會返回taskid)
一條數據在Spout中造成一個Tuple,而後交給一個個Bolt執行,那咱們怎麼保證這個Tuple被完整的執行了呢?這裏的完整執行說的是這個Tuple必須在後面的每個Bolt都成功處理,假設在一個Bolt中發生異常致使失敗,這就不能算完整處理。
爲了保證消息處理過程當中的可靠性,storm使用了ack機制。storm會專門啓動若干acker線程,來追蹤tuple的處理過程。acker線程數量能夠設置。
每個Tuple在Spout中生成的時候,都會分配到一個64位的messageId。經過對messageId進行哈希咱們能夠執行要對哪一個acker線程發送消息來通知它監聽這個Tuple。
acker線程收到消息後,會將發出消息的Spout和那個messageId綁定起來。而後開始跟蹤該tuple的處理流程。若是這個tuple所有都處理完,那麼acker線程就會調用發起這個tuple的那個spout實例的ack()方法。若是超過必定時間這個tuple還沒處理完,那麼acker線程就會調用對應spout的fail()方法,通知spout消息處理失敗。spout組件就能夠從新發送這個tuple。
從上面的介紹咱們知道了,tuple數據的流向會造成一個拓撲圖,也能夠理解成是一個tuple樹。這個拓撲圖的節點可能會有不少個,若是要把這些節點所有保存起來,處理大量的數據時勢必會形成內存溢出。
對於這個難題,storm使用了一種很是巧妙的方法,使用20個字節就能夠追蹤一個tuple是否被完整的執行。這也是storm的一個突破性的技術。
咱們都知道,本身異或本身,結果確定爲零( a ^ a = 0)。ack中就利用這個特性
舉個例子,好比發射了某個tuple,就 ack-val ^ tupleId,而後ack了某個tuple,就再ack-val ^ tupleId,這樣,ack-val 最終又變成了0,說明tuple已經所有處理成功了。
使用ack機制保證數據處理的高可用
Supervisor會自動重啓worker線程。
能夠在其餘節點重啓該supervisor任務。
在storm1.0以前,Nimbus是不支持HA的。Nimbus若是掛了,重啓Nimbus進程就能夠了,不會影響到現有topology的運行。
由於Nimbus只是一個調度中心,Nimbus和Supervisor的狀態都保存在本地文件和ZooKeeper,所以他們進程能夠隨便殺死,而後重啓,不會影響到Worker進程的運行。
另外,Nimbus的做用在就是在拓撲任務開始階段,負責將任務提交到集羣,後期負責拓撲任務的管理,好比任務查看,終止等操做。在一般狀況下,nimbus的任務壓力並不會很大,在天然狀況下不會出現宕機的狀況。
storm1.0後Nimbus的HA策略尚未具體研究過,有興趣的小夥伴可自行前往官網查看文檔。http://storm.apache.org/releases/1.2.1/nimbus-ha-design.html
Storm的架構及原理總體理解起來不算很難,但不少細節仍是須要在實踐中才能發現。有興趣的小夥伴能夠去讀讀storm的源碼,storm源碼大多數都是用Clojure實現,對Clojure語言不熟悉的朋友能夠去看一下JStorm的源碼實現。這是阿里基於Storm用java實現的框架,聽說更加穩定高效。