Apache Storm中內置了一種定時機制——tick,它可以讓任何bolt的全部task每隔一段時間(精確到秒級,用戶能夠自定義)收到一個來自__systemd的__tick stream的tick tuple,bolt收到這樣的tuple後能夠根據業務需求完成相應的處理。java
Tick功能從Apache Storm 0.8.0版本開始支持,本文在Apache Storm 0.9.1上測試。框架
在代碼中如需使用tick,能夠參照下面的方式:dom
若但願某個bolt每隔一段時間作一些操做,那麼能夠將bolt繼承BaseBasicBolt/BaseRichBolt,並重寫getComponentConfiguration()方法。在方法中設置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,單位是秒。ide
getComponentConfiguration()是backtype.storm.topology.IComponent接口中定義的方法,在此方法的實現中能夠定義以」Topology.*」開頭的此bolt特定的Config。測試
這樣設置以後,此bolt的全部task都會每隔一段時間收到一個來自__systemd的__tick stream的tick tuple,所以execute()方法能夠實現以下:ui
若但願Topology中的每一個bolt都每隔一段時間作一些操做,那麼能夠定義一個Topology全局的tick,一樣是設置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:spa
與Linux中的環境變量的優先級相似,storm中的tick也有優先級,即全局tick的做用域是全局bolt,但對每一個bolt其優先級低於此bolt定義的tick。orm
這個參數的名字TOPOLOGY_TICK_TUPLE_FREQ_SECS具備必定的迷惑性,一眼看上去應該是Topology全局的,但實際上每一個bolt也能夠本身定義。blog
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精確到秒級的。例如某bolt設置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS爲10s,理論上說bolt的每一個task應該每一個10s收到一個tick tuple。實際測試發現,這個時間間隔的精確性是很高的,通常延遲(而不是提早)時間在1ms左右。測試環境:3臺虛擬機作supervisor,每臺配置:4Cpu、16G內存、千兆網卡。繼承
在bolt中的getComponentConfiguration()定義了該bolt的特定的配置後,storm框架會在TopologyBuilder.setBolt()方法中調用bolt的getComponentConfiguration()方法,從而設置該bolt的配置。
調用路徑爲:TopologyBuilder.setBolt()
-> TopologyBuilder.initCommon()
-> getComponentConfiguration()
測試使用的代碼:
package storm.starter; import backtype.storm.Config; import backtype.storm.Constants; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.ShellBolt; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import storm.starter.spout.RandomSentenceSpout; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; public class MyTickTestTopology { public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){ System.out.println("################################WorldCount bolt: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date())); } else{ collector.emit(new Values("a", 1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } @Override public Map<String, Object> getComponentConfiguration() { Config conf = new Config(); conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10); return conf; } } public static class TickTest extends BaseBasicBolt{ @Override public void execute(Tuple tuple, BasicOutputCollector collector) { // 收到的tuple是tick tuple if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){ System.out.println("################################TickTest bolt: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date())); } // 收到的tuple時正常的tuple else{ collector.emit(new Values("a")); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("test")); } @Override public Map<String, Object> getComponentConfiguration() { Config conf = new Config(); conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,20); return conf; } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 3); builder.setBolt("count", new WordCount(), 3).shuffleGrouping("spout"); builder.setBolt("tickTest", new TickTest(), 3).shuffleGrouping("count"); Config conf = new Config(); conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7); conf.setDebug(false); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); // Thread.sleep(10000); // cluster.shutdown(); } } }