A look at splitting and merging streams in Storm

This article looks at how streams in Storm can be split into several streams and merged back together.

Example

@Test
    public void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("sentence-spout", new SentenceSpout());
        // SentenceSpout --> SplitStreamBolt
        builder.setBolt("split-bolt", new SplitStreamBolt())
                .shuffleGrouping("sentence-spout");
        // SplitStreamBolt splits the stream in two --> CountStreamBolt
        //NOTE specify both the upstream bolt and the streamId to consume
        builder.setBolt("long-word-count-bolt", new CountStreamBolt(),5)
                .shuffleGrouping("split-bolt","longWordStream");
        builder.setBolt("short-word-count-bolt", new CountStreamBolt(),5)
                .shuffleGrouping("split-bolt","shortWordStream");
        // both CountStreamBolt outputs merge --> ReportBolt
        builder.setBolt("report-bolt", new ReportBolt())
                .shuffleGrouping("long-word-count-bolt")
                .shuffleGrouping("short-word-count-bolt");

        // submitRemote is a helper from the author's test class (not shown in this article)
        submitRemote(builder);
    }
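  • Here SplitStreamBolt splits the stream in two, two CountStreamBolt components then each process the data of one stream, and finally ReportBolt subscribes to both of them, so their tuples are merged into the input it consumes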
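To make the wiring concrete, here is a small Storm-free model of what the builder calls above declare: a subscription is keyed by the pair (upstream component id, stream id). `SubscriptionTable`, `Source`, `subscribe` and `consumersOf` are hypothetical names used only for illustration; in Storm itself this bookkeeping is done by `TopologyBuilder` and the groupings. A subscription that names no stream, like `shuffleGrouping("long-word-count-bolt")`, is modeled as subscribing to the `"default"` stream, which is what Storm's no-streamId grouping overloads do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, Storm-free model: who receives tuples emitted on
// (componentId, streamId)? Mirrors what calls such as
// shuffleGrouping("split-bolt", "longWordStream") declare.
final class SubscriptionTable {

    // Stream id used when a subscription names no stream.
    static final String DEFAULT_STREAM_ID = "default";

    // Immutable key identifying one output stream of one component.
    static final class Source {
        final String componentId;
        final String streamId;

        Source(String componentId, String streamId) {
            this.componentId = componentId;
            this.streamId = streamId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Source)) {
                return false;
            }
            Source other = (Source) o;
            return componentId.equals(other.componentId) && streamId.equals(other.streamId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(componentId, streamId);
        }
    }

    private final Map<Source, List<String>> subscribers = new LinkedHashMap<>();

    // `consumer` reads the stream `streamId` emitted by `producer`.
    void subscribe(String consumer, String producer, String streamId) {
        subscribers.computeIfAbsent(new Source(producer, streamId), k -> new ArrayList<>())
                   .add(consumer);
    }

    // A subscription without a stream id falls back to the default stream.
    void subscribe(String consumer, String producer) {
        subscribe(consumer, producer, DEFAULT_STREAM_ID);
    }

    // Components that receive a tuple `producer` emits on `streamId`.
    List<String> consumersOf(String producer, String streamId) {
        List<String> found = subscribers.get(new Source(producer, streamId));
        return found == null ? Collections.<String>emptyList() : Collections.unmodifiableList(found);
    }
}
```

Wired like the test above, `consumersOf("split-bolt", "longWordStream")` yields only `long-word-count-bolt`, while nothing is subscribed to split-bolt's default stream. That is why the second argument of `shuffleGrouping("split-bolt","longWordStream")` matters: without it the count bolt would listen to the default stream instead.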

SplitStreamBolt

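import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitStreamBolt extends BaseRichBolt {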

    private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);

    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    //NOTE BaseRichBolt does not ack automatically, so ack the input tuple yourself
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            // NOTE emit to the chosen streamId
            if(word.length() > 4){
                this.collector.emit("longWordStream",new Values(word));
            }else{
                this.collector.emit("shortWordStream",new Values(word));
            }
        }
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
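        //NOTE declareStream declares an additional stream under its own streamId.
        //     These are regular streams: a direct stream (declareStream(streamId, true, fields))
        //     requires emitDirect and directGrouping, while this topology uses emit + shuffleGrouping
        declarer.declareStream("longWordStream", new Fields("word"));
        declarer.declareStream("shortWordStream", new Fields("word"));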
    }
}
  • Two additional streams are declared here: longWordStream and shortWordStream
  • Words longer than 4 characters are sent to longWordStream; words of 4 characters or fewer are sent to shortWordStream
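The routing rule can be checked in isolation. The following is a plain-Java sketch with no Storm dependency; `WordRouter`, `streamFor` and `route` are hypothetical names. It reproduces only the decision SplitStreamBolt makes before calling `emit(streamId, ...)`, including the boundary case: a word of exactly 4 characters goes to shortWordStream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free sketch of SplitStreamBolt's routing decision:
// words longer than 4 characters -> "longWordStream",
// everything else (length <= 4)  -> "shortWordStream".
final class WordRouter {

    static final String LONG_WORD_STREAM = "longWordStream";
    static final String SHORT_WORD_STREAM = "shortWordStream";

    // Pick the target stream id for a single word.
    static String streamFor(String word) {
        return word.length() > 4 ? LONG_WORD_STREAM : SHORT_WORD_STREAM;
    }

    // Split one sentence on spaces and group its words by target stream id,
    // keeping the emission order within each stream.
    static Map<String, List<String>> route(String sentence) {
        Map<String, List<String>> byStream = new LinkedHashMap<>();
        byStream.put(LONG_WORD_STREAM, new ArrayList<>());
        byStream.put(SHORT_WORD_STREAM, new ArrayList<>());
        for (String word : sentence.split(" ")) {
            byStream.get(streamFor(word)).add(word);
        }
        return byStream;
    }
}
```

For example, `route("the quick brown fox jumps")` puts quick, brown and jumps on longWordStream and the, fox on shortWordStream.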

CountStreamBolt

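import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountStreamBolt extends BaseBasicBolt{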

    private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);

    Map<String, Integer> longWordCounts = new HashMap<String, Integer>();
    Map<String, Integer> shortWordCounts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sourceStreamId = input.getSourceStreamId();

        String word = input.getString(0);

        if(sourceStreamId.equals("longWordStream")){
            Integer count = longWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            longWordCounts.put(word, count);
            LOGGER.info("long word:{} -> {}",word,count);
            collector.emit(new Values(word, count));
            return ;
        }

        if(sourceStreamId.equals("shortWordStream")){
            Integer count = shortWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            shortWordCounts.put(word, count);
            LOGGER.info("short word:{} -> {}",word,count);
            collector.emit(new Values(word, count));
            return ;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
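  • To show how sourceStreamId can tell the streams apart, both streams share the same bolt class, but the topology registers it as two separate components
  • In practice the two streams could just as well be handled by two different bolt classes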
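The per-stream bookkeeping in CountStreamBolt boils down to one counter map per source stream id. The sketch below is a Storm-free model of that idea; `PerStreamCounter`, `record` and `countOf` are hypothetical names, and a nested map stands in for the two hard-coded `longWordCounts`/`shortWordCounts` fields.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free sketch of CountStreamBolt's logic: counts are
// kept separately per source stream id, so the same word arriving on two
// different streams gets two independent counters.
final class PerStreamCounter {

    // sourceStreamId -> (word -> running count)
    private final Map<String, Map<String, Integer>> counts = new HashMap<>();

    // Record one word received from `sourceStreamId` and return its new
    // running count, i.e. the value CountStreamBolt emits next to the word.
    int record(String sourceStreamId, String word) {
        return counts.computeIfAbsent(sourceStreamId, k -> new HashMap<>())
                     .merge(word, 1, Integer::sum);
    }

    // Current count of `word` on `sourceStreamId`, or 0 if never seen.
    int countOf(String sourceStreamId, String word) {
        Map<String, Integer> perWord = counts.get(sourceStreamId);
        return perWord == null ? 0 : perWord.getOrDefault(word, 0);
    }
}
```

Since each of the two components in the topology subscribes to only one stream, every CountStreamBolt instance in practice fills just one of its two maps. Note also that with parallelism 5 and shuffleGrouping, occurrences of the same word can land on different tasks, so each task holds only a partial count; if one count per word is wanted, a fields grouping on "word" (for example `fieldsGrouping("split-bolt", "longWordStream", new Fields("word"))`) would send every occurrence of a word to the same task.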

Summary

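  • OutputFieldsDeclarer can declare multiple streamIds via its declareStream method
  • OutputCollector's emit(String streamId, List<Object> tuple) sends a tuple to the given streamId, so the bolt decides per tuple which stream it goes to
  • OutputCollector also has emit overloads that take no streamId; internally they use Utils.DEFAULT_STREAM_ID ("default") as the actual streamId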
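These three points can be summarized in a toy model. `ToyCollector` is a hypothetical class, not Storm's `OutputCollector` or `OutputFieldsDeclarer`: it merges both roles, drops the field schemas, and rejects emits to undeclared streams purely to make mistakes visible (a choice of the toy, not a statement about Storm's runtime checks). Only the value of `DEFAULT_STREAM_ID` is taken from Storm's `Utils.DEFAULT_STREAM_ID`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model (not Storm's classes): streams are declared first,
// emit(streamId, tuple) targets a named stream, and emit(tuple) falls back
// to the "default" stream id.
final class ToyCollector {

    // Same literal value as Storm's Utils.DEFAULT_STREAM_ID.
    static final String DEFAULT_STREAM_ID = "default";

    private final Set<String> declared = new LinkedHashSet<>();
    private final Map<String, List<List<Object>>> emitted = new LinkedHashMap<>();

    // Counterpart of declarer.declareStream(streamId, fields); fields omitted.
    void declareStream(String streamId) {
        declared.add(streamId);
    }

    // Counterpart of declarer.declare(fields): declares the default stream.
    void declare() {
        declareStream(DEFAULT_STREAM_ID);
    }

    // Emit to an explicitly named stream; the toy rejects undeclared ids.
    void emit(String streamId, List<Object> tuple) {
        if (!declared.contains(streamId)) {
            throw new IllegalArgumentException("undeclared stream: " + streamId);
        }
        emitted.computeIfAbsent(streamId, k -> new ArrayList<>()).add(tuple);
    }

    // Emit without a stream id: delegates to the default stream.
    void emit(List<Object> tuple) {
        emit(DEFAULT_STREAM_ID, tuple);
    }

    // Tuples emitted so far on `streamId`, in emission order.
    List<List<Object>> emittedOn(String streamId) {
        List<List<Object>> found = emitted.get(streamId);
        return found == null ? Collections.<List<Object>>emptyList() : found;
    }
}
```

In this model, calling `emit(tuple)` after `declare()` lands the tuple on `"default"`, which is the stream that `shuffleGrouping("long-word-count-bolt")` in the topology subscribes to.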
