Twitter Storm Stream Grouping編寫自定義分組實現

##自定義Grouping測試ide

Storm是支持自定義分組的,本篇文章就是探究Storm如何編寫一個自定義分組器,以及對Storm分組器如何分組數據的理解。oop

這是我寫的一個自定義分組,老是把數據分到第一個Task:測試

public class MyFirstStreamGrouping implements CustomStreamGrouping {
    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);

    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
		List<Integer> targetTasks) {
	    this.tasks = targetTasks;
	    log.info(tasks.toString());
    }	
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
	    log.info(values.toString());
	    return Arrays.asList(tasks.get(0));
    }
}

從上面的代碼能夠看出,該自定義分組會把數據歸併到第一個Task<code>Arrays.asList(tasks.get(0));</code>,也就是數據到達後老是被派發到第一組。ui

測試代碼:this

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
//自定義分組,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
	    .customGrouping("words", new MyFirstStreamGrouping());

和以前的測試用例同樣,Spout老是發送<code>new String[] {「nathan」, 「mike」, 「jackson」, 「golda」, 「bertels」}</code>列表的字符串。咱們運行驗證一下:線程

11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

從這個運行日誌咱們能夠看出,數據老是派發到一個Blot:Thread-25-exclaim1。由於我時本地測試,Thread-25-exclaim1是線程名。而派發的線程是數據多個線程的。所以該測試符合預期,即老是發送到一個Task,而且這個Task也是第一個。日誌

##理解自定義分組實現code

本身實現一個自定義分組難嗎?其實若是你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是同樣的道理。orm

Hadoop MapReduce的Map完成後會把Map的中間結果寫入磁盤,在寫磁盤前,線程首先根據數據最終要傳送到的Reducer把數據劃分紅相應的分區,而後不一樣的分區進入不一樣的Reduce。咱們先來看看Hadoop是怎樣把數據怎樣分組的,這是Partitioner惟一一個方法:字符串

public class Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return 0;
    }
}

上面的代碼中:Map輸出的數據都會通過getPartition()方法,用來肯定下一步的分組。numReduceTasks是一個Job的Reduce數量,而返回值就是肯定該條數據進入哪一個Reduce。返回值必須大於等於0,小於numReduceTasks,不然就會報錯。返回0就意味着這條數據進入第一個Reduce。對於隨機分組來講,這個方法能夠這麼實現:

public int getPartition(K key, V value, int numReduceTasks) {
    return hash(key) % numReduceTasks;
}

其實Hadoop 默認的Hash分組策略也正是這麼實現的。這樣好處是,數據在整個集羣基本上是負載平衡的。

搞通了Hadoop的Partitioner,咱們來看看Storm的CustomStreamGrouping。

這是CustomStreamGrouping類的源碼:

public interface CustomStreamGrouping extends Serializable {

   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

   List<Integer> chooseTasks(int taskId, List<Object> values); 
}

如出一轍的道理,targetTasks就是Storm運行時告訴你,當前有幾個目標Task能夠選擇,每個都給編上了數字編號。而 <code> chooseTasks(int taskId, List<Object> values); </code> 就是讓你選擇,你的這條數據values,是要哪幾個目標Task處理?

如上文文章開頭的自定義分組器實現的代碼,我選擇的老是讓第一個Task來處理數據,<code> return Arrays.asList(tasks.get(0)); </code> 。和Hadoop不一樣的是,Storm容許一條數據被多個Task處理,所以返回值是List<Integer>.就是讓你來在提供的 'List<Integer> targetTasks' Task中選擇任意的幾個(必須至少是一個)Task來處理數據。

由此,Storm的自定義分組策略也就不那麼麻煩了吧?

相關文章
相關標籤/搜索