Design a topology that counts the frequency of each word appearing in a document. The topology consists of three parts:

SentenceSpout: the data source; it emits a sentence chosen from a known set of English sentences.

SplitBolt: splits each line of text (one sentence) into words.

CountBolt: accumulates the count of each word.
```java
package com.ntjr.bigdata;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        // Build the topology with TopologyBuilder
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Emit English sentences
        topologyBuilder.setSpout("sentenceSpout", new SentenceSpout(), 2);
        // Split each line of text into words
        topologyBuilder.setBolt("splitBolt", new SplitBolt(), 2).shuffleGrouping("sentenceSpout");
        // Accumulate the count of each word
        topologyBuilder.setBolt("countBolt", new CountBolt(), 2).fieldsGrouping("splitBolt", new Fields("word"));
        // Configuration for launching the topology
        Config config = new Config();
        // Number of worker processes the cluster allocates to run this topology
        config.setNumWorkers(3);

        if (args != null && args.length > 0) {
            // Cluster mode: submit the topology to the cluster
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, topologyBuilder.createTopology());
        } else {
            // Local mode: run the topology in-process
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mywordCount", config, topologyBuilder.createTopology());
        }
    }
}
```
```java
package com.ntjr.bigdata;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    // Collector used to emit this spout's tuples
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Called repeatedly, in a loop, by the framework
    @Override
    public void nextTuple() {
        collector.emit(new Values("i am lilei love hanmeimei"));
    }

    // A spout can emit multiple streams; this method declares the fields of the output tuples
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
```
```java
package com.ntjr.bigdata;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    // Called only once, for initialization
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // The input is a sentence emitted by the spout; each sentence is one tuple
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            collector.emit(new Values(word, 1));
        }
    }

    // Declare the output fields: a word and its count, matching collector.emit(new Values(word, 1))
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}
```
```java
package com.ntjr.bigdata;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class CountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    // Holds the final result; key: word, value: number of occurrences
    Map<String, Integer> map = new HashMap<String, Integer>();

    // Called only once, for initialization
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);

        if (map.containsKey(word)) {
            Integer count = map.get(word);
            map.put(word, count + num);
        } else {
            map.put(word, num);
        }
        System.out.println("count:" + map);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is the end of the topology and emits nothing downstream
    }
}
```
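The split-and-count logic that SplitBolt and CountBolt implement can be exercised in plain Java without a cluster. This is a minimal sketch of the same computation, independent of the Storm API (class and method names here are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    public static Map<String, Integer> count(String[] sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            // SplitBolt step: split each sentence into words
            for (String word : sentence.split(" ")) {
                // CountBolt step: accumulate the per-word total
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(new String[] {
                "i am lilei love hanmeimei",
                "i am lilei love hanmeimei" });
        System.out.println(counts.get("lilei")); // prints 2
    }
}
```

In Storm the same accumulation is spread across bolt tasks, which is why the fields grouping on "word" matters: it guarantees each CountBolt task sees every occurrence of the words it owns.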
3.1 Shuffle Grouping: random grouping. Tuples in the stream are distributed randomly, so that each task of the bolt receives roughly the same number of tuples.
3.2 Fields Grouping: grouping by field. For example, when grouping by userid, tuples with the same userid are always routed to the same task of the bolt, while tuples with different userids may be routed to different tasks.
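Conceptually, fields grouping routes each tuple by hashing the grouping field modulo the number of target tasks, so the same value always lands on the same task. A simplified plain-Java illustration (not Storm's exact internal hashing; names are illustrative):

```java
import java.util.Arrays;

public class FieldsGroupingSketch {
    // Pick a target task for a tuple based only on its grouping field.
    // Simplified: Storm actually hashes the list of declared grouping field values.
    static int taskFor(String fieldValue, int numTasks) {
        return Math.abs(fieldValue.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // Repeated words always map to the same task,
        // so each CountBolt task sees every occurrence of "its" words.
        for (String word : Arrays.asList("storm", "love", "storm", "love")) {
            System.out.println(word + " -> task " + taskFor(word, numTasks));
        }
    }
}
```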
3.3 All Grouping: broadcast. Every task of the bolt receives a copy of each tuple.
3.4 Global Grouping: global grouping. The entire stream is sent to a single task of the bolt; specifically, the task with the lowest id.
3.5 Non Grouping: no grouping. This grouping means the stream does not care which task receives its tuples. It currently has the same effect as Shuffle Grouping, with one difference: Storm will, when possible, execute the bolt in the same thread as the component it subscribes to.
3.6 Direct Grouping: direct grouping. This is a special grouping in which the producer of a tuple decides which task of the consumer will process it. It can only be used on streams that have been declared as direct streams, and tuples on such streams must be emitted with the emitDirect method.
The producer can obtain the task ids of its consumers through TopologyContext (the OutputCollector.emit method also returns the task ids the tuple was sent to).
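The direct-grouping contract can be illustrated with a toy router in plain Java (hypothetical names, not the Storm API): the sender names the exact consumer task instead of letting the grouping pick one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DirectGroupingSketch {
    // Each task id owns its own inbox of tuples.
    private final Map<Integer, List<String>> inboxes = new HashMap<>();

    DirectGroupingSketch(int numTasks) {
        for (int task = 0; task < numTasks; task++) {
            inboxes.put(task, new ArrayList<>());
        }
    }

    // Analogous to emitDirect: the sender explicitly chooses the receiving task.
    void emitDirect(int taskId, String tuple) {
        inboxes.get(taskId).add(tuple);
    }

    List<String> inbox(int taskId) {
        return inboxes.get(taskId);
    }

    public static void main(String[] args) {
        DirectGroupingSketch router = new DirectGroupingSketch(3);
        router.emitDirect(2, "hello"); // delivered only to task 2
        System.out.println(router.inbox(2)); // prints [hello]
    }
}
```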
3.7 Local or Shuffle Grouping: if the target bolt has one or more tasks in the same worker process, tuples are shuffled only among those in-process tasks. Otherwise, it behaves the same as an ordinary Shuffle Grouping.