Storm Primer (2): Computing Top N over a Time Window in Storm

Being new to Storm, I found the sliding-window top N model hard to follow at first. While reading other blogs I came across two good write-ups on non-sliding-window top N, which I am reposting here.

Here is the first approach:

Another common Storm pattern is the so-called "streaming top N" computation: it continuously maintains a top N ranking in memory by some statistic (such as occurrence count), and emits the up-to-date top N result at a fixed time interval.

Streaming top N has many applications, such as computing the hottest topics or most-clicked images on Twitter over a recent time window.

Drawing on an example from storm-starter, here is an implementation that scales easily: first, run several Bolts in parallel across multiple machines, each computing the top N for its partition of the data; then have a single global Bolt merge those partial top N results into the final global top N.

The entry point of the sample code is the RollingTopWords class, which computes the N most frequent words in a document. First, the topology structure:

 

The topology is built as follows:

 
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 5);
        builder.setBolt("count", new RollingCountObjects(60, 10), 4)
                 .fieldsGrouping("word", new Fields("word"));
        builder.setBolt("rank", new RankObjects(TOP_N), 4)
                 .fieldsGrouping("count", new Fields("obj"));
        builder.setBolt("merge", new MergeObjects(TOP_N))
                 .globalGrouping("rank");
 

 

(1) First, TestWordSpout() is the topology's data source Spout. It continuously emits randomly chosen words, producing the "word" stream with a single output field "word". Core code:

 
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
 

 

(2) Next, the "word" stream flows into the RollingCountObjects Bolt for word counting. To guarantee that the same word is always processed by the same Bolt task, the stream is field-grouped on the "word" field. RollingCountObjects counts the occurrences of each word and produces the "count" stream with two output fields, "obj" and "count". (The synchronized block could also be replaced with a thread-safe container such as ConcurrentHashMap.) Core code:

 
    public void execute(Tuple tuple) {

        Object obj = tuple.getValue(0);
        int bucket = currentBucket(_numBuckets);
        synchronized(_objectCounts) {
            long[] curr = _objectCounts.get(obj);
            if(curr==null) {
                curr = new long[_numBuckets];
                _objectCounts.put(obj, curr);
            }
            curr[bucket]++;
            _collector.emit(new Values(obj, totalObjects(obj)));
            _collector.ack(tuple);
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj", "count"));
    }
 

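As the text above suggests, the synchronized block can be swapped for lock-free containers. Below is a minimal sketch of my own (not part of storm-starter; class and method names are hypothetical) of the same per-bucket counting done with ConcurrentHashMap plus AtomicLongArray, so that increments need no explicit lock:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongArray;

class RollingCounter {
    // One AtomicLongArray of time buckets per counted object, in place of
    // the synchronized HashMap<Object, long[]> used by RollingCountObjects.
    private final ConcurrentHashMap<Object, AtomicLongArray> counts =
            new ConcurrentHashMap<Object, AtomicLongArray>();
    private final int numBuckets;

    RollingCounter(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    // Increment obj's count in the given time bucket and return its new total.
    long increment(Object obj, int bucket) {
        AtomicLongArray buckets =
                counts.computeIfAbsent(obj, k -> new AtomicLongArray(numBuckets));
        buckets.incrementAndGet(bucket);
        return total(obj);
    }

    // Sum obj's counts across all buckets (0 if never seen).
    long total(Object obj) {
        AtomicLongArray buckets = counts.get(obj);
        if (buckets == null) return 0;
        long sum = 0;
        for (int i = 0; i < buckets.length(); i++) sum += buckets.get(i);
        return sum;
    }
}
```

A Bolt's execute() would then call increment() and emit the returned total, with no synchronized section on the hot path.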
(3) Then the RankObjects Bolt field-groups on the "obj" field of the "count" stream. Inside the Bolt it maintains an ordered list of the top N words; whenever the list exceeds N entries, the last one is evicted. At a fixed interval (2 seconds) it emits the "rank" stream with a "list" field, passing its top N result on to the next stage, the "merge" stream. Core code:

 
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object tag = tuple.getValue(0);
        Integer existingIndex = _find(tag);
        if (null != existingIndex) {
            _rankings.set(existingIndex, tuple.getValues());
        } else {
            _rankings.add(tuple.getValues());
        }
        Collections.sort(_rankings, new Comparator<List>() {
            public int compare(List o1, List o2) {
                return _compare(o1, o2);
            }
        });
        if (_rankings.size() > _count) {
            _rankings.remove(_count);
        }
        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            collector.emit(new Values(new ArrayList(_rankings)));
            _lastTime = currentTime;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }
 
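The _compare helper used by the sort above is not shown in the excerpt. Assuming each ranking entry is a List of [obj, count] as emitted upstream, a plausible reconstruction that puts higher counts first might look like this (my sketch, not the storm-starter original):

```java
import java.util.List;

class RankCompare {
    // Hypothetical reconstruction of _compare: entries are raw Lists of
    // [obj, count]; compare by the count at index 1, descending.
    static int _compare(List o1, List o2) {
        long c1 = ((Number) o1.get(1)).longValue();
        long c2 = ((Number) o2.get(1)).longValue();
        return Long.compare(c2, c1); // higher counts sort first
    }
}
```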

(4) Finally, the MergeObjects Bolt receives the "rank" stream with a global grouping, i.e. the "rank" streams from all upstream Bolt tasks flow into this single "merge" Bolt. MergeObjects uses the same logic as RankObjects, except that it merges the partial rankings from the RankObjects tasks to produce the final global top N. Core code:

 
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        List<List> merging = (List) tuple.getValue(0);
        for(List pair : merging) {
            Integer existingIndex = _find(pair.get(0));
            if (null != existingIndex) {
                _rankings.set(existingIndex, pair);
            } else {
                _rankings.add(pair);
            }

            Collections.sort(_rankings, new Comparator<List>() {
                public int compare(List o1, List o2) {
                    return _compare(o1, o2);
                }
            });

            if (_rankings.size() > _count) {
                _rankings.subList(_count, _rankings.size()).clear();
            }
        }

        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            collector.emit(new Values(new ArrayList(_rankings)));
            LOG.info("Rankings: " + _rankings);
            _lastTime = currentTime;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }
 

There is also another clever approach: execute() only accumulates data without emitting, while the emitting is done by a thread created in prepare() that fires on a timer.

    package test.storm.topology;

    import test.storm.bolt.WordCounter;
    import test.storm.bolt.WordWriter;
    import test.storm.spout.WordReader;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class WordTopN {
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            if (args == null || args.length < 1) {
                System.err.println("Usage: N");
                System.err.println("such as : 10");
                System.exit(-1);
            }
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("wordreader", new WordReader(), 2);
            builder.setBolt("wordcounter", new WordCounter(), 2).fieldsGrouping("wordreader", new Fields("word"));
            builder.setBolt("wordwriter", new WordWriter()).globalGrouping("wordcounter");
            Config conf = new Config();
            conf.put("N", args[0]);
            conf.setDebug(false);
            StormSubmitter.submitTopology("topN", conf, builder.createTopology());
        }
    }

A few things to note here. The first bolt uses the fieldsGrouping strategy, grouping by field. This matters: it guarantees that the same word is always dispatched to the same bolt task, which applications like word count and top N depend on.
The last bolt uses globalGrouping, so all tuples are routed to a single bolt task for the final aggregation.
To raise parallelism, both the spout and the first bolt are given a parallelism of 2 (my test machine is not very powerful).


    package test.storm.spout;

    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class WordReader extends BaseRichSpout {
        private static final long serialVersionUID = 2197521792014017918L;
        private SpoutOutputCollector collector;
        private static AtomicInteger i = new AtomicInteger();
        private static String[] words = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
                "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            if (i.intValue() < 100) {
                Random rand = new Random();
                String word = words[rand.nextInt(words.length)];
                collector.emit(new Values(word));
                i.incrementAndGet();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

The spout's job is to emit random words, 100 in total. Since the parallelism is 2, two spout instances are created, so the counter is a static AtomicInteger to keep the shared count thread-safe.



    package test.storm.bolt;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.ConcurrentHashMap;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class WordCounter implements IRichBolt {
        private static final long serialVersionUID = 5683648523524179434L;
        private static Map<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
        private volatile boolean edit = true;

        @Override
        public void prepare(final Map stormConf, TopologyContext context, final OutputCollector collector) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        // if counters have not changed for 5 seconds, assume the spout has finished sending
                        if (!edit) {
                            if (counters.size() > 0) {
                                List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>();
                                list.addAll(counters.entrySet());
                                Collections.sort(list, new ValueComparator());
                                // emit the top N words to the next bolt
                                for (int i = 0; i < list.size(); i++) {
                                    if (i < Integer.parseInt(stormConf.get("N").toString())) {
                                        collector.emit(new Values(list.get(i).getKey() + ":" + list.get(i).getValue()));
                                    }
                                }
                            }
                            // after emitting, clear counters in case the spout sends more words
                            counters.clear();
                        }
                        edit = false;
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }

        @Override
        public void execute(Tuple tuple) {
            String str = tuple.getString(0);
            if (counters.containsKey(str)) {
                Integer c = counters.get(str) + 1;
                counters.put(str, c);
            } else {
                counters.put(str, 1);
            }
            edit = true;
        }

        private static class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
            @Override
            public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
                return entry2.getValue() - entry1.getValue();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word_count"));
        }

        @Override
        public void cleanup() {
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

WordCounter holds a thread-safe container, a ConcurrentHashMap, that stores each word and its count. prepare() starts a thread that polls the edit flag every 5 seconds.
When edit is false, i.e. execute() has not run and the container has not changed during the interval, the spout can be assumed to have finished sending, and it is safe to sort and take the top N. A volatile edit flag is used here (recall when volatile is appropriate: when writes to the variable do not depend on its current value; setting it to true or false clearly qualifies).
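To make that volatile rule concrete, here is a small illustrative sketch (the class and method names are my own, not from the post's code) of the same flag pattern. Blind writes like edit = true are safe under volatile alone, whereas a read-modify-write counter is not, which is exactly why the spout uses AtomicInteger:

```java
class EditFlag {
    // The volatile flag pattern from WordCounter: writes such as edit = true
    // do not depend on the previous value, so volatile alone provides the
    // needed cross-thread visibility. An increment like i++ reads the old
    // value first and would need AtomicInteger instead.
    private volatile boolean edit = false;

    void touch() {              // called from execute(): a blind write, safe
        edit = true;
    }

    boolean pollAndReset() {    // called only from the single monitor thread
        boolean was = edit;
        edit = false;
        return was;
    }
}
```

Note that pollAndReset() is a check-then-act sequence, so it is only correct because a single monitor thread calls it, matching the one-thread design in prepare().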



    package test.storm.bolt;

    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Map;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class WordWriter extends BaseBasicBolt {
        private static final long serialVersionUID = -6586283337287975719L;
        private FileWriter writer = null;

        public WordWriter() {
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            try {
                writer = new FileWriter("/data/tianzhen/output/" + this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String s = input.getString(0);
            try {
                writer.write(s);
                writer.write("\n");
                writer.flush();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // do not close the writer here, because execute() keeps running
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

The last bolt does the global aggregation. I was lazy here and simply wrote the results to a file, skipping the step of truncating to the top N; since I only have one supervisor node, the result is still correct.
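For completeness, here is a hedged sketch of the truncation step that was skipped, assuming each incoming line is a "word:count" string as emitted by WordCounter (class and method names are my own):

```java
import java.util.ArrayList;
import java.util.List;

class TopNTruncate {
    // Sort "word:count" lines by count descending and keep only the first n.
    static List<String> topN(List<String> lines, int n) {
        List<String> sorted = new ArrayList<String>(lines);
        sorted.sort((a, b) -> Integer.compare(
                Integer.parseInt(b.split(":")[1]),
                Integer.parseInt(a.split(":")[1])));
        return sorted.subList(0, Math.min(n, sorted.size()));
    }
}
```

With more than one upstream WordCounter task, WordWriter would need this merge-and-truncate step before writing, since each task only emits its own partial top N.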

Reference links: http://blog.itpub.net/28912557/viewspace-1579860/

     http://www.cnblogs.com/panfeng412/archive/2012/06/16/storm-common-patterns-of-streaming-top-n.html
