storm

1.storm總體架構(畫圖+描述)java

 storm的數據流程:web

 

 

  storm分佈式計算結構稱爲topology(拓撲),由stream(數據流)、spout(數據流的生成者)、bolt(運算)組成服務器

  stream:網絡

    storm的核心數據結構是tuple(元祖),tuple是包含一個或者多個鍵值對的列表,Stream就是由無限制的tuple組成的序列數據結構

   spout:架構

    spout爲一個storm topology的主要數據入口,充當採集器的角色,鏈接到數據源,將數據轉化爲一個tuple,並將tuple做爲數據負載均衡

    storm爲spout提供了簡易的API,開發一個spout的主要工做就是編寫代碼從數據源或者API消費數據jvm

    數據源的種類:分佈式

      web或者移動程序的點擊數據ide

      應用程序的日誌數據

      傳感器的輸出

    bolt:

      bolt能夠理解爲計算程序的運算,將一個或者多個數據流做爲輸入,對數據進行實施運算後,

 

    Storm定義了八種內置數據流分組的定義:
      ① 隨機分組(Shuffle grouping):這種方式下元組會被儘量隨機地分配到 bolt 的不一樣任務(tasks)中,使得每一個任務所處理元組數量可以可以保持基本一致,以確保集羣的負載均衡。
      ② 按字段分組(Fields grouping):這種方式下數據流根據定義的「字段」來進行分組。例如,若是某個數據流是基於一個名爲「user-id」的字段進行分組的,那麼全部包含相同的「user-id」的元組都會被分配到同一個task中,這樣就能夠確保消息處理的一致性。
      ③ 徹底分組(All grouping):將全部的tuple複製後分發給全部的bolt task。每一個訂閱的task都會接收到tuple的拷貝,全部在使用此分組時需當心使用。
      ④ 全局分組(Global grouping):這種方式下全部的數據流都會被髮送到 Bolt 的同一個任務中,也就是 id 最小的那個任務。
      ⑤不分組(None grouping):使用這種方式說明你不關心數據流如何分組。目前這種方式的結果與隨機分組徹底等效,不過將來 Storm 社區可能會考慮經過非分組方式來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個線程中執行
      ⑥ 指向型分組(Direct grouping):數據源會調用emitDirect()方法來判斷一個tuple應該由那個Storm組件來接收。只能在聲明瞭是指向型的數據流上使用。

 

這個是物理流程圖 

  Storm集羣由一個主節點(nimbus)和一個或者多個工做節點(Supervisor)組成

  nimbus:Storm的主節點,相似於hadoop中的Jobtracker,管理,協調和監控在集羣上運行的topology。包括topology的發佈,事務處理失敗時從新指派任務。

  Supervisor:等待nimbus分配任務後生成並監控worker(jvm進程)執行任務

  zookeeper:storm主要使用zookeeper來協調集羣中的狀態信息,

  任務提交描述

    1.client:提交topology

    2.numbus:這個角色所作的操做相對較多,具體以下:

      a.會把提交的jar包放到nimbus所在服務器的nimbus/inbox目錄下

      b.submitTopology方法會負責topology的處理;包括檢查集羣是否有active節點,配置文件是否正確,是否有重複的topology名稱,各個bolt/spout名是否使用相同的id等

      c.創建topology的本地目錄:nimbus/stormdist/topology-uuid

        該目錄包括三個文件:

          stormjar.jar --從nimbus/inbox目錄拷貝

          stormcode.ser --此topology對象的序列化

          stormconf.ser --此topology的配置文件序列化

       d.nimbus 任務分配,根據topology中的定義,給是spout/bolt設置Task的數據,並分配對應的task-id,最後把分配好的信息寫入到zookeeper的。

       e.nimbus在zookeeper上建立taskbeats目錄,要求每一個task定時向nimbus彙報

       f.將分配好的任務寫入到zookeeper,此時任務提交完畢,zk上的目錄爲assignments/topology-uuid

       g.將topology信息寫入到zookeeper/storms目錄

    3.Supervisor

      a.按期掃描zookeeper上的storm目錄,看看是否有新的任務,有就下載。

      b.刪除本地不須要的topology

      c.根據nimbus指定的任務信息啓動worker

    4.worker

      a.查看須要執行的任務,根據任務id分辨出spout/bolt任務

      b.計算出所表明的spout/bolt會給那些task發送信息

      c.執行spout任務或者bolt任務

    Supervisor會定時從zookeeper獲取拓撲信息topologies、任務分配信息assignment及各種心跳信息,以此爲依據進行任務分配。

    在Supervisor同步時,會根據新的任務分配狀況來啓動新的worker或者關閉舊的worker並進行負載均衡。

    worker經過按期的更新connection信息,來獲取其應該通信的其餘worker

    worker啓動時,會根據其分配到的任務啓動一個或多個execute線程,這些線程僅會處理惟一的topology

    若是有新的topolog被提交到集羣,nimbus會從新分配任務,這個到後面會說到

    execute線程負責處理多個spout或者多個bolts的邏輯,這寫spout或者bolts,也稱爲tasks

    具體有多少個worker,多少個execute,每一個execute負責多少個task,是由配置和指定的parallelism-hint共同決定的,但這個值並不必定等於實際運行的數目

    若是計算出的總的executors超過了nimbus的限制,此topology將不會獲得執行。

 

 2.storm完成一個單詞計數功能

  注意:在咱們編寫Java代碼以前必定要將storm的安裝目錄下的lib包導入到項目中,否則程序在編譯和本地運行會報錯的

  1.wordcount 的數據發送端,這一端就是進行數據的收集,而後發送給bolt進行邏輯處理的

 1 import java.util.HashMap;
 2 import java.util.Map;
 3 
 4 import backtype.storm.Config;
 5 import backtype.storm.LocalCluster;
 6 import backtype.storm.StormSubmitter;
 7 import backtype.storm.generated.AlreadyAliveException;
 8 import backtype.storm.generated.InvalidTopologyException;
 9 import backtype.storm.topology.TopologyBuilder;
10 public class WordCountTest {
11 
12     public static void main(String[] args) {
13         
14         //建立一個Topology對象
15         TopologyBuilder builder=new TopologyBuilder();
16         //設置一個數據的輸入源
17         builder.setSpout("spout", new WordCountSpoutSource(),1);
18         //設置一個數據的邏輯處理bolt,就是對每個單詞進行計數的
19         builder.setBolt("bolt", new WordCountBoldHandle(),1).shuffleGrouping("spout");
20         builder.setBolt("bolt1", new WordCountBoldOut(),1).shuffleGrouping("bolt");
21         
22         //設置是在本地運行仍是在集羣中運行
23         Map map=new HashMap();
24         map.put(Config.TOPOLOGY_WORKERS, 2);
25         if(args.length>0){
26             try {
27                 //在集羣中暈運行這些topology
28                 StormSubmitter.submitTopology("topology", map, builder.createTopology());
29             } catch (AlreadyAliveException | InvalidTopologyException e) {
30                 // TODO Auto-generated catch block
31                 e.printStackTrace();
32             }
33         }else {
34             //在本地運行這些topology
35             LocalCluster loadCluster=new LocalCluster();
36             loadCluster.submitTopology("topology", map, builder.createTopology());
37         }
38         
39         
40         
41         
42         
43     }
44 }
View Code

  2.第一個bolt接收到來自spout的數據,將每個單詞放到map集合中,使用containsKey方法檢查map是否這個單詞的key,若是有就進行value進行加1操做,沒有就進行put操做,將單詞作爲key,1做爲value放到map集合中,最後將每一個map發送給下一個bolt,

 1 import java.util.HashMap;
 2 import java.util.Map;
 3 
 4 import backtype.storm.task.OutputCollector;
 5 import backtype.storm.task.TopologyContext;
 6 import backtype.storm.topology.IRichBolt;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.tuple.Fields;
 9 import backtype.storm.tuple.Tuple;
10 import backtype.storm.tuple.Values;
11 
12 public class WordCountBoldHandle implements IRichBolt{
13     
14     TopologyContext context;
15     OutputCollector collector;
16     
17     Map map=new HashMap<String, Integer>();
18 
19     @Override
20     public void cleanup() {
21         // TODO Auto-generated method stub
22         
23     }
24 
25     @Override
26     public void execute(Tuple tuple) {
27         // TODO Auto-generated method stub
28         String word=(String)tuple.getValueByField("word");
29         System.out.println("接收數據1"+"----"+word);
30         if(map.containsKey(word)){
31             //這裏面是將咱們的額Integer對象拆箱成一個int類型
32             int num=(int) map.get(word)+1;
33             map.put(word, num);
34         }else {
35             map.put(word, 1);
36         }
37         
38         collector.emit(new Values(map));
39         
40         
41     }
42 
43     @Override
44     public void prepare(Map arg0, TopologyContext context, OutputCollector collector) {
45         // TODO Auto-generated method stub
46         this.collector=collector;
47         this.context=context;
48         
49     }
50 
51     @Override
52     public void declareOutputFields(OutputFieldsDeclarer declarer) {
53         // TODO Auto-generated method stub
54         declarer.declare(new Fields("WordMap"));
55         
56     }
57 
58     @Override
59     public Map<String, Object> getComponentConfiguration() {
60         // TODO Auto-generated method stub
61         return null;
62     }
View Code

  3.第二個bolt接收到來自的bolt的數據,將map裏的數據進行遍歷輸出。

 1 package com.cgh.storm.wordcount;
 2 
 3 import java.util.Map;
 4 import java.util.Set;
 5 
 6 import backtype.storm.task.OutputCollector;
 7 import backtype.storm.task.TopologyContext;
 8 import backtype.storm.topology.IRichBolt;
 9 import backtype.storm.topology.OutputFieldsDeclarer;
10 import backtype.storm.tuple.Tuple;
11 
12 public class WordCountBoldOut implements IRichBolt{
13 
14     @Override
15     public void cleanup() {
16         // TODO Auto-generated method stub
17         
18     }
19 
20     @Override
21     public void execute(Tuple arg0) {
22         // TODO Auto-generated method stub
23         //接收數據並刪除數據
24         Map map=(Map<String, Integer>)arg0.getValueByField("WordMap");
25         Set<String> set=map.keySet();
26         for(String key:set){
27             System.out.println("單詞:"+key+", 出現次數:"+map.get(key));
28         }
29         
30     }
31 
32     @Override
33     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
34         // TODO Auto-generated method stub
35         
36     }
37 
38     @Override
39     public void declareOutputFields(OutputFieldsDeclarer arg0) {
40         // TODO Auto-generated method stub
41         
42     }
43 
44     @Override
45     public Map<String, Object> getComponentConfiguration() {
46         // TODO Auto-generated method stub
47         return null;
48     }
49 
50 }
View Code

  4.本地運行測試代碼:

             

3.storm 任務提交流程:

  

 

4.storm 的worker間的通訊

  

5.storm與hadoop的有哪些異同點

  hadoop是磁盤級計算,進行計算時,數據在磁盤上,須要讀寫磁盤;storm是內存級計算,數據直接經過網絡導入內存。讀寫比磁盤塊n個數量級。

  hadoop 的mapreduce基於hdfs ,須要切分輸入的數據,產生中間數據文件、排序、數據壓縮、多份複製等,效率較低

  storm基於zeroMQ這個高性能的消息通信庫,不持久化數據。

  最主要的方面:hadoop使用磁盤做爲中間交換的介質,而storm的數據是一直在內存中流轉的,二者的面向的領域也不徹底相同,一個批處理,基於任務調度的,另外一個是實時處理,基於流的,以水爲例,hadoop能夠當作做爲一桶一桶的搬,而storm是用水管,預先接好(topology),而後打開水龍頭,水就會源源不斷的流出來。

相關文章
相關標籤/搜索