A look at Storm's direct grouping

This article takes a look at Storm's direct grouping.

direct grouping

Direct grouping is a special kind of grouping in which the upstream producer decides which downstream task receives each tuple it emits. Using direct grouping involves the following steps:

1. The upstream bolt saves the downstream bolt's list of task IDs in prepare

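import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentenceDirectBolt extends BaseRichBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(SentenceDirectBolt.class);

    private OutputCollector collector;

    private List<Integer> taskIds;

    private int numCounterTasks;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // NOTE 1 fetch the task IDs of the downstream bolt; emitDirect needs one of them as its target
        this.taskIds = context.getComponentTasks("count-bolt");
        this.numCounterTasks = taskIds.size();
    }
    //......
}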
This saves the list of task IDs of the downstream bolt, from which a taskId is picked when calling emitDirect.
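The article calls a getWordCountTaskId(word) helper in execute (step 3) but does not show its body. Below is a hypothetical sketch of one way to pick a target from the saved list, assuming a hash-based rule so the same word always lands on the same count-bolt task. The class name WordTaskSelector and the task IDs are made up, and the code is plain Java with no Storm dependency.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the taskId selection omitted from SentenceDirectBolt
public class WordTaskSelector {

    private final List<Integer> taskIds;
    private final int numCounterTasks;

    public WordTaskSelector(List<Integer> taskIds) {
        this.taskIds = taskIds;
        this.numCounterTasks = taskIds.size();
    }

    // Assumed rule: hash the word and index into the saved task ID list,
    // so identical words are always sent to the same downstream task
    public int getWordCountTaskId(String word) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        int index = Math.floorMod(word.hashCode(), numCounterTasks);
        return taskIds.get(index);
    }

    public static void main(String[] args) {
        // Made-up IDs; in the bolt they come from context.getComponentTasks("count-bolt")
        WordTaskSelector selector = new WordTaskSelector(Arrays.asList(3, 4, 5, 6, 7));
        for (String word : "the cow jumped over the moon".split(" ")) {
            System.out.println(word + " -> " + selector.getWordCountTaskId(word));
        }
    }
}
```

Because emitDirect must be given a real task ID of the consumer, the selector only ever returns elements of the list saved in prepare, never a raw index.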

2. The upstream bolt declares stream IDs with declareStream in declareOutputFields

public class SentenceDirectBolt extends BaseRichBolt {
    //......
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        //NOTE 2 declare direct streams via declareStream, specifying the streamId (the argument true marks the stream as direct)
        declarer.declareStream("directStreamDemo1",true,new Fields("word"));
        declarer.declareStream("directStreamDemo2",true,new Fields("word"));
    }
}
Two streamIds are declared here: directStreamDemo1 and directStreamDemo2.

3. The upstream bolt uses emitDirect to specify the downstream taskId and the streamId

public class SentenceDirectBolt extends BaseRichBolt {
    //......
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            int targetTaskId = getWordCountTaskId(word);
            LOGGER.info("word:{} choose taskId:{}",word,targetTaskId);
            // NOTE 3 specify which task of the downstream bolt receives the tuple, along with the streamId
            if(targetTaskId % 2 == 0){
                this.collector.emitDirect(targetTaskId,"directStreamDemo1",new Values(word));
            }else{
                this.collector.emitDirect(targetTaskId,"directStreamDemo2",new Values(word));
            }
        }
        this.collector.ack(tuple);
    }
}
Here emitDirect(int taskId, String streamId, List<Object> tuple) specifies both the downstream taskId and the streamId to send to.
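To make the routing decision in execute concrete, here is a plain-Java sketch with no Storm dependency. The Emitter interface is a hypothetical stand-in for OutputCollector.emitDirect, and the task selection is passed in as a function because the article does not show getWordCountTaskId; the word-length rule in main is arbitrary and only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.ToIntFunction;

public class DirectRoutingDemo {

    // Hypothetical stand-in for OutputCollector.emitDirect(int, String, List<Object>)
    interface Emitter {
        void emitDirect(int taskId, String streamId, List<Object> tuple);
    }

    // Mirrors the loop in SentenceDirectBolt.execute: the producer itself picks
    // the target task, then picks the stream from the parity of that task ID
    static void route(String sentence, ToIntFunction<String> chooseTask, Emitter emitter) {
        for (String word : sentence.split(" ")) {
            int targetTaskId = chooseTask.applyAsInt(word);
            String streamId = targetTaskId % 2 == 0 ? "directStreamDemo1" : "directStreamDemo2";
            emitter.emitDirect(targetTaskId, streamId, Collections.<Object>singletonList(word));
        }
    }

    public static void main(String[] args) {
        List<Integer> taskIds = Arrays.asList(3, 4, 5, 6, 7); // made-up task IDs
        // Arbitrary selection rule for the demo: index the task list by word length
        ToIntFunction<String> chooseTask = word -> taskIds.get(word.length() % taskIds.size());
        route("the cow jumped over the moon", chooseTask,
                (taskId, streamId, tuple) ->
                        System.out.println(tuple.get(0) + " -> task " + taskId + " on " + streamId));
    }
}
```

With these made-up IDs, "over" and "moon" are routed to task 7 and therefore go out on directStreamDemo2, while the other words go to even-numbered tasks on directStreamDemo1.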

4. The downstream bolt subscribes to the upstream bolt and streamId with directGrouping

@Test
public void testDirectGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("sentence-spout", new SentenceSpout());
    // SentenceSpout --> SentenceDirectBolt
    builder.setBolt("split-bolt", new SentenceDirectBolt()).shuffleGrouping("sentence-spout");
    // SentenceDirectBolt --> WordCountBolt
    // NOTE 4 specify the upstream bolt and the streamId to consume
    builder.setBolt("count-bolt", new WordCountBolt(), 5).directGrouping("split-bolt", "directStreamDemo1");
    // WordCountBolt --> ReportBolt
    builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");

    submitRemote(builder);
}
Here count-bolt, as the downstream of split-bolt, uses directGrouping and specifies directStreamDemo1 as the streamId it receives.
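Since count-bolt subscribes only to directStreamDemo1 and no other component in this topology subscribes to directStreamDemo2, the words split-bolt emits on directStreamDemo2 never reach count-bolt. The plain-Java sketch below models that rule with made-up emits; the Emit class and deliveries method are hypothetical and are a simplification, not Storm's actual dispatch code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DirectDeliveryDemo {

    // One tuple emitted by split-bolt via emitDirect (hypothetical model)
    static final class Emit {
        final int taskId;
        final String streamId;
        final String word;

        Emit(int taskId, String streamId, String word) {
            this.taskId = taskId;
            this.streamId = streamId;
            this.word = word;
        }
    }

    // Groups words by target task, keeping only the streams the consumer
    // subscribed to with directGrouping; emits on other streams are skipped
    static Map<Integer, List<String>> deliveries(List<Emit> emits, Set<String> subscribedStreams) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (Emit e : emits) {
            if (subscribedStreams.contains(e.streamId)) {
                byTask.computeIfAbsent(e.taskId, k -> new ArrayList<>()).add(e.word);
            }
        }
        return byTask;
    }

    public static void main(String[] args) {
        List<Emit> emits = Arrays.asList(
                new Emit(4, "directStreamDemo1", "the"),
                new Emit(5, "directStreamDemo2", "cow"),
                new Emit(6, "directStreamDemo1", "moon"),
                new Emit(4, "directStreamDemo1", "the"));
        // count-bolt subscribes only to directStreamDemo1
        Map<Integer, List<String>> result =
                deliveries(emits, Collections.singleton("directStreamDemo1"));
        System.out.println(result); // prints {4=[the, the], 6=[moon]}
    }
}
```

If the demo2 words should also be counted, count-bolt would need a second subscription, e.g. chaining another directGrouping("split-bolt", "directStreamDemo2") call on the same declarer.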

Summary

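  • Direct grouping is a special kind of grouping in which the upstream producer decides which downstream task receives each tuple it emits.
  • The downstream bolt subscribes to the upstream bolt with directGrouping and specifies the streamId to consume. The upstream bolt saves the downstream task IDs in prepare, declares the streamIds with declareStream in declareOutputFields, and finally, in execute, calls emitDirect(int taskId, String streamId, List<Object> tuple) to specify the downstream taskId and the streamId to send to.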

