問題導讀:
1.你認爲何圖形能夠顯示hadoop與storm的區別?(電梯)
2.本文是如何形象講解hadoop與storm的?(離線批量處理、實時流式處理)
3.hadoop map/reduce對應storm那兩個概念?(spout/bolt)
4.storm流由誰來組成?(Tuples)
5.tuple具體是什麼形式?
數據庫
什麼是Storm?
Storm是:緩存
區別:
咱們知道hadoop是批處理,storm是流式處理,那麼是什麼是批處理,什麼流式處理?
Storm和Hadoop主要區別是實時和批處理的區別:
Storm概念組成:Spout和Bolt組成Topology。
Tuple是Storm的數據模型,如['jdon',12346]
多個Tuple組成事件流:
服務器
Spout是讀取須要分析處理的數據源,而後轉爲Tuples,這些數據源能夠是Web日誌、 API調用、數據庫等等。Spout至關於事件流的生產者。
Bolt 處理Tuples而後再建立新的Tuples流,Bolt至關於事件流的消費者。
Bolt 做爲真正業務處理者,主要實現大數據處理的核心功能,好比轉換數據,應用相應過濾器,計算和聚合數據(好比統計總和等等) 。
以Twitter的某個Tweet爲案例,看看Storm如何處理:
這些tweett貼內容是:「No Small Cell Lung #Cancer(沒有小細胞肺癌#癌症)」 "An #OnCology Consult...."
這些貼被Spout讀取之後,產生Tuple,字段名是tweet,內容是"No Small Cell Lung #Cancer",格式相似:['No Small Cell Lung #Cancer',133221]。
而後進入被流 消費者Bolt進行處理,第一個Bolt是SplitSentence,將tuple內容進行分離,結果成爲:一個個單詞:"No" "Small" "Cell" "Lung" "#Cancer" ;而後通過第二個Bolt進行過濾HashTagFilter處理,Hash標籤是單詞中用#標註的,也就是Cancer;再通過HasTagCount計數,能夠本地內存緩存這個計數結果,最後經過PrinterBolt打印出標籤單詞統計結果 。網絡
咱們使用Stom所要作的就是編制Spout和Bolt代碼:架構
1 public class RandomSentenceSpout extends BaseRichSpout { 2 SpoutOutputCollector collector; 3 Random random; 4 //讀入外部數據 5 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 6 this.collector = collector; 7 random = new Random(); 8 } 9 //產生Tuple 10 public void nextTuple() { 11 String[] sentences = new String[] { 12 "No Small Cell Lung #Cancer", 13 "An #OnCology Consultant apple a day keeps the doctor away", 14 "four score and seven years ago", 15 "snow white and the seven dwarfs", 16 "i am at two with nature" 17 }; 18 String tweet = sentences[random.nextInt(sentences.length)]; 19 //定義字段名"tweet" 的值 20 collector.emit(new Values(tweet)); 21 } 22 // 定義字段名"tweet" 23 public void declareOutputFields(OutputFieldsDeclarer declarer) { 24 declarer.declare(new Fields("tweet")); 25 } 26 @Override 27 public void ack(Object msgId) {} 28 @Override 29 public void fail(Object msgId) {} 30 }
下面是Bolt的代碼編寫:app
1 public class SplitSentenceBolt extends BaseRichBolt { 2 OutputCollector collector; 3 @Override 4 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 5 this.collector = collector; 6 } 7 @Override 消費者激活主要方法:分離成單個單詞 8 public void execute(Tuple input) { 9 for (String s : input.getString(0).split("\\s")) { 10 collector.emit(new Values(s)); 11 } 12 } 13 @Override 定義新的字段名 14 public void declareOutputFields(OutputFieldsDeclarer declarer) { 15 declarer.declare(new Fields("word")); 16 }
最後是裝配運行Spout和Bolt的客戶端調用代碼:dom
1 public class WordCountTopology { 2 public static void main(String[] args) throws Exception { 3 TopologyBuilder builder = new TopologyBuilder(); 4 builder.setSpout("tweet", new RandomSentenceSpout(), 2); 5 builder.setBolt("split", new SplitSentenceBolt(), 4) 6 .shuffleGrouping("tweet") 7 .setNumTasks(8); 8 builder.setBolt("count", new WordCountBolt(), 6) 9 .fieldsGrouping("split", new Fields("word")); 10 ..設置多個Bolt 11 Config config = new Config(); 12 config.setNumWorkers(4); 13 14 StormSubmitter.submitTopology("wordcount", config, builder.createTopology()); 15 //Local testing 16 //LocalCluster cluster = new LocalCluster(); 17 //cluster.submitTopology("wordcount", config, builder.createTopology()); 18 //Thread.sleep(10000); 19 //cluster.shutdown(); 20 } 21 }
在這個代碼中定義了一些參數好比Works的數目是4,其含義在後面詳細分析。
下面咱們要將上面這段代碼發佈部署到Storm中,首先了解Storm物理架構圖:
分佈式
Nimbus是一個主後臺處理器,主要負責:
1.發佈分發代碼
2.分配任務
3.監控失敗。
Supervisor是負責當前這個節點的後臺工做處理器的監聽。
Work相似Java的線程,採起JDK的Executor 。ide
下面開始將咱們的代碼部署到這個網絡拓撲中:
將代碼Jar包上傳到Nimbus的inbox,包括全部的依賴包,而後提交。
Nimbus將保存在本地文件系統,而後開始配置網絡拓撲,分配開始拓撲。
見下圖:
Nimbus服務器將拓撲Jar 配置和結構下載到 Supervisor,負載平衡ZooKeeper分配某個特定的Supervisor服務器,而Supervisor開始基於配置分配Work,Work調用JDK的Executor啓動線程,開始任務處理。
下面是咱們代碼對拓撲分配的參數示意圖:
Executor啓動的線程數目是12個,組件的實例是16個,那麼如何在實際服務器中分配呢?以下圖:
圖中RsSpout表明咱們的代碼中RandomSentenceSpout;SplitSentenceBolt簡寫爲SSbolt。oop