Storm中,Spout和Bolt都是Component。Storm定義了一個名叫IComponent的總接口java
ØSpout的最頂層抽象是ISpout接口。數據庫
一般狀況下(Shell和事務型的除外),實現一個Spout,能夠直接實現接口IRichSpout,併發
若是不想寫多餘的代碼,能夠直接繼承BaseRichSpout。dom
ØBolt的最頂層抽象是IBolt接口分佈式
Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功ide
主要開發流程:工具
1、實現Spout接口IRichSpout或者繼承類BaseRichSpoutui
2、實現Bolt接口IRichBolt或者IBasicBolt或者繼承類BaseBasicBoutthis
3、使用TopologyBuilder定義Topology(setSpout/ setBolt/ Grouping,spa
每一個Spout和Bolt的併發執行單元數和併發任務數在此設置);
4、配置TOPOLOGY_WORKERS等參數;
5、利用TopologyBuilder方法createTopology建立拓撲;
6、提交拓撲 StormSubmitter.submitTopology。
Topology
/** * 定義了一個簡單的topology,包括一個數據噴發節點spout和一個數據處理節點bolt。 */ public class SimpleTopology { public static void main(String[] args) { try { // 實例化TopologyBuilder類。 TopologyBuilder topologyBuilder = new TopologyBuilder(); // 設置噴發節點並分配併發數,該併發數將會控制該對象在集羣中的線程數。 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); // 設置數據處理節點並分配併發數。指定該節點接收噴發節點的策略爲隨機方式。 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping( "SimpleSpout"); Config config = new Config(); config.setDebug(true); if (args != null && args.length > 0) { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { // 這裏是本地模式下運行的啓動代碼。 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology } } } }
spout
/** * Spout起到和外界溝通的做用,他能夠從一個數據庫中按照某種規則取數據,也能夠從分佈式隊列中取任務 */ @SuppressWarnings("serial") public class SimpleSpout extends BaseRichSpout { // 用來發射數據的工具類 private SpoutOutputCollector collector; private static String[] info = new String[] { "comaple\t,12424,44w46,654,12424,44w46,654,", "lisi\t,435435,6537,12424,44w46,654,", "lipeng\t,45735,6757,12424,44w46,654,", "hujintao\t,45735,6757,12424,44w46,654,", "jiangmin\t,23545,6457,2455,7576,qr44453", "beijing\t,435435,6537,12424,44w46,654,", "xiaoming\t,46654,8579,w3675,85877,077998,", "xiaozhang\t,9789,788,97978,656,345235,09889,", "ceo\t,46654,8579,w3675,85877,077998,", "cto\t,46654,8579,w3675,85877,077998,", "zhansan\t,46654,8579,w3675,85877,077998," }; Random random = new Random(); /** * 初始化collector */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 在SpoutTracker類中被調用,每調用一次就能夠向storm集羣中發射一條數據(一個tuple元組),該方法會被不停的調用 */ @Override public void nextTuple() { try { String msg = info[random.nextInt(11)]; // 調用發射方法 collector.emit(new Values(msg)); // 模擬等待100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 定義字段id,該id在簡單模式下沒有用處,但在按照字段分組的模式下有很大的用處。 * 該declarer變量有很大做用,咱們還能夠調用declarer.declareStream();來定義stramId,該id能夠用來定義更加複雜的流拓撲結構 */ @Override public void declareOutputFields(OutputFieldsD
bolt
package com.zmq.helloword; /** * @author zhangmq(工號:68598) Tel:☎ * @version 1.0 * @since 2014-8-26 下午2:46:53 * @category com.ljq.helloword * @copyright 南京聯創科技 網管科技部 */ import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 接收噴發節點(Spout)發送的數據進行簡單的處理後,發射出去。 */ @SuppressWarnings("serial") public class SimpleBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); if (msg != null) { // System.out.println("msg="+msg); collector.emit(new Values(msg + "msg is processed!")); } } catch (Exception e) { e.printStackTrace(); } }