圖解Storm

問題導讀:
1.你認爲何圖形能夠顯示hadoop與storm的區別?(電梯)
2.本文是如何形象講解hadoop與storm的?(離線批量處理、實時流式處理)
3.hadoop map/reduce對應storm那兩個概念?(spout/bolt)
4.storm流由誰來組成?(Tuples)
5.tuple具體是什麼形式?
數據庫




什麼是Storm?
Storm是:緩存

  • 快速且可擴展伸縮
  • 容錯
  • 確保消息可以被處理
  • 易於設置和操做
  • 開源的分佈式實時計算系統
  • 最初由Nathan Marz開發
  • 使用Java 和 Clojure 編寫

區別:
咱們知道hadoop是批處理,storm是流式處理,那麼是什麼是批處理,什麼流式處理?
Storm和Hadoop主要區別是實時和批處理的區別:
 
Storm概念組成:Spout和Bolt組成Topology。
 

Tuple是Storm的數據模型,如['jdon',12346]
多個Tuple組成事件流:
服務器


Spout是讀取須要分析處理的數據源,而後轉爲Tuples,這些數據源能夠是Web日誌、 API調用、數據庫等等。Spout至關於事件流的生產者
Bolt 處理Tuples而後再建立新的Tuples流,Bolt至關於事件流的消費者

Bolt 做爲真正業務處理者,主要實現大數據處理的核心功能,好比轉換數據,應用相應過濾器,計算和聚合數據(好比統計總和等等) 。
以Twitter的某個Tweet爲案例,看看Storm如何處理:

這些tweett貼內容是:「No Small Cell Lung #Cancer(沒有小細胞肺癌#癌症)」 "An #OnCology Consult...."
這些貼被Spout讀取之後,產生Tuple,字段名是tweet,內容是"No Small Cell Lung #Cancer",格式相似:['No Small Cell Lung #Cancer',133221]。
而後進入被流 消費者Bolt進行處理,第一個Bolt是SplitSentence,將tuple內容進行分離,結果成爲:一個個單詞:"No" "Small" "Cell" "Lung" "#Cancer" ;而後通過第二個Bolt進行過濾HashTagFilter處理,Hash標籤是單詞中用#標註的,也就是Cancer;再通過HasTagCount計數,能夠本地內存緩存這個計數結果,最後經過PrinterBolt打印出標籤單詞統計結果 。網絡

咱們使用Stom所要作的就是編制Spout和Bolt代碼:架構

 1 public class RandomSentenceSpout extends BaseRichSpout {
 2   SpoutOutputCollector collector;
 3   Random random;
 4   //讀入外部數據
 5   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 6     this.collector = collector;
 7     random = new Random();
 8   }
 9   //產生Tuple
10    public void nextTuple() {
11     String[] sentences = new String[] {
12       "No Small Cell Lung #Cancer",
13       "An #OnCology Consultant apple a day keeps the doctor away",
14       "four score and seven years ago",
15       "snow white and the seven dwarfs",
16       "i am at two with nature"
17     };
18     String tweet = sentences[random.nextInt(sentences.length)];
19     //定義字段名"tweet" 的值 
20     collector.emit(new Values(tweet));
21   }
22   // 定義字段名"tweet"
23   public void declareOutputFields(OutputFieldsDeclarer declarer) {
24     declarer.declare(new Fields("tweet"));
25   }
26   @Override
27   public void ack(Object msgId) {}
28   @Override
29   public void fail(Object msgId) {}
30 }

下面是Bolt的代碼編寫:app

 1 public class SplitSentenceBolt extends BaseRichBolt {
 2   OutputCollector collector;
 3   @Override
 4   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 5     this.collector = collector;
 6   }
 7   @Override 消費者激活主要方法:分離成單個單詞
 8   public void execute(Tuple input) {
 9     for (String s : input.getString(0).split("\\s")) {
10       collector.emit(new Values(s));
11     }
12   }
13   @Override 定義新的字段名
14   public void declareOutputFields(OutputFieldsDeclarer declarer) {
15     declarer.declare(new Fields("word"));
16   }

最後是裝配運行Spout和Bolt的客戶端調用代碼:dom

 1 public class WordCountTopology {
 2   public static void main(String[] args) throws Exception {
 3     TopologyBuilder builder = new TopologyBuilder();
 4     builder.setSpout("tweet", new RandomSentenceSpout(), 2);
 5     builder.setBolt("split", new SplitSentenceBolt(), 4)
 6       .shuffleGrouping("tweet")
 7       .setNumTasks(8);
 8     builder.setBolt("count", new WordCountBolt(), 6)
 9       .fieldsGrouping("split", new Fields("word"));
10     ..設置多個Bolt
11     Config config = new Config();
12     config.setNumWorkers(4);
13     
14     StormSubmitter.submitTopology("wordcount", config, builder.createTopology());
15 //Local testing
16 //LocalCluster cluster = new LocalCluster();
17 //cluster.submitTopology("wordcount", config, builder.createTopology());
18 //Thread.sleep(10000);
19 //cluster.shutdown();
20 }
21 }


在這個代碼中定義了一些參數好比Works的數目是4,其含義在後面詳細分析。

下面咱們要將上面這段代碼發佈部署到Storm中,首先了解Storm物理架構圖
分佈式

 

Nimbus是一個主後臺處理器,主要負責:
  1.發佈分發代碼
  2.分配任務
  3.監控失敗。
Supervisor是負責當前這個節點的後臺工做處理器的監聽。
Work相似Java的線程,採起JDK的Executor 。ide

 

下面開始將咱們的代碼部署到這個網絡拓撲中:
將代碼Jar包上傳到Nimbus的inbox,包括全部的依賴包,而後提交。
Nimbus將保存在本地文件系統,而後開始配置網絡拓撲,分配開始拓撲。
見下圖:
Nimbus服務器將拓撲Jar 配置和結構下載到 Supervisor,負載平衡ZooKeeper分配某個特定的Supervisor服務器,而Supervisor開始基於配置分配Work,Work調用JDK的Executor啓動線程,開始任務處理。
下面是咱們代碼對拓撲分配的參數示意圖:
 

Executor啓動的線程數目是12個,組件的實例是16個,那麼如何在實際服務器中分配呢?以下圖:
圖中RsSpout表明咱們的代碼中RandomSentenceSpout;SplitSentenceBolt簡寫爲SSbolt。oop

相關文章
相關標籤/搜索