1.storm總體架構(畫圖+描述)java
storm的數據流程:web
storm分佈式計算結構稱爲topology(拓撲),由stream(數據流)、spout(數據流的生成者)、bolt(運算)組成服務器
stream:網絡
storm的核心數據結構是tuple(元祖),tuple是包含一個或者多個鍵值對的列表,Stream就是由無限制的tuple組成的序列數據結構
spout:架構
spout爲一個storm topology的主要數據入口,充當採集器的角色,鏈接到數據源,將數據轉化爲一個tuple,並將tuple做爲數據負載均衡
storm爲spout提供了簡易的API,開發一個spout的主要工做就是編寫代碼從數據源或者API消費數據jvm
數據源的種類:分佈式
web或者移動程序的點擊數據ide
應用程序的日誌數據
傳感器的輸出
bolt:
bolt能夠理解爲計算程序的運算,將一個或者多個數據流做爲輸入,對數據進行實施運算後,
Storm定義了八種內置數據流分組的定義:
① 隨機分組(Shuffle grouping):這種方式下元組會被儘量隨機地分配到 bolt 的不一樣任務(tasks)中,使得每一個任務所處理元組數量可以可以保持基本一致,以確保集羣的負載均衡。
② 按字段分組(Fields grouping):這種方式下數據流根據定義的「字段」來進行分組。例如,若是某個數據流是基於一個名爲「user-id」的字段進行分組的,那麼全部包含相同的「user-id」的元組都會被分配到同一個task中,這樣就能夠確保消息處理的一致性。
③ 徹底分組(All grouping):將全部的tuple複製後分發給全部的bolt task。每一個訂閱的task都會接收到tuple的拷貝,全部在使用此分組時需當心使用。
④ 全局分組(Global grouping):這種方式下全部的數據流都會被髮送到 Bolt 的同一個任務中,也就是 id 最小的那個任務。
⑤不分組(None grouping):使用這種方式說明你不關心數據流如何分組。目前這種方式的結果與隨機分組徹底等效,不過將來 Storm 社區可能會考慮經過非分組方式來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個線程中執行
⑥ 指向型分組(Direct grouping):數據源會調用emitDirect()方法來判斷一個tuple應該由那個Storm組件來接收。只能在聲明瞭是指向型的數據流上使用。
這個是物理流程圖
Storm集羣由一個主節點(nimbus)和一個或者多個工做節點(Supervisor)組成
nimbus:Storm的主節點,相似於hadoop中的Jobtracker,管理,協調和監控在集羣上運行的topology。包括topology的發佈,事務處理失敗時從新指派任務。
Supervisor:等待nimbus分配任務後生成並監控worker(jvm進程)執行任務
zookeeper:storm主要使用zookeeper來協調集羣中的狀態信息,
任務提交描述
1.client:提交topology
2.numbus:這個角色所作的操做相對較多,具體以下:
a.會把提交的jar包放到nimbus所在服務器的nimbus/inbox目錄下
b.submitTopology方法會負責topology的處理;包括檢查集羣是否有active節點,配置文件是否正確,是否有重複的topology名稱,各個bolt/spout名是否使用相同的id等
c.創建topology的本地目錄:nimbus/stormdist/topology-uuid
該目錄包括三個文件:
stormjar.jar --從nimbus/inbox目錄拷貝
stormcode.ser --此topology對象的序列化
stormconf.ser --此topology的配置文件序列化
d.nimbus 任務分配,根據topology中的定義,給是spout/bolt設置Task的數據,並分配對應的task-id,最後把分配好的信息寫入到zookeeper的。
e.nimbus在zookeeper上建立taskbeats目錄,要求每一個task定時向nimbus彙報
f.將分配好的任務寫入到zookeeper,此時任務提交完畢,zk上的目錄爲assignments/topology-uuid
g.將topology信息寫入到zookeeper/storms目錄
3.Supervisor
a.按期掃描zookeeper上的storm目錄,看看是否有新的任務,有就下載。
b.刪除本地不須要的topology
c.根據nimbus指定的任務信息啓動worker
4.worker
a.查看須要執行的任務,根據任務id分辨出spout/bolt任務
b.計算出所表明的spout/bolt會給那些task發送信息
c.執行spout任務或者bolt任務
Supervisor會定時從zookeeper獲取拓撲信息topologies、任務分配信息assignment及各種心跳信息,以此爲依據進行任務分配。
在Supervisor同步時,會根據新的任務分配狀況來啓動新的worker或者關閉舊的worker並進行負載均衡。
worker經過按期的更新connection信息,來獲取其應該通信的其餘worker
worker啓動時,會根據其分配到的任務啓動一個或多個execute線程,這些線程僅會處理惟一的topology
若是有新的topolog被提交到集羣,nimbus會從新分配任務,這個到後面會說到
execute線程負責處理多個spout或者多個bolts的邏輯,這寫spout或者bolts,也稱爲tasks
具體有多少個worker,多少個execute,每一個execute負責多少個task,是由配置和指定的parallelism-hint共同決定的,但這個值並不必定等於實際運行的數目
若是計算出的總的executors超過了nimbus的限制,此topology將不會獲得執行。
2.storm完成一個單詞計數功能
注意:在咱們編寫Java代碼以前必定要將storm的安裝目錄下的lib包導入到項目中,否則程序在編譯和本地運行會報錯的
1.wordcount 的數據發送端,這一端就是進行數據的收集,而後發送給bolt進行邏輯處理的
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import backtype.storm.Config; 5 import backtype.storm.LocalCluster; 6 import backtype.storm.StormSubmitter; 7 import backtype.storm.generated.AlreadyAliveException; 8 import backtype.storm.generated.InvalidTopologyException; 9 import backtype.storm.topology.TopologyBuilder; 10 public class WordCountTest { 11 12 public static void main(String[] args) { 13 14 //建立一個Topology對象 15 TopologyBuilder builder=new TopologyBuilder(); 16 //設置一個數據的輸入源 17 builder.setSpout("spout", new WordCountSpoutSource(),1); 18 //設置一個數據的邏輯處理bolt,就是對每個單詞進行計數的 19 builder.setBolt("bolt", new WordCountBoldHandle(),1).shuffleGrouping("spout"); 20 builder.setBolt("bolt1", new WordCountBoldOut(),1).shuffleGrouping("bolt"); 21 22 //設置是在本地運行仍是在集羣中運行 23 Map map=new HashMap(); 24 map.put(Config.TOPOLOGY_WORKERS, 2); 25 if(args.length>0){ 26 try { 27 //在集羣中暈運行這些topology 28 StormSubmitter.submitTopology("topology", map, builder.createTopology()); 29 } catch (AlreadyAliveException | InvalidTopologyException e) { 30 // TODO Auto-generated catch block 31 e.printStackTrace(); 32 } 33 }else { 34 //在本地運行這些topology 35 LocalCluster loadCluster=new LocalCluster(); 36 loadCluster.submitTopology("topology", map, builder.createTopology()); 37 } 38 39 40 41 42 43 } 44 }
2.第一個bolt接收到來自spout的數據,將每個單詞放到map集合中,使用containsKey方法檢查map是否這個單詞的key,若是有就進行value進行加1操做,沒有就進行put操做,將單詞作爲key,1做爲value放到map集合中,最後將每一個map發送給下一個bolt,
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import backtype.storm.task.OutputCollector; 5 import backtype.storm.task.TopologyContext; 6 import backtype.storm.topology.IRichBolt; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Tuple; 10 import backtype.storm.tuple.Values; 11 12 public class WordCountBoldHandle implements IRichBolt{ 13 14 TopologyContext context; 15 OutputCollector collector; 16 17 Map map=new HashMap<String, Integer>(); 18 19 @Override 20 public void cleanup() { 21 // TODO Auto-generated method stub 22 23 } 24 25 @Override 26 public void execute(Tuple tuple) { 27 // TODO Auto-generated method stub 28 String word=(String)tuple.getValueByField("word"); 29 System.out.println("接收數據1"+"----"+word); 30 if(map.containsKey(word)){ 31 //這裏面是將咱們的額Integer對象拆箱成一個int類型 32 int num=(int) map.get(word)+1; 33 map.put(word, num); 34 }else { 35 map.put(word, 1); 36 } 37 38 collector.emit(new Values(map)); 39 40 41 } 42 43 @Override 44 public void prepare(Map arg0, TopologyContext context, OutputCollector collector) { 45 // TODO Auto-generated method stub 46 this.collector=collector; 47 this.context=context; 48 49 } 50 51 @Override 52 public void declareOutputFields(OutputFieldsDeclarer declarer) { 53 // TODO Auto-generated method stub 54 declarer.declare(new Fields("WordMap")); 55 56 } 57 58 @Override 59 public Map<String, Object> getComponentConfiguration() { 60 // TODO Auto-generated method stub 61 return null; 62 }
3.第二個bolt接收到來自的bolt的數據,將map裏的數據進行遍歷輸出。
1 package com.cgh.storm.wordcount; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import backtype.storm.task.OutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.IRichBolt; 9 import backtype.storm.topology.OutputFieldsDeclarer; 10 import backtype.storm.tuple.Tuple; 11 12 public class WordCountBoldOut implements IRichBolt{ 13 14 @Override 15 public void cleanup() { 16 // TODO Auto-generated method stub 17 18 } 19 20 @Override 21 public void execute(Tuple arg0) { 22 // TODO Auto-generated method stub 23 //接收數據並刪除數據 24 Map map=(Map<String, Integer>)arg0.getValueByField("WordMap"); 25 Set<String> set=map.keySet(); 26 for(String key:set){ 27 System.out.println("單詞:"+key+", 出現次數:"+map.get(key)); 28 } 29 30 } 31 32 @Override 33 public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { 34 // TODO Auto-generated method stub 35 36 } 37 38 @Override 39 public void declareOutputFields(OutputFieldsDeclarer arg0) { 40 // TODO Auto-generated method stub 41 42 } 43 44 @Override 45 public Map<String, Object> getComponentConfiguration() { 46 // TODO Auto-generated method stub 47 return null; 48 } 49 50 }
4.本地運行測試代碼:
3.storm 任務提交流程:
4.storm 的worker間的通訊
5.storm與hadoop的有哪些異同點
hadoop是磁盤級計算,進行計算時,數據在磁盤上,須要讀寫磁盤;storm是內存級計算,數據直接經過網絡導入內存。讀寫比磁盤塊n個數量級。
hadoop 的mapreduce基於hdfs ,須要切分輸入的數據,產生中間數據文件、排序、數據壓縮、多份複製等,效率較低
storm基於zeroMQ這個高性能的消息通信庫,不持久化數據。
最主要的方面:hadoop使用磁盤做爲中間交換的介質,而storm的數據是一直在內存中流轉的,二者的面向的領域也不徹底相同,一個批處理,基於任務調度的,另外一個是實時處理,基於流的,以水爲例,hadoop能夠當作做爲一桶一桶的搬,而storm是用水管,預先接好(topology),而後打開水龍頭,水就會源源不斷的流出來。