storm自定義數據分組

數據流組緩存

設計一個拓撲時,你要作的最重要的事情之一就是定義如何在各組件之間交換數據(數據流是如何被bolts消費的)。一個數據流組指定了每一個bolt會消費哪些數據流,以及如何消費它們。 ide

storm自帶數據流組oop

隨機數據流組ui

隨機流組是最經常使用的數據流組。它只有一個參數(數據源組件),而且數據源會向隨機選擇的bolt發送元組,保證每一個消費者收到近似數量的元組。this

 builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

 域數據流組spa

域數據流組容許你基於元組的一個或多個域控制如何把元組發送給bolts。它保證擁有相同域組合的值集發送給同一個bolt。回到單詞計數器的例子,若是你用word域爲數據流分組,word-normalizer bolt將只會把相同單詞的元組發送給同一個word-counterbolt實例。設計

 builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"));

所有數據流組code

所有數據流組,爲每一個接收數據的實例複製一份元組副本。這種分組方式用於向bolts發送信號。好比,你要刷新緩存,你能夠向全部的bolts發送一個刷新緩存信號。在單詞計數器的例子裏,你可使用一個所有數據流組,添加清除計數器緩存的功能 orm

builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGroupint("word-normalizer",new Fields("word"))
           .allGrouping("signals-spout","signals");

直接數據流組接口

這是一個特殊的數據流組,數據源能夠用它決定哪一個組件接收元組

 builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");

。與前面的例子相似,數據源將根據單詞首字母決定由哪一個bolt接收元組。要使用直接數據流組,在WordNormalizer bolt中,使用emitDirect方法代替emit。

public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //對元組作出應答
        collector.ack(input);
    }
    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

在prepare方法中計算任務數

 public void prepare(Map stormConf, TopologyContext context, 
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

全局數據流組

全局數據流組把全部數據源建立的元組發送給單一目標實例(即擁有最低ID的任務)。

不分組

這個數據流組至關於隨機數據流組。也就是說,使用這個數據流組時,並不關心數據流是如何分組的。

自定義數據流組

storm自定義數據流組和hadoop Partitioner分組很類似,storm自定義分組要實現CustomStreamGrouping接口,接口源碼以下:

public   interface   CustomStreamGrouping  extends   Serializable {
 
    void   prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
 
    List<Integer> chooseTasks( int   taskId, List<Object> values);
}

targetTasks就是Storm運行時告訴你,當前有幾個目標Task能夠選擇,每個都給編上了數字編號。而 chooseTasks(int taskId, List values); 就是讓你選擇,你的這條數據values,是要哪幾個目標Task處理?

這是我寫的一個自定義分組,老是把數據分到第一個Task:

public   class   MyFirstStreamGrouping  implements   CustomStreamGrouping {
     private   static   Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping. class );
 
     private   List<Integer> tasks;
 
     @Override
     public   void   prepare(WorkerTopologyContext context, GlobalStreamId stream,
         List<Integer> targetTasks) {
         this .tasks = targetTasks;
         log.info(tasks.toString());
     }  
     @Override
     public   List<Integer> chooseTasks( int   taskId, List<Object> values) {
         log.info(values.toString());
         return   Arrays.asList(tasks.get( 0 ));
     }
}

從上面的代碼能夠看出,該自定義分組會把數據歸併到第一個TaskArrays.asList(tasks.get(0));,也就是數據到達後老是被派發到第一組。和Hadoop不一樣的是,Storm容許一條數據被多個Task處理,所以返回值是List .就是讓你來在提供的 'List targetTasks' Task中選擇任意的幾個(必須至少是一個)Task來處理數據。

第二個自定義分組,wordcount中使首字母相同的單詞交給同一個bolt處理:

public class ModuleGrouping implements CustormStreamGrouping{
        int numTasks = 0;
        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }
        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

這是一個CustomStreamGrouping的簡單實現,在這裏咱們採用單詞首字母字符的整數值與任務數的餘數,決定接收元組的bolt。

builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());
相關文章
相關標籤/搜索