[TOC]java
在Strom的API中提供了LocalCluster
對象,這樣在不用搭建Storm環境或者Storm集羣的狀況下也可以開發Storm的程序,很是方便。數據庫
基於Maven構建工程項目,其所須要的依賴以下:apache
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.2</version> </dependency>
需求以下:併發
數據源不斷產生遞增數字,對產生的數字累加求和
分析以下:app
Strom的Topology包含Spout和Bolt兩種節點類型,在這個案例中,可使用Spout來對數據源進行處理(模擬產生數據), 而後將其發送到計算和的Bolt中,因此實際上這裏只須要使用一個Spout節點和一個Bolt節點就能夠了。
在理解了Storm的設計思想後,將其與MapReduce的設計思想進行對比,再看下面的程序代碼實際上是很是好理解的。dom
/** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } /** * 接收數據的核心方法 */ @Override public void nextTuple() { long num = 0; while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num)); } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } }
private Long sumOrderCost = 0L; /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
/** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 */ public class StormLocalSumTopology { /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt()) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); // 啓動topology LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster String topologyName = StormLocalSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 localCluster.submitTopology(topologyName, config, topology); } }
須要說明的是,Spout和Bolt的類都做爲StormLocalSumTopology的靜態成員變量,這樣作是爲了開發的方便,固然實際上也能夠將其單獨做爲一個文件。分佈式
執行主函數,其輸出以下:ide
當前時間20180412213836產生的訂單金額:1 商城網站到目前20180412213836的商品總交易額1 當前時間20180412213837產生的訂單金額:2 商城網站到目前20180412213837的商品總交易額3 當前時間20180412213838產生的訂單金額:3 商城網站到目前20180412213838的商品總交易額6 ......
需求以下:函數
監控一個目錄下的文件,當發現有新文件的時候,把文件讀取過來,解析文件中的內容,統計單詞出現的總次數
分析以下:oop
能夠設置三個節點: Spout:用於持續讀取目錄下須要被監聽(經過後綴名標識)的文件,而且將每一行輸出到下一個Bolt中 (相似於MapReduce中的FileInputFormat) Bolt1:讀取行,並解析其中的單詞,將每一個單詞輸出到下一個Bolt中 (相似於MapReduce中的Mapper) Bolt2:讀取單詞,進行統計計算 (相似於MapReduce中的Reducer)
/** * Spout,獲取數據源,這裏是持續讀取某一目錄下的文件,並將每一行輸出到下一個Bolt中 */ static class FileSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void nextTuple() { File directory = new File("D:/data/storm"); // 第二個參數extensions的意思就是,只採集某些後綴名的文件 Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true); for (File file : files) { try { List<String> lines = FileUtils.readLines(file, "utf-8"); for(String line : lines) { this.collector.emit(new Values(line)); } // 當前文件被消費以後,須要重命名,同時爲了防止相同文件的加入,重命名後的文件加了一個隨機的UUID,或者加入時間戳也能夠的 File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed"); FileUtils.moveFile(file, destFile); } catch (IOException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } }
/** * Bolt節點,將接收到的每一行數據切割爲一個個單詞併發送到下一個節點 */ static class SplitBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split(" "); for (String word : words) { this.collector.emit(new Values(word,1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
/** * Bolt節點,執行單詞統計計算 */ static class WCBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = input.getIntegerByField("count"); /*if (map.containsKey(word)) { map.put(word, map.get(word) + 1); } else { map.put(word, 1); }*/ map.put(word, map.getOrDefault(word, 0) + 1); System.out.println("===================================="); map.forEach((k ,v)->{ System.out.println(k + ":::" +v); }); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
/** * 2°、單詞計數:監控一個目錄下的文件,當發現有新文件的時候, 把文件讀取過來,解析文件中的內容,統計單詞出現的總次數 E:\data\storm */ public class StormLocalWordCountTopology { /** * 構建拓撲,組裝Spout和Bolt節點,至關於在MapReduce中構建Job */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); // dag builder.setSpout("id_file_spout", new FileSpout()); builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout"); builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt"); StormTopology stormTopology = builder.createTopology(); LocalCluster cluster = new LocalCluster(); String topologyName = StormLocalWordCountTopology.class.getSimpleName(); Config config = new Config(); cluster.submitTopology(topologyName, config, stormTopology); } }
執行程序後,往目標目錄中添加.txt
文件,程序輸出以下:
==================================== hello:::1 ==================================== hello:::1 you:::1 ==================================== hello:::2 you:::1 ==================================== hello:::2 he:::1 you:::1 ==================================== hello:::3 he:::1 you:::1 ==================================== me:::1 hello:::3 he:::1 you:::1
在編寫了Storm的程序後,再來看看其相關的術語就容易理解不少了。
Topology用於封裝一個實時計算應用程序的邏輯,相似於Hadoop的MapReduce Job
Stream 消息流,是一個沒有邊界的tuple序列,這些tuples會被以一種分佈式的方式並行地建立和處理
Spouts 消息源,是消息生產者,他會從一個外部源讀取數據並向topology裏面面發出消息:tuple
Bolts 消息處理者,全部的消息處理邏輯被封裝在bolts裏面,處理輸入的數據流併產生新的輸出數據流, 可執行過濾,聚合,查詢數據庫等操做
Task 每個Spout和Bolt會被看成不少task在整個集羣裏面執行,每個task對應到一個線程.
Stream groupings 消息分發策略,定義一個Topology的其中一步是定義每一個tuple接受什麼樣的流做爲輸入, stream grouping就是用來定義一個stream應該如何分配給Bolts們.