[TOC]java
特別說明:前面的四篇Storm筆記中,關於計算總和的例子中的spout,使用了死循環的邏輯,實際上這樣作是不正確的,緣由很簡單,Storm提供給咱們的API中,nextTuple方法就是循環執行了,這至關因而作了雙層循環。由於後面在作可靠性acker案例分析時發現,加入死循環邏輯後,該nextTuple所屬於的那個task根本就沒有辦法跳出這個nextTuple方法,也就沒有辦法執行後面的ack或者是fail方法,這點尤爲須要注意。redis
worker進程死掉算法
worker進程掛掉,storm集羣會在從新啓動一個worker進程。數據庫
supervisor進程死掉apache
supervisor進程掛掉,不會影響以前已經提交的topology,只是後期不能向這個節點分配任務,由於這個節點已經不是集羣的一員了。緩存
nimbus進程死掉(存在HA的問題)快速失敗併發
nimbus進程掛掉,也不會影響以前已經提交的topology,只是後期不能向集羣再提交新的topology了。1.0如下的版本存在HA的問題,1.0以後已經修復了這個問題,能夠有多個備選nimbus。app
節點宕機dom
ack/fail消息確認機制(確保一個tuple被徹底處理)ide
徹底處理tuple
在storm裏面一個tuple被徹底處理的意思是: 這個tuple以及由這個tuple所衍生的全部的tuple都被成功處理。
前面也提到了,若是但願使用qck/fail
確認機制,則須要作下面的事情:
1.在咱們的spout中重寫ack和fail方法 2.spout發送tuple時須要攜帶messageId 3.bolt成功或失敗處理後要主動進行回調
根據上面的說明,程序代碼以下,注意其中體現的這幾點:
package cn.xpleaf.bigdata.storm.acker; import cn.xpleaf.bigdata.storm.utils.StormUtil; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Date; import java.util.Map; import java.util.UUID; /** * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。 * <p> * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯 * <p> * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。 * 咱們稱這爲適配器模式 * <p> * storm消息確認機制---可靠性分析 * acker * fail */ public class AckerSumTopology { /** * 數據源 */ static class OrderSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private long num = 0; /** * 接收數據的核心方法 */ @Override public void nextTuple() { String messageId = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase(); // while (true) { num++; StormUtil.sleep(1000); System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num); this.collector.emit(new Values(num), messageId); // } } /** * 是對發送出去的數據的描述schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("order_cost")); } @Override public void ack(Object msgId) { System.out.println(msgId + "對應的消息被處理成功了"); } @Override public void fail(Object msgId) { System.out.println(msgId + "---->對應的消息被處理失敗了"); } } /** * 計算和的Bolt節點 */ static class SumBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Long sumOrderCost = 0L; /** * 處理數據的核心方法 */ @Override public void execute(Tuple input) { Long orderCost = input.getLongByField("order_cost"); sumOrderCost += orderCost; if (orderCost % 10 == 1) { // 每10次模擬消息失敗一次 collector.fail(input); } else { System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost); collector.ack(input); } StormUtil.sleep(1000); } /** * 若是當前bolt爲最後一個處理單元,該方法能夠不用管 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,至關於在MapReduce中構建Job */ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ builder.setSpout("id_order_spout", new OrderSpout()); builder.setBolt("id_sum_bolt", new SumBolt(), 1) .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = AckerSumTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } }
運行後(本地運行或上傳到集羣上提交做業),輸出以下:
當前時間20180413215706產生的訂單金額:1 當前時間20180413215707產生的訂單金額:2 7a4ce596fd3a40659f2d7f80a7738f55---->對應的消息被處理失敗了 線程ID:133 ,商城網站到目前20180413215707的商品總交易額3 當前時間20180413215708產生的訂單金額:3 0555a933a49f413e94480be201a55615對應的消息被處理成功了 線程ID:133 ,商城網站到目前20180413215708的商品總交易額6 當前時間20180413215709產生的訂單金額:4 4b923132e4034e939c875aca368a8897對應的消息被處理成功了 線程ID:133 ,商城網站到目前20180413215709的商品總交易額10 當前時間20180413215710產生的訂單金額:5 51f159472e854ba282ab84a2218459b8對應的消息被處理成功了 線程ID:133 ,商城網站到目前20180413215710的商品總交易額15 ......
通常的業務數據存儲,最終仍是要落地,存儲到RDBMS,可是RDBMS沒法達到高訪問量,能力達不到實時處理,或者說處理能力是有限的,會形成鏈接中斷等問題,爲了數據落地,咱們能夠採起迂迴方式,能夠採用好比說先緩存到高速內存數據庫(如redis),而後再將內存數據庫中的數據定時同步到rdbms中,並且能夠按期定時來作。
能夠在storm中使用定時任務來實現這些定時數據落地的功能,不過須要先了解storm定時任務。
在main中設置
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60); // 設置多久發送一個系統的tuple定時發射數據
可是咱們通常都會對特定的bolt設置定時任務,而沒有必要對全局每個bolt都發送系統的tuple,這樣很是的耗費資源,因此就有了局部定時任務,也是咱們經常使用的。
注意:storm會按照用戶設置的時間間隔給拓撲中的全部bolt發送系統級別的tuple。在main函數中設置定時器,storm會定時給拓撲中的全部bolt都發送系統級別的tuple,若是隻須要給某一個bolt設置定時功能的話,只須要在這個bolt中覆蓋getComponentConfiguration方法,裏面設置定時間隔便可。
測試代碼以下:
package cn.xpleaf.bigdata.storm.quartz; import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.shade.org.apache.commons.io.FileUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.io.File; import java.io.IOException; import java.util.*; /** * 2°、單詞計數:監控一個目錄下的文件,當發現有新文件的時候, 把文件讀取過來,解析文件中的內容,統計單詞出現的總次數 E:\data\storm 研究storm的定時任務 有兩種方式: 1.main中設置,全局有效 2.在特定bolt中設置,bolt中有效 */ public class QuartzWordCountTopology { /** * Spout,獲取數據源,這裏是持續讀取某一目錄下的文件,並將每一行輸出到下一個Bolt中 */ static class FileSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void nextTuple() { File directory = new File("D:/data/storm"); // 第二個參數extensions的意思就是,只採集某些後綴名的文件 Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true); for (File file : files) { try { List<String> lines = FileUtils.readLines(file, "utf-8"); for(String line : lines) { this.collector.emit(new Values(line)); } // 當前文件被消費以後,須要重命名,同時爲了防止相同文件的加入,重命名後的文件加了一個隨機的UUID,或者加入時間戳也能夠的 File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed"); FileUtils.moveFile(file, destFile); } catch (IOException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } } /** * Bolt節點,將接收到的每一行數據切割爲一個個單詞併發送到下一個節點 */ static class SplitBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void execute(Tuple input) { if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 確保不是系統發送的tuple,才使用咱們的業務邏輯 String line = input.getStringByField("line"); String[] words = line.split(" "); for (String word : words) { this.collector.emit(new Values(word, 1)); } } else { System.out.println("splitBolt: " + input.getSourceComponent().toString()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } /** * Bolt節點,執行單詞統計計算 */ static class WCBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 確保不是系統發送的tuple,才使用咱們的業務邏輯 String word = input.getStringByField("word"); Integer count = input.getIntegerByField("count"); /*if (map.containsKey(word)) { map.put(word, map.get(word) + 1); } else { map.put(word, 1); }*/ map.put(word, map.getOrDefault(word, 0) + 1); System.out.println("===================================="); map.forEach((k, v) -> { System.out.println(k + ":::" + v); }); } else { System.out.println("sumBolt: " + input.getSourceComponent().toString()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,組裝Spout和Bolt節點,至關於在MapReduce中構建Job */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); // dag builder.setSpout("id_file_spout", new FileSpout()); builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout"); builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt"); StormTopology stormTopology = builder.createTopology(); LocalCluster cluster = new LocalCluster(); String topologyName = QuartzWordCountTopology.class.getSimpleName(); Config config = new Config(); config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); cluster.submitTopology(topologyName, config, stormTopology); } }
輸出:
splitBolt: __system sumBolt: __system splitBolt: __system sumBolt: __system ......
在bolt中使用下面代碼判斷是不是觸發用的bolt
tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
若是爲true,則執行定時任務須要執行的代碼,最後return,若是爲false,則執行正常的tuple處理的業務邏輯。
即對於須要進行數據落地的bolt,咱們能夠只給該bolt設置定時任務,這樣系統會定時給該bolt發送系統級別的tuple,在咱們該bolt的代碼中進行判斷,若是接收到的是系統級別的bolt,則進行數據落地的操做,好比將數據寫入數據庫或其它操做等,不然就按照正常的邏輯來執行咱們的業務代碼。
工做中經常使用這一種方式進行操做。
測試程序以下:
package cn.xpleaf.bigdata.storm.quartz; import clojure.lang.Obj; import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.shade.org.apache.commons.io.FileUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.io.File; import java.io.IOException; import java.util.*; /** * 2°、單詞計數:監控一個目錄下的文件,當發現有新文件的時候, 把文件讀取過來,解析文件中的內容,統計單詞出現的總次數 E:\data\storm 研究storm的定時任務 有兩種方式: 1.main中設置,全局有效 2.在特定bolt中設置,bolt中有效 */ public class QuartzPartWCTopology { /** * Spout,獲取數據源,這裏是持續讀取某一目錄下的文件,並將每一行輸出到下一個Bolt中 */ static class FileSpout extends BaseRichSpout { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private SpoutOutputCollector collector; // 發送tuple的組件 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void nextTuple() { File directory = new File("D:/data/storm"); // 第二個參數extensions的意思就是,只採集某些後綴名的文件 Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true); for (File file : files) { try { List<String> lines = FileUtils.readLines(file, "utf-8"); for(String line : lines) { this.collector.emit(new Values(line)); } // 當前文件被消費以後,須要重命名,同時爲了防止相同文件的加入,重命名後的文件加了一個隨機的UUID,或者加入時間戳也能夠的 File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed"); FileUtils.moveFile(file, destFile); } catch (IOException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } } /** * Bolt節點,將接收到的每一行數據切割爲一個個單詞併發送到下一個節點 */ static class SplitBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split(" "); for (String word : words) { this.collector.emit(new Values(word, 1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } /** * Bolt節點,執行單詞統計計算 */ static class WCBolt extends BaseRichBolt { private Map conf; // 當前組件配置信息 private TopologyContext context; // 當前組件上下文對象 private OutputCollector collector; // 發送tuple的組件 @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 確保不是系統發送的tuple,才使用咱們的業務邏輯 String word = input.getStringByField("word"); Integer count = input.getIntegerByField("count"); /*if (map.containsKey(word)) { map.put(word, map.get(word) + 1); } else { map.put(word, 1); }*/ map.put(word, map.getOrDefault(word, 0) + 1); System.out.println("===================================="); map.forEach((k, v) -> { System.out.println(k + ":::" + v); }); } else { System.out.println("sumBolt: " + input.getSourceComponent().toString() + "---" + System.currentTimeMillis()); } } @Override public Map<String, Object> getComponentConfiguration() { // 修改局部bolt的配置信息 Map<String, Object> config = new HashMap<>(); config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); return config; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 構建拓撲,組裝Spout和Bolt節點,至關於在MapReduce中構建Job */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); // dag builder.setSpout("id_file_spout", new FileSpout()); builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout"); builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt"); StormTopology stormTopology = builder.createTopology(); LocalCluster cluster = new LocalCluster(); String topologyName = QuartzPartWCTopology.class.getSimpleName(); Config config = new Config(); cluster.submitTopology(topologyName, config, stormTopology); } }
輸出以下:
sumBolt: __system---1523631954330 sumBolt: __system---1523631964330 sumBolt: __system---1523631974329 sumBolt: __system---1523631984329 sumBolt: __system---1523631994330 sumBolt: __system---1523632004330 sumBolt: __system---1523632014329 sumBolt: __system---1523632024330 ......
deactive:未激活(暫停)
emitted: emitted tuple數
transferred: transferred tuple數
emitted的區別:若是一個task,emitted一個tuple到2個task中,則 transferred tuple數是emitted tuple數的兩倍
complete latency: spout emitting 一個tuple到spout ack這個tuple的平均時間(能夠認爲是tuple以及該tuple樹的整個處理時間)
process latency: bolt收到一個tuple到bolt ack這個tuple的平均時間,若是沒有啓動acker機制,那麼值爲0
execute latency:bolt處理一個tuple的平均時間,不包含acker操做,單位是毫秒(也就是bolt 執行 execute 方法的平均時間)
總結:execute latency和proces latnecy是處理消息的時效性,而capacity則表示處理能力是否已經飽和,從這3個參數能夠知道topology的瓶頸所在。