This article takes a look at Storm's direct grouping.
Direct grouping is a special kind of grouping in which the upstream producer itself decides which downstream task receives each tuple it emits. Using direct grouping involves the following steps:
```java
public class SentenceDirectBolt extends BaseRichBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(SentenceDirectBolt.class);

    private OutputCollector collector;
    private List<Integer> taskIds;
    private int numCounterTasks;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        //NOTE 1 fetch the task IDs of the downstream bolt; emitDirect needs a target taskId
        this.taskIds = context.getComponentTasks("count-bolt");
        this.numCounterTasks = taskIds.size();
    }

    //......
}
```
This stores the list of the downstream bolt's task IDs, from which a target taskId is chosen when calling emitDirect.
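The body of `getWordCountTaskId`, which `execute()` calls, is not shown in the article. Below is a minimal sketch of one way it could pick a task from the saved `taskIds` list, assuming hash-mod selection; the class name `TaskChooser`, the method `chooseTask` and the sample task IDs are all hypothetical.

```java
import java.util.List;

// Hypothetical stand-in for getWordCountTaskId; hash-mod selection is an assumption,
// not necessarily what the original bolt does.
public class TaskChooser {

    // Maps a word to one of the downstream task IDs. The same word always
    // lands on the same task as long as taskIds keeps the same order.
    static int chooseTask(String word, List<Integer> taskIds) {
        // floorMod keeps the index non-negative even for negative hash codes,
        // which Math.abs(hashCode()) % n does not guarantee for Integer.MIN_VALUE.
        int index = Math.floorMod(word.hashCode(), taskIds.size());
        return taskIds.get(index);
    }

    public static void main(String[] args) {
        // Hypothetical task IDs for five count-bolt tasks.
        List<Integer> taskIds = List.of(3, 4, 5, 6, 7);
        for (String word : "the cow jumped over the moon".split(" ")) {
            System.out.println(word + " -> task " + chooseTask(word, taskIds));
        }
    }
}
```

Hashing keeps every occurrence of a word on the same counting task, which is what a per-word counter needs.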
```java
public class SentenceDirectBolt extends BaseRichBolt {

    //......

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        //NOTE 2 declare the direct streams with declareStream, giving each a streamId
        declarer.declareStream("directStreamDemo1", true, new Fields("word"));
        declarer.declareStream("directStreamDemo2", true, new Fields("word"));
    }
}
```
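Two direct streams are declared here, with streamIds directStreamDemo1 and directStreamDemo2. The `declare(new Fields("word"))` call additionally declares the default stream, which is not direct.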
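To make the role of the `direct` flag concrete, here is a toy model of what the declarations above record. It is not Storm's `OutputFieldsDeclarer`. The Storm concepts documentation states that direct groupings can only be declared on streams that were declared as direct. This sketch enforces that rule with an exception; the class name and the exception it throws are hypothetical and do not reproduce Storm's own error.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of stream declarations; NOT Storm's OutputFieldsDeclarer.
public class StreamDeclarations {

    // One declared output stream: its id, whether it is direct, and its fields.
    record Stream(String id, boolean direct, List<String> fields) {}

    private final Map<String, Stream> streams = new LinkedHashMap<>();

    // Mirrors declare(Fields): registers the "default" stream as non-direct.
    void declare(String... fields) {
        declareStream("default", false, fields);
    }

    // Mirrors declareStream(streamId, direct, Fields).
    void declareStream(String id, boolean direct, String... fields) {
        streams.put(id, new Stream(id, direct, List.of(fields)));
    }

    boolean isDirect(String id) {
        Stream s = streams.get(id);
        if (s == null) {
            throw new IllegalArgumentException("Undeclared stream: " + id);
        }
        return s.direct();
    }

    // Documented rule, modeled here as an exception: a direct grouping
    // may only subscribe to a stream declared as direct.
    void checkDirectGrouping(String id) {
        if (!isDirect(id)) {
            throw new IllegalStateException("directGrouping on non-direct stream: " + id);
        }
    }

    public static void main(String[] args) {
        StreamDeclarations d = new StreamDeclarations();
        d.declare("word");
        d.declareStream("directStreamDemo1", true, "word");
        d.declareStream("directStreamDemo2", true, "word");
        d.checkDirectGrouping("directStreamDemo1"); // passes: the stream is direct
        System.out.println("default is direct? " + d.isDirect("default")); // prints "default is direct? false"
    }
}
```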
```java
public class SentenceDirectBolt extends BaseRichBolt {

    //......

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            int targetTaskId = getWordCountTaskId(word);
            LOGGER.info("word:{} choose taskId:{}", word, targetTaskId);
            // NOTE 3 choose which downstream task receives the tuple, and on which stream
            if (targetTaskId % 2 == 0) {
                this.collector.emitDirect(targetTaskId, "directStreamDemo1", new Values(word));
            } else {
                this.collector.emitDirect(targetTaskId, "directStreamDemo2", new Values(word));
            }
        }
        this.collector.ack(tuple);
    }
}
```
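Here `emitDirect(int taskId, String streamId, List<Object> tuple)` specifies both the downstream taskId and the streamId to emit on. Words whose target task ID is even go to directStreamDemo1, and the rest go to directStreamDemo2.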
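The routing in `execute()` can be exercised without a cluster by pulling it into plain Java behind a small emitter interface. This is a minimal sketch. `DirectEmitter` is a hypothetical stand-in for `OutputCollector.emitDirect(int, String, List<Object>)`. The task chooser is passed in as a function because `getWordCountTaskId` is not shown. The word-length chooser in `main` is only a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Plain-Java sketch of the routing in execute(); not Storm code.
public class DirectRouting {

    // Hypothetical stand-in for OutputCollector.emitDirect(int, String, List<Object>).
    interface DirectEmitter {
        void emitDirect(int taskId, String streamId, List<Object> tuple);
    }

    // One recorded emitDirect call.
    record Emission(int taskId, String streamId, List<Object> tuple) {}

    // Splits the sentence, picks a task per word, then picks the stream
    // from the parity of that task ID, as the bolt above does.
    static void route(String sentence, ToIntFunction<String> chooseTask, DirectEmitter out) {
        for (String word : sentence.split(" ")) {
            int targetTaskId = chooseTask.applyAsInt(word);
            String streamId = (targetTaskId % 2 == 0) ? "directStreamDemo1" : "directStreamDemo2";
            out.emitDirect(targetTaskId, streamId, List.<Object>of(word));
        }
    }

    public static void main(String[] args) {
        List<Emission> recorded = new ArrayList<>();
        // Placeholder chooser: word length stands in for a real task ID.
        route("a bb ccc", String::length,
              (task, stream, values) -> recorded.add(new Emission(task, stream, values)));
        recorded.forEach(System.out::println);
    }
}
```

Recording the calls instead of sending them makes the task and stream decisions easy to assert on in a unit test.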
```java
@Test
public void testDirectGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("sentence-spout", new SentenceSpout());
    // SentenceSpout --> SentenceDirectBolt (split-bolt)
    builder.setBolt("split-bolt", new SentenceDirectBolt()).shuffleGrouping("sentence-spout");
    // SentenceDirectBolt --> WordCountBolt
    //NOTE 4 specify the upstream bolt and the streamId to consume
    builder.setBolt("count-bolt", new WordCountBolt(), 5).directGrouping("split-bolt", "directStreamDemo1");
    // WordCountBolt --> ReportBolt
    builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");

    submitRemote(builder);
}
```
Here count-bolt, downstream of split-bolt, uses directGrouping and subscribes only to the stream directStreamDemo1.
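Because count-bolt subscribes only to directStreamDemo1, and split-bolt picks the stream from the parity of the target task ID, it appears that only words routed to even-numbered count-bolt tasks are actually counted. The toy model below shows this. It assumes that a tuple emitted on a stream the target component has not subscribed to is simply not delivered. The emissions and task IDs in `main` are made up. To receive both streams, count-bolt would also need `.directGrouping("split-bolt", "directStreamDemo2")`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model (not Storm code) of which emitDirect calls reach count-bolt.
// Assumption: tuples on streams the target component has not subscribed to are not delivered.
public class DirectDelivery {

    record Emission(int taskId, String streamId, String word) {}

    // Groups delivered words by task; emissions on unsubscribed streams are skipped.
    static Map<Integer, List<String>> deliver(List<Emission> emissions, Set<String> subscribed) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (Emission e : emissions) {
            if (subscribed.contains(e.streamId())) {
                byTask.computeIfAbsent(e.taskId(), k -> new ArrayList<>()).add(e.word());
            }
        }
        return byTask;
    }

    public static void main(String[] args) {
        // Hypothetical emissions; task IDs are made up for illustration.
        List<Emission> emissions = List.of(
            new Emission(4, "directStreamDemo1", "storm"),
            new Emission(5, "directStreamDemo2", "direct"),
            new Emission(6, "directStreamDemo1", "grouping"));
        // prints {4=[storm], 6=[grouping]}
        System.out.println(deliver(emissions, Set.of("directStreamDemo1")));
    }
}
```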