storm集羣WordCount實例

storm集羣實例運行java

storm本地運行只須要storm的jar包就能夠了,結果能夠在控制檯直接看到,storm集羣運行,結果要在log日誌裏看,或者存儲下來。而且,集羣運行,execute方法裏的輸出能夠看到,可是cleanup裏的輸出是看不到的,由於cleanup只有在topology結束後纔會執行,而storm是實時連續的運行的,因此輸出放在execute裏或者保存起來查看。apache

wordcount實例代碼函數

代碼在前面的博客裏已經寫了只是將WordCounter作了點修改ui

package com.storm.stormDemo;  
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;  
import java.util.Iterator;
import java.util.Map; 
import org.apache.log4j.Logger;
import com.storm.stormTest.MergeObjects;
import backtype.storm.task.OutputCollector;  
import backtype.storm.task.TopologyContext;  
import backtype.storm.topology.IRichBolt;  
import backtype.storm.topology.OutputFieldsDeclarer;  
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;  
  
public class WordCounter implements IRichBolt {
 public static Logger LOG = Logger.getLogger(WordCounter.class);
    Integer id;  
    String name;  
    Map<String, Integer> counters;  
    private OutputCollector collector;
    BufferedWriter output;
  
    public void prepare(Map stormConf, TopologyContext context,  
            OutputCollector collector) {  
        this.counters = new HashMap<String, Integer>();  
        this.collector = collector;  
        this.name = context.getThisComponentId();  
        this.id = context.getThisTaskId();  
        try {
   output = new BufferedWriter(new FileWriter("/home/zhanghuan/Downloads/wordcount.txt" , true));
  } catch (IOException e) {
   // TODO Auto-generated catch block
   try {
    output.close();
   } catch (IOException e1) {
    // TODO Auto-generated catch block
    e1.printStackTrace();
   }
   e.printStackTrace();
  }
  
    }  
    public void execute(Tuple input) {  
        String str = input.getString(0);  
        if (!counters.containsKey(str)) {  
            counters.put(str, 1);  
        } else {  
            Integer c = counters.get(str) + 1;  
            counters.put(str, c);  
        }  
        Iterator<String> iterator = counters.keySet().iterator();
        while(iterator.hasNext()){
         String next = iterator.next();
         try {
          System.out.print(next + ":" + counters.get(next) + " ");
    output.write(next + ":" + counters.get(next) + " ");
    output.flush();
   } catch (IOException e) {
    e.printStackTrace();
    try {
     output.close();
    } catch (IOException e1) {
     e1.printStackTrace();
    }
   }
        }
        // 確認成功處理一個tuple
        collector.ack(input);  
    }  
    /** 
     * Topology執行完畢的清理工做,好比關閉鏈接、釋放資源等操做都會寫在這裏 
     * 由於這只是個Demo,咱們用它來打印咱們的計數器 
     * */  
    public void cleanup() {  
     LOG.info("-- Word Counter [" + name + "-" + id + "] --");  
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {  
         LOG.info(entry.getKey() + ": " + entry.getValue());  
        }  
        counters.clear();  
    }  
    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
        // TODO Auto-generated method stub  
     //declarer.declare(new Fields("word","number"));
  
    }  
    public Map<String, Object> getComponentConfiguration() {  
        // TODO Auto-generated method stub  
        return null;  
    }  
}

集羣運行this

storm jar StormDemo.jar com.storm.stormDemo.WordCountTopologyMain StormDemo /home/zhanghuan/Downloads/test.txtspa

注意:主函數路徑要寫全日誌

若是集羣報錯以下:code

則打開你打的第三方jar包文件夾,在裏面找到storm-core-0.10.0.jar,刪除這個jar包裏的default.yarml文件,或則刪掉你打的storm jar包。orm

topology提交後,會啓動相應數量的worker進程和logwriter進程,ui界面上也能看到這個topology的運行進程

這時候你就能夠查看log日誌文件或者存儲位置,查看結果。

沒有數據輸入的時候,日誌就像最下方同樣,保持通訊。

中止topology運行

storm kill topology的名字

相關文章
相關標籤/搜索