本文主要研究一下 Storm 中 stream 的分流與合併。
@Test public void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentence-spout", new SentenceSpout()); // SentenceSpout --> SplitStreamBolt builder.setBolt("split-bolt", new SplitStreamBolt()) .shuffleGrouping("sentence-spout"); // SplitStreamBolt split two stream --> WordCountBolt //NOTE 這裏要指定上游的bolt以及要處理的streamId builder.setBolt("long-word-count-bolt", new CountStreamBolt(),5) .shuffleGrouping("split-bolt","longWordStream"); builder.setBolt("short-word-count-bolt", new CountStreamBolt(),5) .shuffleGrouping("split-bolt","shortWordStream"); // WordCountBolt join --> ReportBolt builder.setBolt("report-bolt", new ReportBolt()) .shuffleGrouping("long-word-count-bolt") .shuffleGrouping("short-word-count-bolt"); submitRemote(builder); }
public class SplitStreamBolt extends BaseRichBolt { private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class); private OutputCollector collector; public void prepare(Map config, TopologyContext context, OutputCollector collector) { this.collector = collector; } //NOTE 這裏要本身ack public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for(String word : words){ // NOTE 這裏指定發送給指定streamId if(word.length() > 4){ this.collector.emit("longWordStream",new Values(word)); }else{ this.collector.emit("shortWordStream",new Values(word)); } } this.collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); //NOTE 這裏經過declareStream聲明direct stream,並指定streamId declarer.declareStream("longWordStream",true,new Fields("word")); declarer.declareStream("shortWordStream",true,new Fields("word")); } }
public class CountStreamBolt extends BaseBasicBolt{ private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class); Map<String, Integer> longWordCounts = new HashMap<String, Integer>(); Map<String, Integer> shortWordCounts = new HashMap<String, Integer>(); @Override public void execute(Tuple input, BasicOutputCollector collector) { String sourceStreamId = input.getSourceStreamId(); String word = input.getString(0); if(sourceStreamId.equals("longWordStream")){ Integer count = longWordCounts.get(word); if (count == null) count = 0; count++; longWordCounts.put(word, count); LOGGER.info("long word:{} -> {}",word,count); collector.emit(new Values(word, count)); return ; } if(sourceStreamId.equals("shortWordStream")){ Integer count = shortWordCounts.get(word); if (count == null) count = 0; count++; shortWordCounts.put(word, count); LOGGER.info("short word:{} -> {}",word,count); collector.emit(new Values(word, count)); return ; } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
若未指定 streamId,則會以 `default` 做爲實際的 streamId。