Storm是實時的,分佈式,高容錯的計算系統。java+cljourehtml
Storm常駐內存,數據在內存中處理不通過磁盤,數據經過網絡傳輸。java
底層java+cljoure構成,阿里使用java重構Storm構建Jstorm。node
數據處理分類web
流式處理(異步)算法
客戶端提交數據進行結算,不會等待計算結果apache
數據追條處理:數據清洗或分析編程
例:在數據統計分析中:數據存入隊列,storm從MQ獲取數據,storm將結果存入存儲層vim
實時請求應答(同步)api
客戶端提交請求後當即計算並將結果返回給客戶端數組
圖片特徵提取: 客戶端向Storm的drpc發送請求,客戶端等待Storm響應結果
Storm與MR的比較
Storm流式處理,毫秒級響應,常駐內存
MR批處理TB級數據,分鐘級響應,反覆啓停
優點
高可靠:異常處理;消息可靠性保障機制
可維護性:提供UI監控接口
Nimbus:接收請求,接收jar包並解析,任務分配,資源分配
zookeeper:存儲Nimbus分配的子任務,存儲任務數據,用於解耦架構
Supervisor:從zk中獲取子任務,啓動worker任務,啓停所管理的worker
Worker:執行具體任務(每一個worker對應一個Topology的子集),分爲spout任務和bolt任務,啓動executor,一個executor對應一個線程,默認一個executor執行一個task任務。任務所需數據從zookeeper中獲取。
storm與MR比對 | MR | Storm |
---|---|---|
主節點 | ResourceManager | Nimbus |
從節點 | NodeManager | Supervisor |
應用程序 | Job | Topology |
工做進程 | Child | Worker |
計算模型 | Map/Reduce | Spout/Bolt |
DAG有向無環圖,拓撲結構圖,做爲實時計算邏輯的封裝。一旦啓動會一直在系統中運行直到手動kill
Spout 數據源節點
從外部讀取原始數據
定義數據流,可定義多個數據流向不一樣Bolt 發數據,讀取外部數據源
nextTuple:被Storm線程不斷調用、主動從數據源拉取數據,再經過emit方法將數據生成Tuple,發送給以後的Bolt計算
Bolt 計算節點
execute:該方法接收到一個Tuple數據,真正業務邏輯
定義數據流,將結果數據發出去
以wordcount爲例:Spout 獲取原始行數據,將行數據Tuple做爲發給bolt1,bolt1解析行數據爲單詞數據,bolt1將單詞數據轉爲Tuple發給bolt2,bolt2對單詞數據進行計數,將計數結果發送到存儲層。
指定計算任務的各節點及節點關聯,提交任務
topologyBuilder的setSpout方法和setBolt方法構建各節點
方法參數爲:id值,bolt類,併發度(線程),task數,數據來源節點,數據流,分組字段
shuffleGrouping是隨機分配數據
多併發度
fieldsGrouping是根據指定的若干字段分組到不一樣的線程
Config對象傳入配置參數
超時時間 秒:conf.setMessageTimeoutSecs(3);
LocalCluster本地執行,對象提交任務名,參數對象,拓撲對象,最終執行任務
public class MyTopology {
public static void main(String[] args) {
//建立拓撲構建對象
TopologyBuilder topologyBuilder=new TopologyBuilder();
/**
* 指定各節點,線程數量,上下游關聯
* 對於多線程,注意上游的結果會隨機發給各個線程
*/
//設置spout節點,指定(id值,spout類,併發度)
topologyBuilder.setSpout("s1",new CountSpout(),1);
//設置bolt節點,指定 (id值,bolt類,併發度,數據來源節點,分組字段)
topologyBuilder.setBolt("b1",new CountBolt1(),2).setNumTasks(4).shuffleGrouping("s1");
topologyBuilder.setBolt("b2",new CountBolt2(),1).fieldsGrouping("b1",new Fields("word"));
//構建拓撲對象
StormTopology stormTopology=topologyBuilder.createTopology();
//設置配置信息,該對象實際是個map,向其中設置各類參數
Config config=new Config();
config.setNumWorkers(3);
config.put("xxx","xxx");
//獲取本地執行集羣對象
LocalCluster lc=new LocalCluster();
//本地執行:向集羣提交任務,任務名,參數,拓撲節點
lc.submitTopology("job-count",config,stormTopology);
}
}
繼承BaseRichSpout類,實際實現了IRichSpout接口,主要重寫如下方法
open()——初始化,參數爲:配置參數, 上下文,輸出對象
nextTuple()——業務代碼
declareOutputFields()——定義輸出結果
關於數據輸出
將SpoutOutputCollector輸出對象轉爲全局變量,經過SpoutOutputCollector輸出數據
使用Values對象,存入多個結果(至關於一個list)
經過OutputFieldsDeclarer對象,依次定義每一個結果的字段名(至關於給list字段命名,造成相似map的有序集合)
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class CountSpout extends BaseRichSpout {
/** 將輸出對象轉爲全局屬性,支持數據輸出 */
private SpoutOutputCollector collector;
//初始化
繼承BaseRichBolt類,實際實現了IRichBolt接口,主要重寫如下方法
prepare()——初始化
execute()——業務代碼
declareOutputFields()——定義輸出字段
關於輸出數據,與spout相同
關於輸入數據:經過input的各類方法獲取對應類型的數據,支持經過字段所在索引下標或字段名獲取指定位置的值
public class CountBolt1 extends BaseRichBolt {
/** 將輸出對象轉爲全局屬性,支持數據輸出 */
private OutputCollector collector;
Shuffle Grouping:輪詢的方式,隨機派發tuple,每一個線程接收到數量大體相同。
Fields Grouping:按指定的字段分組,指定字段的值相同tuple,分配相同的task(處於同一個線程中)
如下方式瞭解
All Grouping:廣播發送,全部下游的bolt線程都會收到。仍是task
Global Grouping:全局分組, 將tuple分配給 id最低的task,這個id是用戶指定的id值。
None Grouping:隨機分組
Direct Grouping:指向型分組
local or shuffle grouping:若目標bolt task與源bolt task在同一工做進程中,則tuple隨機分配給同進程中的tasks。不然普通的Shuffle分配 Grouping行爲一致
customGrouping:自定義
安裝
tar -zxvf apache-storm-0.10.0.tar.gz
mv apache-storm-0.10.0 storm-0.10.0
修改配置
vim /opt/sxt/storm-0.10.0/conf/storm.yaml
#指定zk地址
storm.zookeeper.servers:
- "node1"
- "node2"
- "node3"
#指定nimbus的節點
nimbus.host: "node1"
#指定開放的端口(默認如下4個)
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
拷貝到其餘節點
cd /opt/sxt
scp -r storm-0.10.0/ node3:`pwd`
啓動集羣
啓動zookeeper集羣
zkServer.sh start
啓動nimbus節點 cd /opt/sxt/storm-0.10.0
各節點建立logs目錄:mkdir logs
主節點啓動:
啓動nimbus
nohup ./bin/storm nimbus >> logs/nimbus.out 2>&1 &
啓動監控ui
nohup ./bin/storm ui >> logs/ui.out 2>&1 &
從節點啓動supervisor
nohup ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
注:項目的日誌也就存放中logs目錄下的各文件中
能夠遠程查看:例如文件wc-1-1571317142-worker-6700.log
http://node1:8080/topology.html?wc-1-1571317142-worker-6700.log
網頁登陸設置在主節點上的UI監控地址 http://node1:8080
關閉集羣
#關閉nimbus
kill `ps aux | egrep '(daemon.nimbus)|(storm.ui.core)' | fgrep -v egrep | awk '{print $2}'`
#關閉supervisor
kill `ps aux | fgrep storm | fgrep -v 'fgrep' | awk '{print $2}'`
#關閉zk
zkServer.sh stop
從IDE中導出項目jar文件(field標籤 > project structure > 指定主類建立jar配置 > bulid標籤 > build project > 從項目目錄中找到jar文件) 導入到/opt/sxt下
載入項目命令:./bin/storm jar /opt/sxt/wc.jar com.shsxt.storm.wordcount.WordCountTP wc
依次指定jar文件目錄,主類,topology名
中止項目命令:./bin/storm kill wc (注意指定topology)
本地目錄
/opt/sxt/storm-0.10.0/storm-local 目錄下存放項目執行時的數據
內部爲一個目錄樹結構
zookeeper目錄
進入zk客戶端 zkCli.sh
查看storm的目錄 ls /storm 內部時一個目錄樹
查看指定的數據 get /xxx/xxx
worker進程
具體執行任務的進程
一個topology包含若干個worker執行,worker執行topology的子集
一個worker須要綁定一個網絡端口,默認一個節點最多運行4個worker,經過配置多於4個端口開放更多worker
Executor線程
execute是由worker進程建立的線程,worker包含至少一個executor線程
一個線程中可以執行一個或多個task任務,默認一個線程執行一個task
spout / boltk 與線程是一對多的關係,一個spout 能夠包含多個線程,但線程只對應一個spout,監控線程除外。
task
數據處理的最小單元
每一個task對應spout或bolt的一個併發度
關於再平衡熱rebalance
在topology生命週期中,worker與executor的數量可變,task的數量不可變
多個線程沒法同時執行一個task,只能一個線程執行一個task,所以線程數應該小於task數,設置多的線程是無效的。
注意:系統對每一個worker都會設置一個監控線程,監控線程數量與worker相同
命令修改
./bin/storm/rebalance wc -n 5 -e blue-spout=3 -e yellow-bolt=10
(調整wc任務 5個worker 3個spout線程 10個bolt線程 )
api設置
worker數
經過配置文件制定worker數量
config.setNumWorkers(3);
executor數與task數量
線程數設置 setBolt(String id, IRichBolt bolt, Number 線程數);和setSpout()方法
設置任務數 setNumTasks(task數)
topologyBuilder.setBolt("wcSplit",new WordCountSplit(),2).setNumTasks(4).shuffleGrouping("wcSpout");
worker之間數據傳輸使用Netty架構
接收線程監聽tcp端口,並將數據存入緩衝區域
發送線程使用傳輸隊列發送數據
worker的內部通信採用Disruptor的隊列處理機制
數據從緩衝區讀取通過:接收隊列——工做線程——發送隊列——發送線程
tuple 在Storm中的數據的傳輸單元
Stream 數據連通的管道,多個管道時須要指定Stream的id
線程數包括:spout線程數,blot線程數,acker(每一個work1個監控線程)
宕機機制
nimbus宕機
無狀態策略,全部的狀態信息都存放在Zookeeper中來管理,worker正常運行,只是沒法提交新的topology
快速失敗策略:自檢到異常後自毀並重啓
supervisor宕機
無狀態策略,supervisor的全部的狀態信息都存放在Zookeeper中來管理
快速失敗策略:自檢到異常後自毀並重啓
節點故障狀態下,若全部該節點上的任務超時,nimbus會將該節點的task分配給其餘的節點
worker宕機
supervisor會重啓worker進程
若worker屢次失敗且沒法向nimbus發送心跳,nimbus會將該worker的分配到其餘supervisor上執行。
消息完整性
spout中發出的tuple及其由tuple產生的子tuple,構成一個樹形結構,只有當數中的全部消息都被正確處理,才能保證數據的完整性。
爲了保證數據的完整性,對於處理失敗的tuple須要請求上游的spout重發數據。
消息重發的機制:at least(至少一次),oncely (有且只有一次)
acker機制
acker跟蹤每一個spout發出的tuple。
acker記錄每一個trulp的id,數據處理後,經過亦或算法清空id,若id未清空則代表這條數據沒有處理完成。
自定義acker類,對數據進行監控
在topology中添加具有acker功能的spout做爲acker類
topologyBuilder.setSpout("myack",new MyAckSpout());
定義acker類
實現IRichSpout接口,重寫ack(成功)和fail(失敗)方法,對發出的數據進程緩存
設置全局緩存,數據發送時存入緩存,處理成功刪除緩存,處理失敗,從緩存取數據重發
public class MyAckSpout implements IRichSpout {
SpoutOutputCollector collector;
//數據緩存
Map<Object,String> map = new HashMap<>();
//設置全局緩存id
long id = 0;
bolt 中代碼響應,execute方法中
方法末尾添加數據處理的響應
處理成功:collection.ack(input)
處理失敗:collection.fail(input) 注意:超時也數據失敗
向下遊發送數據時,指定acker是否繼續跟蹤下游數據
跟蹤:collector.emit(input,new Values(split[i]));
不跟蹤:collector.emit(new Values(split[i]));
public void execute(Tuple input) {
String xxx = input.getStringByField("aaa");
//結束跟蹤
collector.emit(new Values(xxx));
//繼續collector.emit(new Values(xxx));
//處理成功
collector.ack(input);
//處理失敗,collector.fail(input);
}
採集層(flume):實現日誌收集,使用負載均衡策略
消息隊列(kafka):做用是解耦及不一樣速度系統緩衝
實時處理單元(Storm):用Storm來進行數據處理,最終數據流入DB中
展現單元(DB+web):數據可視化,使用WEB框架展現