本實例爲入門篇無可靠性保證明例,關於storm的介紹,以及一些術語名詞等,能夠參考Storm介紹(一)、Storm介紹(二)。html
本案例是基於storm0.9.3版本java
1.案例結構
案例:Word Count案例數組
語句Spout --> 語句分隔Bolt --> 單詞計數Bolt --> 上報Bolt安全
2.語句生成Spout - SentenceSpout
做爲入門案例,咱們直接從一個數組中不斷讀取語句,做爲數據來源。
SentenceSpout不斷讀取語句將其做爲數據來源,組裝成單值tuple(鍵名sentence,鍵值爲祖父穿格式的語句)向後發射。
{"sentence":"i am so shuai!"}ide
3.代碼結構post
話很少說,上代碼:ui
1 import backtype.storm.Config; 2 import backtype.storm.LocalCluster; 3 import backtype.storm.generated.StormTopology; 4 import backtype.storm.topology.TopologyBuilder; 5 import backtype.storm.tuple.Fields; 6 7 public class WCTopologyDriver { 8 public static void main(String[] args) throws Exception { 9 //1.建立組件 10 SentenceSpout sentenceSpout = new SentenceSpout(); 11 SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt(); 12 WordCountBolt wordCountBolt = new WordCountBolt(); 13 ReportBolt reportBolt = new ReportBolt(); 14 15 //2.建立構建者 16 TopologyBuilder builder = new TopologyBuilder(); 17 18 //3.向構建者描述拓撲結構 19 builder.setSpout("Sentence_Spout", sentenceSpout); 20 builder.setBolt("Split_Sentence_Bolt", splitSentenceBolt) 21 .shuffleGrouping("Sentence_Spout"); 22 builder.setBolt(" ", wordCountBolt) 23 .fieldsGrouping("Split_Sentence_Bolt", new Fields("word")); 24 builder.setBolt("Report_Bolt", reportBolt) 25 .shuffleGrouping("Word_Count_Bolt"); 26 27 //4.經過構建者建立拓撲 28 StormTopology topology = builder.createTopology(); 29 30 //5.將拓撲提交到集羣中運行 31 //Config conf = new Config(); 32 //StormSubmitter.submitTopology("WC_Topology", conf, topology); 33 34 //5.建立本地集羣 模擬運行拓撲 35 LocalCluster cluster = new LocalCluster(); 36 Config conf = new Config(); 37 cluster.submitTopology("WC_Topology", conf, topology); 38 39 Thread.sleep(10 * 1000); 40 cluster.killTopology("WC_Topology"); 41 cluster.shutdown(); 42 } 43 }
1 import java.util.Map; 2 3 import backtype.storm.spout.SpoutOutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichSpout; 7 import backtype.storm.tuple.Fields; 8 import backtype.storm.tuple.Values; 9 10 public class SentenceSpout extends BaseRichSpout { 11 12 private String [] sentences = { 13 "my name is park", 14 "i am so shuai", 15 "do you like me", 16 "are you sure you do not like me", 17 "ok i am sure" 18 }; 19 20 private SpoutOutputCollector collector = null; 21 22 /** 23 * 初始化的方法 24 * 當前組件初始化時 調用 執行初始化操做 25 * conf:表明當前topology相關配置信息 26 * context:表明上下文環境 能夠用來獲取 任務id 組件id 輸入輸出相關信息 等信息 27 * collector:表明發送者 能夠用來發送 拓撲 能夠在任什麼時候候發送 此對象線程安全 能夠放心的保存在類的內部做爲類的成員 28 */ 29 @Override 30 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 31 this.collector = collector; 32 } 33 34 /** 35 * storm會在一個單一線程中不停的調用此方法 要求發送tuple 36 * 若是有數據要發 直接發 若是沒有數據要發 也不要阻塞這個方法 而是直接返回便可 37 * 若是真的沒有數據要發送 最好睡上一個很短的時間 以便釋放cpu 不至於浪費過多資源 38 */ 39 private int index = 0; 40 @Override 41 public void nextTuple() { 42 if(index < sentences.length){ 43 collector.emit(new Values(sentences[index])); 44 index++; 45 }else{ 46 try { 47 Thread.sleep(1); 48 } catch (InterruptedException e) { 49 e.printStackTrace(); 50 } 51 return; 52 } 53 } 54 55 /** 56 * 用來聲明輸出信息 57 * declarer:聲明輸出的流的編號 輸出的tuple中的字段 以及是不是一個指向性的流 58 * 要注意 組件發送的tuple的結構 都要如今此方法中聲明 59 */ 60 @Override 61 public void declareOutputFields(OutputFieldsDeclarer declarer) { 62 declarer.declare(new Fields("sentence")); 63 } 64 65 }
1 import java.util.Map; 2 3 import backtype.storm.task.OutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichBolt; 7 import backtype.storm.tuple.Fields; 8 import backtype.storm.tuple.Tuple; 9 import backtype.storm.tuple.Values; 10 11 public class SplitSentenceBolt extends BaseRichBolt{ 12 13 private OutputCollector collector = null; 14 15 /** 16 * 初始化的方法 17 * 當前組件初始化時 調用 執行初始化操做 18 * conf:表明當前topology相關配置信息 19 * context:表明上下文環境 能夠用來獲取 任務id 組件id 輸入輸出相關信息 等信息 20 * collector:表明發送者 能夠用來發送 拓撲 能夠在任什麼時候候發送 此對象線程安全 能夠放心的保存在類的內部做爲類的成員 21 */ 22 @Override 23 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 24 this.collector = collector; 25 } 26 27 /** 28 * 對於輸入的tuple 一個tuple觸發一次此方法 29 * 在這個方法中對tuple進行處理 30 */ 31 @Override 32 public void execute(Tuple input) { 33 String sentence = input.getStringByField("sentence"); 34 String [] words = sentence.split(" "); 35 for(String word : words){ 36 collector.emit(new Values(word)); 37 } 38 } 39 40 /** 41 * 用來聲明輸出信息 42 * declarer:聲明輸出的流的編號 輸出的tuple中的字段 以及是不是一個指向性的流 43 * 要注意 組件發送的tuple的結構 都要如今此方法中聲明 44 */ 45 @Override 46 public void declareOutputFields(OutputFieldsDeclarer declarer) { 47 declarer.declare(new Fields("word")); 48 } 49 50 }
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import backtype.storm.task.OutputCollector; 5 import backtype.storm.task.TopologyContext; 6 import backtype.storm.topology.OutputFieldsDeclarer; 7 import backtype.storm.topology.base.BaseRichBolt; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Tuple; 10 import backtype.storm.tuple.Values; 11 12 public class WordCountBolt extends BaseRichBolt { 13 14 private OutputCollector collector = null; 15 16 @Override 17 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 18 this.collector = collector; 19 } 20 21 private Map<String,Integer> map = new HashMap<>(); 22 @Override 23 public void execute(Tuple input) { 24 String word = input.getStringByField("word"); 25 map.put(word, map.containsKey(word) ? map.get(word)+1 : 1); 26 collector.emit(new Values(word,map.get(word))); 27 } 28 29 @Override 30 public void declareOutputFields(OutputFieldsDeclarer declarer) { 31 declarer.declare(new Fields("word","count")); 32 } 33 34 }
1 import java.util.Map; 2 3 import backtype.storm.task.OutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichBolt; 7 import backtype.storm.tuple.Tuple; 8 9 public class ReportBolt extends BaseRichBolt { 10 11 @Override 12 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 13 14 } 15 16 @Override 17 public void execute(Tuple input) { 18 String word = input.getStringByField("word"); 19 int count = input.getIntegerByField("count"); 20 System.out.println("--單詞數量發生變化:"+word+"~"+count+"--"); 21 } 22 23 @Override 24 public void declareOutputFields(OutputFieldsDeclarer declarer) { 25 26 } 27 28 }
運行結果:this
補充,如下是本文案例用到的jar包,因爲太大,沒有上傳,下載0.9.3的storm源碼,解壓後文件夾中的lib下的全部jar包:
url