storm入門基礎實例(無可靠性保證明例) Storm介紹(一) Storm介紹(二)

本實例爲入門篇無可靠性保證明例,關於storm的介紹,以及一些術語名詞等,能夠參考Storm介紹(一)Storm介紹(二)html

本案例是基於storm0.9.3版本java

1.案例結構
案例:Word Count案例數組

語句Spout --> 語句分隔Bolt --> 單詞計數Bolt --> 上報Bolt安全

2.語句生成Spout - SentenceSpout
做爲入門案例,咱們直接從一個數組中不斷讀取語句,做爲數據來源。
SentenceSpout不斷讀取語句將其做爲數據來源,組裝成單值tuple(鍵名sentence,鍵值爲祖父穿格式的語句)向後發射。
{"sentence":"i am so shuai!"}ide

3.代碼結構post

話很少說,上代碼:ui

 1 import backtype.storm.Config;
 2 import backtype.storm.LocalCluster;
 3 import backtype.storm.generated.StormTopology;
 4 import backtype.storm.topology.TopologyBuilder;
 5 import backtype.storm.tuple.Fields;
 6 
 7 public class WCTopologyDriver {
 8     public static void main(String[] args) throws Exception {
 9         //1.建立組件
10         SentenceSpout sentenceSpout = new SentenceSpout();
11         SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
12         WordCountBolt wordCountBolt = new WordCountBolt();
13         ReportBolt reportBolt = new ReportBolt();
14         
15         //2.建立構建者
16         TopologyBuilder builder = new TopologyBuilder();
17         
18         //3.向構建者描述拓撲結構
19         builder.setSpout("Sentence_Spout", sentenceSpout);
20         builder.setBolt("Split_Sentence_Bolt", splitSentenceBolt)
21         .shuffleGrouping("Sentence_Spout");
22         builder.setBolt(" ", wordCountBolt)
23         .fieldsGrouping("Split_Sentence_Bolt", new Fields("word"));
24         builder.setBolt("Report_Bolt", reportBolt)
25         .shuffleGrouping("Word_Count_Bolt");
26         
27         //4.經過構建者建立拓撲
28         StormTopology topology = builder.createTopology();
29         
30         //5.將拓撲提交到集羣中運行
31         //Config conf = new Config();
32         //StormSubmitter.submitTopology("WC_Topology", conf, topology);
33         
34         //5.建立本地集羣 模擬運行拓撲 
35         LocalCluster cluster = new LocalCluster();
36         Config conf = new Config();
37         cluster.submitTopology("WC_Topology", conf, topology);
38         
39         Thread.sleep(10 * 1000);
40         cluster.killTopology("WC_Topology");
41         cluster.shutdown();
42     }
43 }
 1 import java.util.Map;
 2 
 3 import backtype.storm.spout.SpoutOutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.OutputFieldsDeclarer;
 6 import backtype.storm.topology.base.BaseRichSpout;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Values;
 9 
10 public class SentenceSpout extends BaseRichSpout {
11 
12     private String [] sentences = {
13         "my name is park",
14         "i am so shuai",
15         "do you like me",
16         "are you sure you do not like me",
17         "ok i am sure"
18     }; 
19     
20     private SpoutOutputCollector collector = null;
21     
22     /**
23      * 初始化的方法
24      * 當前組件初始化時 調用  執行初始化操做
25      * conf:表明當前topology相關配置信息
26      * context:表明上下文環境 能夠用來獲取 任務id 組件id 輸入輸出相關信息 等信息
27      * collector:表明發送者 能夠用來發送 拓撲 能夠在任什麼時候候發送 此對象線程安全  能夠放心的保存在類的內部做爲類的成員
28      */
29     @Override
30     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
31         this.collector = collector;
32     }
33 
34     /**
35      * storm會在一個單一線程中不停的調用此方法 要求發送tuple
36      * 若是有數據要發 直接發 若是沒有數據要發 也不要阻塞這個方法 而是直接返回便可
37      * 若是真的沒有數據要發送 最好睡上一個很短的時間 以便釋放cpu 不至於浪費過多資源
38      */
39     private int index = 0;
40     @Override
41     public void nextTuple() {
42         if(index < sentences.length){
43             collector.emit(new Values(sentences[index]));
44             index++;
45         }else{
46             try {
47                 Thread.sleep(1);
48             } catch (InterruptedException e) {
49                 e.printStackTrace();
50             }
51             return;
52         }
53     }
54 
55     /**
56      * 用來聲明輸出信息
57      * declarer:聲明輸出的流的編號 輸出的tuple中的字段 以及是不是一個指向性的流
58      * 要注意 組件發送的tuple的結構 都要如今此方法中聲明
59      */
60     @Override
61     public void declareOutputFields(OutputFieldsDeclarer declarer) {
62         declarer.declare(new Fields("sentence"));
63     }
64     
65 }
 1 import java.util.Map;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.OutputFieldsDeclarer;
 6 import backtype.storm.topology.base.BaseRichBolt;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Tuple;
 9 import backtype.storm.tuple.Values;
10 
11 public class SplitSentenceBolt extends BaseRichBolt{
12 
13     private OutputCollector collector = null;
14     
15     /**
16      * 初始化的方法
17      * 當前組件初始化時 調用  執行初始化操做
18      * conf:表明當前topology相關配置信息
19      * context:表明上下文環境 能夠用來獲取 任務id 組件id 輸入輸出相關信息 等信息
20      * collector:表明發送者 能夠用來發送 拓撲 能夠在任什麼時候候發送 此對象線程安全  能夠放心的保存在類的內部做爲類的成員
21      */
22     @Override
23     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
24         this.collector = collector;
25     }
26     
27     /**
28      *     對於輸入的tuple 一個tuple觸發一次此方法
29      *    在這個方法中對tuple進行處理
30      */    
31     @Override
32     public void execute(Tuple input) {
33         String sentence = input.getStringByField("sentence");
34         String []  words = sentence.split(" ");
35         for(String word : words){
36             collector.emit(new Values(word));
37         }
38     }
39 
40     /**
41      * 用來聲明輸出信息
42      * declarer:聲明輸出的流的編號 輸出的tuple中的字段 以及是不是一個指向性的流
43      * 要注意 組件發送的tuple的結構 都要如今此方法中聲明
44      */
45     @Override
46     public void declareOutputFields(OutputFieldsDeclarer declarer) {
47         declarer.declare(new Fields("word"));
48     }
49     
50 }
 1 import java.util.HashMap;
 2 import java.util.Map;
 3 
 4 import backtype.storm.task.OutputCollector;
 5 import backtype.storm.task.TopologyContext;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.topology.base.BaseRichBolt;
 8 import backtype.storm.tuple.Fields;
 9 import backtype.storm.tuple.Tuple;
10 import backtype.storm.tuple.Values;
11 
12 public class WordCountBolt extends BaseRichBolt {
13 
14     private OutputCollector collector = null;
15     
16     @Override
17     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
18         this.collector = collector;
19     }
20 
21     private Map<String,Integer> map = new HashMap<>();
22     @Override
23     public void execute(Tuple input) {
24         String word = input.getStringByField("word");
25         map.put(word, map.containsKey(word) ? map.get(word)+1 : 1);
26         collector.emit(new Values(word,map.get(word)));
27     }
28 
29     @Override
30     public void declareOutputFields(OutputFieldsDeclarer declarer) {
31         declarer.declare(new Fields("word","count"));
32     }
33 
34 }
 1 import java.util.Map;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.OutputFieldsDeclarer;
 6 import backtype.storm.topology.base.BaseRichBolt;
 7 import backtype.storm.tuple.Tuple;
 8 
 9 public class ReportBolt extends BaseRichBolt {
10 
11     @Override
12     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
13 
14     }
15 
16     @Override
17     public void execute(Tuple input) {
18         String word = input.getStringByField("word");
19         int count = input.getIntegerByField("count");
20         System.out.println("--單詞數量發生變化:"+word+"~"+count+"--");
21     }
22 
23     @Override
24     public void declareOutputFields(OutputFieldsDeclarer declarer) {
25 
26     }
27 
28 }

 運行結果:this

補充,如下是本文案例用到的jar包,因爲太大,沒有上傳,下載0.9.3的storm源碼,解壓後文件夾中的lib下的全部jar包:
url

相關文章
相關標籤/搜索