storm starter學習(一)

    官方提供的storm starter示例中,有不少應用的例子,對storm的應用場景理解頗有幫助。本文結合源碼來進行功能分解,記錄一下,做爲記憶索引吧。 java

    先來看一個比較簡單的示例:WordCountTopology,原版代碼該示例是爲了說明多語言適配而作的應用場景,主要功能是隨機生成一些String,將這些String劃分分組,統計各單詞出現數量。後來修改了一下,去掉了py調用的地方。使用java來進行詞組劃分。 dom

    先來看一下Topology: ide

public static void main(String[] args) throws Exception {
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("spout", new RandomSentenceSpout(), 5); // 數據源
	builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); // 單詞劃分
	builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word")); // 統計單詞出現個數
	Config conf = new Config();
	conf.setDebug(true);

	if (args != null && args.length > 0) { 
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
					builder.createTopology());
	} else {
	    conf.setMaxTaskParallelism(3);
	    LocalCluster cluster = new LocalCluster();
	    cluster.submitTopology("word-count", conf, builder.createTopology());

	    Thread.sleep(10000);
	    cluster.shutdown();
	}
}

    RandomSentenceSpout功能很簡單,隨機生成字符串到tuple中,key爲word。 ui

    再看一下bolt,該示例中使用了兩類分組方式shuffleGrouping(隨機分組),fieldsGrouping(按字段分組)。使用shuffleGrouping來進行單詞劃分,爲了保證單詞統計時都在一個bolt中進行,使用fieldsGrouping來進行word劃分統計,運行時會看到相同key值的tuple會分配到同一線程上。 spa

public static class SplitSentence implements IBasicBolt {
	public void prepare(Map conf, TopologyContext context) {
	}

	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String sentence = tuple.getString(0);
		for (String word : sentence.split(" ")) { // 將Spout接收到的tuple按空格進行分解,產生單詞數據流
			collector.emit(new Values(word));
		}

	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word")); // 定義key值
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}
    單詞統計bolt:
public static class WordCount extends BaseBasicBolt {
	Map<String, Integer> counts = new HashMap<String, Integer>();

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String word = tuple.getString(0);
		Integer count = counts.get(word);
		if (count == null)
			count = 0;
		count++;
		counts.put(word, count);
		collector.emit(new Values(word, count)); // 輸出結果word + word number
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count")); 
	}
}

    總結: 線程

    一般狀況下,爲了保證數據可靠性與完整性,實現一個Bolt,能夠實現IRichBolt接口或繼承BaseRichBolt,若是不想本身處理結果反饋,能夠實現IBasicBolt接口或繼承BaseBasicBolt,至關於自動處理了prepare方法和collector.emit.ack(inputTuple); code

相關文章
相關標籤/搜索