官方提供的storm starter示例中,有不少應用的例子,對storm的應用場景理解頗有幫助。本文結合源碼來進行功能分解,記錄一下,做爲記憶索引吧。 java
先來看一個比較簡單的示例:WordCountTopology,原版代碼該示例是爲了說明多語言適配而作的應用場景,主要功能是隨機生成一些String,將這些String劃分分組,統計各單詞出現數量。後來修改了一下,去掉了py調用的地方。使用java來進行詞組劃分。 dom
先來看一下Topology: ide
public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); // 數據源 builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); // 單詞劃分 builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word")); // 統計單詞出現個數 Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar("wordCount", conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
RandomSentenceSpout功能很簡單,隨機生成字符串到tuple中,key爲word。 ui
再看一下bolt,該示例中使用了兩類分組方式shuffleGrouping(隨機分組),fieldsGrouping(按字段分組)。使用shuffleGrouping來進行單詞劃分,爲了保證單詞統計時都在一個bolt中進行,使用fieldsGrouping來進行word劃分統計,運行時會看到相同key值的tuple會分配到同一線程上。 spa
public static class SplitSentence implements IBasicBolt { public void prepare(Map conf, TopologyContext context) { } public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for (String word : sentence.split(" ")) { // 將Spout接收到的tuple按空格進行分解,產生單詞數據流 collector.emit(new Values(word)); } } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); // 定義key值 } @Override public Map<String, Object> getComponentConfiguration() { return null; } }單詞統計bolt:
public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); // 輸出結果word + word number } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
總結: 線程
一般狀況下,爲了保證數據可靠性與完整性,實現一個Bolt,能夠實現IRichBolt接口或繼承BaseRichBolt,若是不想本身處理結果反饋,能夠實現IBasicBolt接口或繼承BaseBasicBolt,至關於自動處理了prepare方法和collector.emit.ack(inputTuple); code