而後卡一下代碼怎麼實現的:php
先來看一下代碼結構:java
就如上圖所說,數據從PWSpout流到PrintBolt,最後到WriteBolt寫到文件。具體看一下這幾個類的代碼:python
先看一本地模式的:apache
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import bhz.bolt.PrintBolt; import bhz.bolt.WriteBolt; import bhz.spout.PWSpout; public class PWTopology1 { public static void main(String[] args) throws Exception { // Config cfg = new Config(); cfg.setNumWorkers(2); cfg.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new PWSpout()); builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout"); builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt"); //1 本地模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("top1", cfg, builder.createTopology()); Thread.sleep(10000); cluster.killTopology("top1"); cluster.shutdown(); //2 集羣模式 // StormSubmitter.submitTopology("top1", cfg, builder.createTopology()); } }
代碼分析:ruby
import java.util.HashMap; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class PWSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; private static final Map<Integer, String> map = new HashMap<Integer, String>(); static { map.put(0, "java"); map.put(1, "php"); map.put(2, "groovy"); map.put(3, "python"); map.put(4, "ruby"); } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //對spout進行初始化 this.collector = collector; //System.out.println(this.collector); } /** * <B>方法名稱:</B>輪詢tuple<BR> * <B>概要說明:</B><BR> * @see backtype.storm.spout.ISpout#nextTuple() */ @Override public void nextTuple() { //隨機發送一個單詞 final Random r = new Random(); int num = r.nextInt(5); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } this.collector.emit(new Values(map.get(num))); } /** * <B>方法名稱:</B>declarer聲明發送數據的field<BR> * <B>概要說明:</B><BR> * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer) */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //進行聲明 declarer.declare(new Fields("print")); } }
代碼解析:框架
總體結構dom
細入分析ide
---------------------------- open 方法---------------------------------------------------------函數
--------------------------------- nextTuple方法 --------------------------------------------------------------測試
---------------------------- declareOutputFields方法 ----------------------------------------------------
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class PrintBolt extends BaseBasicBolt { private static final Log log = LogFactory.getLog(PrintBolt.class); private static final long serialVersionUID = 1L; @Override public void execute(Tuple input, BasicOutputCollector collector) { //獲取上一個組件所聲明的Field String print = input.getStringByField("print"); log.info("【print】: " + print); //System.out.println("Name of input word is : " + word); //進行傳遞給下一個bolt collector.emit(new Values(print)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("write")); } }
代碼分析
import java.io.FileWriter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import clojure.main; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; public class WriteBolt extends BaseBasicBolt { private static final long serialVersionUID = 1L; private static final Log log = LogFactory.getLog(WriteBolt.class); private FileWriter writer ; @Override public void execute(Tuple input, BasicOutputCollector collector) { //獲取上一個組件所聲明的Field String text = input.getStringByField("write"); try { if(writer == null){ if(System.getProperty("os.name").equals("Windows 10")){ writer = new FileWriter("D:\\099_test\\" + this); } else if(System.getProperty("os.name").equals("Windows 8.1")){ writer = new FileWriter("D:\\099_test\\" + this); } else if(System.getProperty("os.name").equals("Windows 7")){ writer = new FileWriter("D:\\099_test\\" + this); } else if(System.getProperty("os.name").equals("Linux")){ System.out.println("----:" + System.getProperty("os.name")); writer = new FileWriter("/usr/local/temp/" + this); } } log.info("【write】: 寫入文件"); writer.write(text); writer.write("\n"); writer.flush(); } catch (Exception e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
和PrintBolt 這個類很類似,都是在處理數據。不做過多解釋