Storm 第三章 Storm編程案例及Stream Grouping詳解

1 功能說明

  設計一個topology,來實現對文檔裏面的單詞出現的頻率進行統計。整個topology分爲三個部分:java

  SentenceSpout:數據源,在已知的英文句子中,隨機發送一條句子出去。apache

  SplitBolt:負責將單行文本記錄(句子)切分紅單詞ide

  CountBolt:負責對單詞的頻率進行累加ui

2 代碼實現

 1 package com.ntjr.bigdata;
 2 
 3 import org.apache.storm.Config;
 4 import org.apache.storm.LocalCluster;
 5 import org.apache.storm.StormSubmitter;
 6 import org.apache.storm.generated.AlreadyAliveException;
 7 import org.apache.storm.generated.AuthorizationException;
 8 import org.apache.storm.generated.InvalidTopologyException;
 9 import org.apache.storm.topology.TopologyBuilder;
10 import org.apache.storm.tuple.Fields;
11 
12 public class WrodCountTopolog {
13     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
14         //使用TopologyBuilder 構建一個topology
15         TopologyBuilder topologyBuilder = new TopologyBuilder();
16         //發送英文句子
17         topologyBuilder.setSpout("sentenceSpout", new SentenceSpout(), 2);
18         //將一行行的文本切分紅單詞
19         topologyBuilder.setBolt("splitBolt", new SplitBolt(), 2).shuffleGrouping("sentenceSpout");
20         //將單詞的頻率進行累加
21         topologyBuilder.setBolt("countBolt", new CountBolt(), 2).fieldsGrouping("splitBolt", new Fields("word"));
22         //啓動topology的配置信息
23         Config config = new Config();
24         //定義集羣分配多少個工做進程來執行這個topology
25         config.setNumWorkers(3);
26         
27         //本地模式提交topology
28         LocalCluster localCluster = new LocalCluster();
29         localCluster.submitTopology("mywordCount", config, topologyBuilder.createTopology());
30         
31         //集羣模式提交topology
32         StormSubmitter.submitTopologyWithProgressBar("mywordCount", config, topologyBuilder.createTopology());
33 
34     }
35 
36 }
WrodCountTopolog.java
 1 package com.ntjr.bigdata;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.storm.spout.SpoutOutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.OutputFieldsDeclarer;
 8 import org.apache.storm.topology.base.BaseRichSpout;
 9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Values;
11 
12 public class SentenceSpout extends BaseRichSpout {
13 
14     private static final long serialVersionUID = 1L;
15     // 用來收集Spout輸出的tuple
16     private SpoutOutputCollector collector;
17 
18     @Override
19     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
20         this.collector = collector;
21 
22     }
23 
24     // 該方法會循環調用
25     @Override
26     public void nextTuple() {
27         collector.emit(new Values("i am lilei love hanmeimei"));
28     }
29 
30     // 消息源能夠發送多條消息流,該方法定義輸出的消息類型的字段
31     @Override
32     public void declareOutputFields(OutputFieldsDeclarer declarer) {
33         declarer.declare(new Fields("love"));
34 
35     }
36 
37 }
SentenceSpout.java
 1 package com.ntjr.bigdata;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.storm.task.OutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.OutputFieldsDeclarer;
 8 import org.apache.storm.topology.base.BaseRichBolt;
 9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Tuple;
11 import org.apache.storm.tuple.Values;
12 
13 public class SplitBolt extends BaseRichBolt {
14 
15     private static final long serialVersionUID = 1L;
16 
17     private OutputCollector collector;
18 
19     // 該方法只會調用一次用來執行初始化
20     @Override
21     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
22         this.collector = collector;
23 
24     }
25 
26     // 接收的參數時spout發出來的句子,一個句子就是一個tuple
27     @Override
28     public void execute(Tuple input) {
29         String line = input.getString(0);
30         String[] words = line.split(" ");
31         for (String word : words) {
32             collector.emit(new Values(word, 1));
33         }
34 
35     }
36 
37     // 定義輸出類型,輸出類型爲單詞和單詞的數目和collector.emit(new Values(word, 1));對應
38     @Override
39     public void declareOutputFields(OutputFieldsDeclarer declarer) {
40         declarer.declare(new Fields("word", "num"));
41 
42     }
43 
44 }
SplitBolt.java
 1 package com.ntjr.bigdata;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import org.apache.storm.task.OutputCollector;
 7 import org.apache.storm.task.TopologyContext;
 8 import org.apache.storm.topology.OutputFieldsDeclarer;
 9 import org.apache.storm.topology.base.BaseRichBolt;
10 import org.apache.storm.tuple.Tuple;
11 
12 public class CountBolt extends BaseRichBolt {
13 
14     private static final long serialVersionUID = 1L;
15     private OutputCollector collector;
16     // 用來保存最後的計算結果 key:單詞,value:單詞的個數
17     Map<String, Integer> map = new HashMap<String, Integer>();
18 
19     // 該方法調用一次用來執行初始化
20     @Override
21     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
22         this.collector = collector;
23 
24     }
25 
26     @Override
27     public void execute(Tuple input) {
28         String word = input.getString(0);
29         Integer num = input.getInteger(1);
30 
31         if (map.containsKey(word)) {
32             Integer count = map.get(word);
33             map.put(word, count + num);
34         } else {
35             map.put(word, num);
36         }
37         System.out.println("count:" + map);
38     }
39 
40     @Override
41     public void declareOutputFields(OutputFieldsDeclarer declarer) {
42 
43     }
44 
45 }
CountBolt.java

3 執行流程圖

 

Stream Grouping詳解

  3.1 Shuffle Grouping: 隨機分組, 隨機派發stream裏面的tuple,保證每一個bolt接收到的tuple數目大體相同。this

  3.2 Fields Grouping:按字段分組,好比按userid來分組,具備一樣userid的tuple會被分到相同的Bolts裏的一個task,而不一樣的userid則會被分配到不一樣的bolts裏的task。spa

  3.3 All Grouping:廣播發送,對於每個tuple,全部的bolts都會收到。線程

  3.4 Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。設計

  3.5 Non Grouping:不分組,這stream grouping個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是同樣的效果 有一點不一樣的是storm會把這個bolt放到這個bolt的訂閱者同一個線程裏面去執行。3d

  3.6 Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪一個task處理這個消息。只有被聲明爲Direct Stream的消息流能夠聲明這種分組方法。並且這種消息tuple必須使用emitDirect方法來發射。code

            消息處理者能夠經過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。

  3.7 Local or shuffle grouping:若是目標bolt有一個或者多個task在同一個工做進程中,tuple將會被隨機發生給這些tasks。不然,和普通的Shuffle Grouping行爲一致。

相關文章
相關標籤/搜索