storm入門分析

1.集羣模式提交Topology流程分析

1)提交代碼到nimbus

storm jar . . .首先這個命令會把jar文件上傳到nimnus所在的機器上。也就是說,不管這個命令是在主節點nimbus上執行的,或者是在其餘的supervisor節點上執行的,都是首先上傳到nimbus那臺機上上。咱們能夠經過$STORM_HOME/logs/nimbus.log日誌文件中的內容來查看。併發

2016-01-27 23:43:14 b.s.d.nimbus [INFO] Uploading file from client to /data/storm/nimbus/inbox/stormjar-20e3fcfc-6233-4b71-9ebb-1fe4963a7276.jar
2016-01-27 23:43:14 b.s.d.nimbus [INFO] Finished uploading file from client: /data/storm/nimbus/inbox/stormjar-20e3fcfc-6233-4b71-9ebb-1fe4963a7276.jar

實際上意思就是將咱們的wordcountapp的jar文件wordcountapp-0.0.1-SNAPSHOT-jar-with-dependencies.jar上傳到了/data/storm/nimbus/inbox/目錄下,而且將jar文件的名稱改爲了stormjar-20e3fcfc-6233-4b71-9ebb-1fe4963a7276.jarapp

咱們安裝storm時配置,storm運行時存儲數據目錄ui

storm.local.dir: "/data/storm"

2)nimbus分配執行線程

當jar文件上傳完成以後,storm首先會檢查這個Topology的配置信息,例如須要幾個worker來運行。而後storm會查詢集羣中可用slot(等價於worker)的數量。由於Topology須要運行在worker上,並且一個worker只能運行一個Topology,因此Storm必須先要查詢那些worker事空閒的,以便將Topology分配到這些空閒的worker上。這段分析咱們還能夠經過分析nimbus.log的日誌文件進行查看。this

#檢查Topology的配置信息
2016-01-27 23:43:14 b.s.d.nimbus [INFO] Received topology submission for wordcountapp with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register"
nil, "topology.kryo.decorators" (), "topology.name" "wordcountapp", "storm.id" "wordcountapp-1-1453909394", "topology.debug" false, "fileName" "words.txt"}
2016-01-27 23:43:14 b.s.d.nimbus [INFO] Activating wordcountapp: wordcountapp-1-1453909394
#檢查集羣中可用的Slots,等價於worker
2016-01-27 23:43:14 b.s.s.EvenScheduler [INFO] Available slots: (["c612a070-af7d-4335-b034-08ae33269f3a" 6703] ["c612a070-af7d-4335-b034-08ae33269f3a" 6702] ["c612a070-af7d-4335-b034-08ae3326
9f3a" 6701] ["c612a070-af7d-4335-b034-08ae33269f3a" 6700])

3)執行線程下載代碼

分配到任務的supervisor從nimbus上下載Topology的jar文件,由於Topology的jar文件還在nimbus所在的機器上,因此supervisor必須從nimbus上來下載這些jar文件到本地,而後才能運行。咱們能夠看一下supervisor.log spa

#supervisor從nimbus機器上下載Topology的jar文件
2016-01-27 23:43:14 b.s.d.supervisor [INFO] Downloading code for storm id wordcountapp-1-1453909394 from /data/storm/nimbus/stormdist/wordcountapp-1-1453909394
#將下載的文件存儲到/data/storm/nimbus/stormdist/wordcountapp-1-1453909394
2016-01-27 23:43:14 b.s.d.supervisor [INFO] Finished downloading code for storm id wordcountapp-1-1453909394 from /data/storm/nimbus/stormdist/wordcountapp-1-1453909394
#啓動Topology
2016-01-27 23:43:14 b.s.d.supervisor [INFO] Launching worker with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id "wordcountapp-1-1453909394", :executors ([3 3] [4 4] [
2 2] [1 1])} for this supervisor c612a070-af7d-4335-b034-08ae33269f3a on port 6703 with id c1cb4fc0-1906-48bf-be5e-e1b29229c89a........省略部分日誌........./data/storm/supervisor/stormdist/wordcountapp-1-1453909394/stormjar.jar' 'backtype.storm.daemon.worker' 'wordcountapp-1-1453909394' 'c612a070-af7d-4335-b034-08ae33269f3a' '6703' 'c1cb4fc0-1906-48bf-be5e-e1b29229c89a'

2.storm的grouping策略和併發度

1)拓撲運行在單個worker上

咱們瞭解到storm集羣遵循通常集羣的master/slaver結構,nimbus負責調度監控,supervisor負責任務執行,每一個supervisor表明一個主機,在這個主機上能夠有多個worker(默認4個),而每一個worker中有能夠有多個線程(Executer)來運行。Executer就是Topology運行的最小單元。線程

TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader", new WordReader(),3);
		builder.setBolt("word-normalizer", new WordNormalizer(),3).shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-normalizer", new Fields("word"));
		StormTopology topology = builder.createTopology();

Storm默認就會給這個Topology分配1個Worker,在這個Worker啓動7個線程,3個用來運行WordReader,3個線程用來運行WordNormalizer,1個線程用來運行WordCounter。debug

2)拓撲運行在多個worker上

一個Topology也能夠運行在多個worker上,下圖例子是有一個拓撲運行在兩個worker上,其中一個worker上面運行2個WordReader,2個WordNormalizer。而另外一個worker上面運行2個WordReader和1個WordNormalizer以及1個WordCounter。日誌

3.Grouping策略介紹

1)隨機分組

Shuffle Grouping: 隨機分組, 隨機派發stream裏面的tuple, 保證每一個bolt接收到的tuple數目相同。輪詢,平均分配。code

2)按字段分組

 Fields Grouping:按字段分組, 好比按userid來分組, 具備一樣userid的tuple會被分到相同的Bolts, 而不一樣的userid則會被分配到不一樣的Bolts。orm

3)廣播分組

 All Grouping: 廣播發送, 對於每個tuple, 全部的Bolts都會收到。

4)全局分組

Global Grouping: 全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。

5)不分組

 Non Grouping: 不分組, 這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是同樣的效果,不平均分配。

6)直接分組

 Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者舉鼎由消息接收者的哪一個task處理這個消息。 只有被聲明爲Direct Stream的消息流能夠聲明這種分組方法。並且這種消息tuple必須使用emitDirect方法來發射。消息處理者能夠經過TopologyContext來或者處理它的消息的taskid(OutputCollector.emit方法也會返回taskid) 

4.消息可靠性

Storm 可以保證每個由 Spout 發送的消息都可以獲得完整地處理。

1)消息完整性處理

一個從 spout 中發送出的 tuple 會產生上千個基於它建立的 tuples。若是當該tuple產生的上千個tuples都處理完成時,稱該spout獲得完整性驗證。

好比咱們計算單詞個數的例子中,咱們從文件中讀取一行字符串就會產生一個tuple,而後該字符串會被髮送出去進而解析成多個單詞的tuple,而後每一個單詞的tuple又會產生一個計數的tuple。只有當全部單詞tuple和計數tuple都處理完成以後,該從文件中讀取的字符串的tuple纔算完整性處理。

咱們能夠根據storm中tuple的生命週期來幫助咱們理解:

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,經過調用 Spout 的 nextTuple 方法,Storm 向 Spout 請求一個 tuple。Spout 會使用 open 方法中提供的SpoutOutputCollector 向它的一個輸出數據流中發送一個 tuple。在發送 tuple 的時候,Spout 會提供一個 「消息 id」,這個 id 會在後續過程當中用於識別 tuple。

消息從spout中發送出去時調用this.collector.emit(new Values(str),msgId);方法。

這裏在消息向下傳遞的過程當中通過bolt時須要調用:

//bolt中的execute(Tuple input)處理代碼
collector.emit(input,new Values(word));
//而不是
collector.emit(new Values(word));

也就是說若是要把bolt中tuple算做spout的完整性,就須要把tuple傳遞下去,而後在該bolt中調用方法:

//bolt中的execute(Tuple input)處理代碼
collector.ack(input);

當全部由該發送出去的tuple產生的子孫tuples被處理完成且都調用了collector.ack(input)方法就會回調spout中 的ack(Object msgId) 方法證實該spout發出去的tuple被處理成功。

當有其中bolt處理消息時調用了collector.fail(input)方法,則代表子tuple中有沒有被成功處理的消息存在,就會致使spout中調用fail(Object msgId)方法。

注意:若是在向下傳遞的過程當中bolt沒有傳遞tuple,也就是沒有調用emit(input,new Values(word))則沒法驗證消息「完整性處理」。若是在向下傳遞的過程當中沒有調用collector.ack(input)或者collector.fail(input)也沒法進行消息的「完整性處理」。

相關文章
相關標籤/搜索