1. 簡介
-
是一個分佈式, 高容錯的 實時計算框架html
-
Storm進程常駐內存, 永久運行java
-
Storm數據不通過磁盤, 在內存中流轉, 經過網絡直接發送給下游node
-
流式處理(streaming) 與 批處理(batch)python
-
批處理(batch): MapReducemysql
-
微批處理(MircroBatch): Spark (性能上近似 Streaming, 可是仍是有所不及)linux
-
流(streaming): Storm, Flink(其實Flink也能夠作批處理)redis
-
Storm MapReduce 流式處理 批處理 毫秒級 分鐘級 DAG模型 Map+Reduce模型 常駐運行 反覆啓停
-
-
Storm 計算模型
-
Topology - DAG 有向無環圖sql
- 例圖: (Spout: 噴嘴)
-
對Storm實時計算邏輯進行封裝mongodb
-
由一系列經過數據流相互關聯的Spout、Bolt鎖組成的拓撲結構shell
-
生命週期: 此拓撲只要啓動就會一直在集羣中運行, 直到手動將其kill, 不然不會終止
(與MapReduce中的Job的區別: MR中的Job在計算機執行完成就會終止)
-
Tuple - 元組
- Stream中最小的數據組成單元(熟悉python的必定不會陌生)
-
Stream - 數據流
- 從Spout 中源源不斷傳遞數據給Bolt、以及上一個Bolt傳遞數據給下一個Bolt, 所造成的的數據通道爲Stream
- 在聲明Stream時須要給其指定一個Id (默認爲Default)
- 實際開發場景中, 多使用單一數據流, 此時不須要單獨指定StreamId
-
Spout - 數據源
-
拓撲中數據流的來源。通常會從指定外部的數據源讀取元組 (Tuple) 發送到拓撲(Topology) 中。
-
一個Spout能夠發送多個數據流(Stream)
- 能夠先經過OutputFieldsDeclarer中的declare方法聲明定義的不一樣數據流, 發送數據時經過SpoutOutputCollector 中的 emit 方法指定數據流 Id (streamId) 參數將數據發送出去
-
Spout 中最核心的方法是 nextTuple, 該方法會被Storm線程不斷調用、主動從數據源拉取數據, 再經過emit 方法將數據生成元組 (Tuple) 發送給以後的 Bolt 計算
-
-
Bolt - 對數據流進行處理的組件
-
拓撲中數據處理均由Bolt完成。對於簡單的任務或者數據流轉換, 單個Bolt能夠簡單實現; 更加複雜場景每每須要多個Bolt分多個步驟完成
-
一個Bolt能夠發送多個數據流(Stream)
- 可先經過OutputFieldsDeclarer中的declare方法聲明定義的不一樣數據流, 發送數據時經過收集器(Collector)中的emit方法指定數據流Id(streamId) 參數將數據發送出去
-
Bolt 中最核心的方法是execute方法, 該方法負責接收到一個元組 (Tuple) 數據 以及 真正實現核心的業務邏輯
-
-
簡單的WorldCount實例
- WordCountSpout
package com.ronnie.storm.wordCount; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import java.util.Map; import java.util.Random; public class WordCountSpout extends BaseRichSpout { private Random random = new Random(); SpoutOutputCollector collector; String[] lines = { "Well done Gaben well fucking done", "What is going wrong with you Bro", "You are so fucking retard", "What the hell is it", "hadoop spark storm flink", "mysql oracle memcache redis mongodb" }; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) { this.collector = collector; } /** * 1. storm 會一直(死循環)調用此方法 * 2. 每次調用此方法, 往下游發輸出 * * while(flag){ * nextTuple(); * } */ @Override public void nextTuple() { int index = random.nextInt(lines.length); String line = lines[index]; System.out.println("line: " + line); collector.emit(new Values(line)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("liner")); } }
- WordCountSplit
package com.ronnie.storm.wordCount; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.Map; public class WordCountSplit extends BaseRichBolt { // 提高做用域 OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { System.err.println(this + "============================="); this.collector = collector; } @Override public void execute(Tuple input) { // 從域中獲取數據, 要與以前Spout中 declareOutputFields 的域名稱一致 String line = input.getStringByField("liner"); // 根據什麼分離 String[] words = line.split(" "); for (String word: words){ // Value是一個ArrayList, 其中存的對象要與後面聲明的域中屬性相對應 collector.emit(new Values(word,"ronnie")); } } /** * Fields中 的名稱 與 前面 value 中的屬性相應 * @param declarer */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "name")); } }
-
WordCount
package com.ronnie.storm.wordCount; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import java.util.HashMap; import java.util.Map; public class WordCount extends BaseRichBolt { Map<String, Integer> result = new HashMap<>(); /** * 初始化任務 * @param map * @param topologyContext * @param outputCollector */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } /** * 最核心方法 * 上游傳tuple數據給它, 並調用此方法 * @param input */ @Override public void execute(Tuple input) { String word = input.getString(0); Integer integer = result.get(word); if (null == integer){ integer = 1; } else { integer += 1; } result.put(word, integer); System.err.println(word + " : " + integer); } /** * 聲明輸出的域類型 * @param outputFieldsDeclarer */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
-
WordCountTopology
package com.ronnie.storm.wordCount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.AuthorizationException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class WordCountTopology { public static void main(String[] args) { TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("wcSpout", new WordCountSpout()); // setBolt 的第三個參數爲並行量 setNumTasks 修改 任務數量爲 4 topologyBuilder.setBolt("wcSplit", new WordCountSplit(), 2).setNumTasks(4).shuffleGrouping("wcSpout"); topologyBuilder.setBolt("wcCount", new WordCount(), 5).fieldsGrouping("wcSplit", new Fields("word")); StormTopology topology = topologyBuilder.createTopology(); Config config = new Config(); // 修改配置文件中的worker數量爲3 config.setNumWorkers(3); // 只要參數存在 if (args.length > 0){ try { StormSubmitter.submitTopology(args[0],config,topology); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace(); } } else { // 不存在就執行本地任務 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordCount", config, topology); } } }
-
最後可將任務打成 jar 包傳送到linux系統上(已經部署好storm集羣), 再經過命令行執行任務
[root@node01 storm-0.10.0] bin/storm jar /opt/ronnie/wc.jar com.ronnie.storm.wordCount.WordCountTopology wc # 在storm目錄下 bin/storm jar jar文件目錄 包結構.任務類 任務參數
-
-
Storm 架構設計
-
Nimbus
- 資源調度
- 任務分配
- 接收jar包
-
Supervisor
-
接收Nimbus分配的任務
-
啓動、中止本身管理的worker進程 (當前supervisor上的work數量可經過配置文件設定)
-
-
Worker
-
運行具體處理運算組件的進程 (每一個Worker對應執行一個Topology 的子集)
-
worker 任務類型:
- spout 任務
- bolt 任務
-
啓動 executor
- executor是 worker JVM 進程中的一個java線程, 通常默認每一個executor負責執行一個task任務
-
-
Zookeeper
-
與Hadoop架構對比
Hadoop Storm 主節點 ResourceManager Nimbus 從節點 NodeManager Supervisor 應用程序 Job Topology 工做進程 Child Worker 計算模型 Map/Reduce Spout/Bolt
-
-
Storm 任務提交流程
-
Storm 本地目錄樹
-
Storm DRPC
-
DRPC (Distributed RPC)
-
分佈式遠程過程調用
-
DRPC 是經過一個 DRPC 服務端(DRPC server)來實現分佈式RPC功能的。
-
DRPC Server 負責接收 RPC 請求, 並將該請求發送到Storm中運行的Topology, 等待接收 Topology 發送的處理結果, 並將該結果返回給發送請求的客戶端。
-
DRPC設計目的:
- 爲了充分利用Storm的計算能力實現高密度的並行實時計算。
- (Storm 接收若干個數據流輸入, 數據在Topology 當中運行完成, 而後經過DRPC將結果進行輸出。)
- 爲了充分利用Storm的計算能力實現高密度的並行實時計算。
-
客戶端經過向 DRPC 服務器發送執行函數的名稱以及該函數的參數來獲取處理結果。
- 實現該函數的拓撲使用一個DRPCSpout 從 DRPC 服務器中接收一盒函數調用流。 DRPC 服務器 會爲每一個函數調用都標記一個惟一的id。
- 隨後拓撲會執行函數來計算結果, 並在拓撲的最後使用一個名爲 ReturnResults 的 Bolt 鏈接到 DRPC服務器, 根據函數調用的結果返回。
-
-
-
Storm 容錯機制
-
集羣節點宕機
-
Nimbus服務器
- 單點故障的話後續版本能夠經過將nimbus.host: 改成 nimbus.seeds: ["node01", "node02"] 來設置備份節點解決
-
非Nimbus服務器
- 故障時, 該節點上全部Task任務都會超時, Nimbus會將這些Task從新分配到其餘服務器上運行
-
-
進程掛了
- Worker
- 掛掉時, Supervisor 會從新啓動這個進程。
- 若是啓動過程當中仍然一直失敗, 而且沒法向Nimbus發送心跳, Nimbus會將該Worker從新分配到其餘服務器上
- Supervisor
- 無狀態
- 全部的狀態信息都存放在Zookeeper中管理
- 快速失敗
- 每當遇到任何異常狀況, 都會自動毀滅
- 無狀態
- Nimbus
- 無狀態
- 快速失敗
- Worker
-
消息的完整性
- 從Spout中發出的Tuple, 以及基於他所產生的Tuple
- 由這些消息構成了一顆tuple樹
- 當這顆tuple樹發送完成, 而且樹當中每一條消息都被正確地處理, 就代表spout發送的消息被完整地處理過了, 即該消息具備完整性。(Completation 有興趣的能夠去Flink-client 看看 源碼中 CompletationFuture的使用)
-
消息完整性的實現機制
-
Acker
- Storm的拓撲中特殊的一些任務
- 負責跟蹤每一個Spout發出的Tuple的DAG (有向五環圖)
-
-
-
Storm 併發機制
-
基本組件
-
Worker - 進程
- 一個Topology會包含一盒或多個Worker (每一個Worker進程只能從屬於一個特定的Topology)
- 這些Worker進程會並行跑在集羣中的不一樣服務器上, 即一個Topology實際上是由並行運行在Storm集羣中多臺服務器上的進程所組成
-
Executor - 線程
- Executor是由Worker進程中生成的一個線程
- 每一個Worker進程中會運行拓撲當中的一個或多個Executor線程
- 一個Executor線程中能夠執行一個或多個Task任務, 可是這些Task任務都是對應着同一個組件(Spout、Bolt)
-
Task
- 實際執行數據處理的最小單元
- 每一個task即爲一個Spout或者一個Bolt
- Task數量在整個Topology聲明週期中保持不變, Executor數量key變化或手動調整
- 默認狀況下, Task數量和Executor是相同的, 即每一個Executor線程中默認運行一個Task任務
-
設置參數
-
Worker進程數
- Config.setNumWorkers(int workers)
-
Executor線程數
- TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
- TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
- parallelism_hint(並行量) 即爲 executor 線程數
-
Task數量
- ComponentConfigurationDeclarer.setNumTasks(Number val)
-
-
-
Rebalance - 重平衡
- 動態調整Topology拓撲的Worker進程數量、以及Executor線程數量
- 兩種調整方式: 經過Storm UI && 經過Storm CLI
-
-
Storm 通訊機制
-
Worker進程間的數據通訊
-
ZMQ
- ZeroMQ 開源的消息傳遞框架, 並非消息隊列(MessageQueue)
-
Netty
-
Nettty 是基於NIO(Not Blocked Input Output)的網絡框架(是對NIO包的一種封裝, 由於原生API不是很好用),更加高效。
-
Storm 0.9版本以後使用Netty是由於ZMQ的license和Storm的license不兼容。
-
-
-
Worker內部的數據通訊
- Disruptor(干擾者? wtf)
- 實現了 「隊列」 的功能
- 能夠理解爲一種時間監聽或者消息處理機制, 即在隊列當中一邊由生產者放入消息數據, 另外一邊消費者並行去除消息數據進行處理
- 實現了 「隊列」 的功能
- Disruptor(干擾者? wtf)
-
-
Storm Grouping -- 流數據流分組(數據分發策略)
-
Shuffle Grouping
- 隨機分組, 隨機派發stream中的tuple, 保證每一個bolt task接收到的tuple數目大體相同。
- 輪詢, 平均分配
-
Fields Grouping
- 按字段分組, 好比, 按 "user-id" 分組, 那麼具備一樣"user-id" 的 tuple 會被分到相同的Bolt中的一個, 而不一樣的"user-id" 則可能會被分配到不一樣的task
-
All Grouping
- 廣播發送, 對於每個tuple, 全部的bolts都會收到
-
Global Grouping
- 全局分組, 把tuple分配給task id 最低的task
-
None Grouping
- 不分組, 這個分組的意思是說stream不關係到底怎樣分組。 目前這種分組和shuffle grouping是同樣的效果。 有一點不一樣的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程裏面去執行 (將來Storm若是可能的話會這樣設計)
-
Direct Grouping
- 指向型分組, 這是一種比較特別的分組方法, 用這種分組意味着消息(tuple) 的發送者指定由消息接收者的哪一個task處理這個消息。 只有被聲明爲Direct Stream 的消息流能夠聲明這種分組方法。
- 這種消息tuple必須使用 emitDirect 方法來發射。 消息處理者能夠經過TopologyContext 來獲取處理它的task的id(OutputCollector.emit 方法也會返回task的id)
-
Local or shuffle Grouping
- 本地或隨機分組。 若是目標bolt有一個或者多個task與源bolt的task在同一個工做進程中, tuple將會被隨機發送給這些同進程中的tasks。 不然, 和普通的Shuffle Grouping行爲一致
-
CustomGrouping
- 自定義, 至關於mapreduce那裏本身去實現一個partition同樣。
-
Flume + Kafka + Storm 架構設計
-
採集層: 實現日誌收集, 使用負載均衡策略
-
消息隊列: 做用是解耦及作不一樣速度系統緩衝
-
實時處理單元: 用Storm來進行數據處理, 最終數據流入DB中
-
展現單元: 數據可視化, 使用WEB框架展現