storm集羣實例運行java
storm本地運行只須要storm的jar包就能夠了,結果能夠在控制檯直接看到,storm集羣運行,結果要在log日誌裏看,或者存儲下來。而且,集羣運行,execute方法裏的輸出能夠看到,可是cleanup裏的輸出是看不到的,由於cleanup只有在topology結束後纔會執行,而storm是實時連續的運行的,因此輸出放在execute裏或者保存起來查看。apache
wordcount實例代碼函數
代碼在前面的博客裏已經寫了只是將WordCounter作了點修改ui
package com.storm.stormDemo; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; import com.storm.stormTest.MergeObjects; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt { public static Logger LOG = Logger.getLogger(WordCounter.class); Integer id; String name; Map<String, Integer> counters; private OutputCollector collector; BufferedWriter output; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counters = new HashMap<String, Integer>(); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); try { output = new BufferedWriter(new FileWriter("/home/zhanghuan/Downloads/wordcount.txt" , true)); } catch (IOException e) { // TODO Auto-generated catch block try { output.close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } e.printStackTrace(); } } public void execute(Tuple input) { String str = input.getString(0); if (!counters.containsKey(str)) { counters.put(str, 1); } else { Integer c = counters.get(str) + 1; counters.put(str, c); } Iterator<String> iterator = counters.keySet().iterator(); while(iterator.hasNext()){ String next = iterator.next(); try { System.out.print(next + ":" + counters.get(next) + " "); output.write(next + ":" + counters.get(next) + " "); output.flush(); } catch (IOException e) { e.printStackTrace(); try { output.close(); } catch (IOException e1) { e1.printStackTrace(); } } } // 確認成功處理一個tuple collector.ack(input); } /** * Topology執行完畢的清理工做,好比關閉鏈接、釋放資源等操做都會寫在這裏 * 由於這只是個Demo,咱們用它來打印咱們的計數器 * */ public void cleanup() { LOG.info("-- Word Counter [" + name + "-" + id + "] --"); for (Map.Entry<String, Integer> entry : counters.entrySet()) { LOG.info(entry.getKey() + ": " + entry.getValue()); } counters.clear(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub //declarer.declare(new Fields("word","number")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
集羣運行this
storm jar StormDemo.jar com.storm.stormDemo.WordCountTopologyMain StormDemo /home/zhanghuan/Downloads/test.txtspa
注意:主函數路徑要寫全日誌
若是集羣報錯以下:code
則打開你打的第三方jar包文件夾,在裏面找到storm-core-0.10.0.jar,刪除這個jar包裏的default.yarml文件,或則刪掉你打的storm jar包。orm
topology提交後,會啓動相應數量的worker進程和logwriter進程,ui界面上也能看到這個topology的運行進程
這時候你就能夠查看log日誌文件或者存儲位置,查看結果。
沒有數據輸入的時候,日誌就像最下方同樣,保持通訊。
中止topology運行
storm kill topology的名字