Strom的結構
Storm與傳統關係型數據庫
傳統關係型數據庫是先存後計算,而storm則是先算後存,甚至不存
傳統關係型數據庫很難部署實時計算,只能部署定時任務統計分析窗口數據
關係型數據庫重視事務,併發控制,相對來講Storm比較簡陋
Storm與Hadoop,Spark等是流行的大數據方案
與Storm關係密切的語言:核心代碼用clojure書寫,實用程序用python開發,使用java開發拓撲
topology
Storm集羣中有兩種節點,一種是控制節點(Nimbus節點),另外一種是工做節點(Supervisor節點)。全部Topology任務的 提交必須在Storm客戶端節點上進行(須要配置 storm.yaml文件),由Nimbus節點分配給其餘Supervisor節點進行處理。 Nimbus節點首先將提交的Topology進行分片,分紅一個個的Task,並將Task和Supervisor相關的信息提交到 zookeeper集羣上,Supervisor會去zookeeper集羣上認領本身的Task,通知本身的Worker進程進行Task的處理。
和一樣是計算框架的MapReduce相比,MapReduce集羣上運行的是Job,而Storm集羣上運行的是Topology。可是Job在運行結束以後會自行結束,Topology卻只能被手動的kill掉,不然會一直運行下去
Storm不處理計算結果的保存,這是應用代碼須要負責的事情,若是數據不大,你能夠簡單地保存在內存裏,也能夠每次都更新數據庫,也能夠採用NoSQL存儲。這部分事情徹底交給用戶。
數據存儲以後的展示,也是你須要本身處理的,storm UI 只提供對topology的監控和統計。
整體的Topology處理流程圖爲:
zookeeper集羣
storm使用zookeeper來協調整個集羣, 可是要注意的是storm並不用zookeeper來傳遞消息。因此zookeeper上的負載是很是低的,單個節點的zookeeper在大多數狀況下 都已經足夠了, 可是若是你要部署大一點的storm集羣, 那麼你須要的zookeeper也要大一點。關於如何部署zookeeper,能夠看http://zookeeper.apache.org/doc /r3.3.3/zookeeperAdmin.html
部署zookeeper有些須要注意的地方:
一、對zookeeper作好監控很是重要, zookeeper是fail-fast的系統,只要出現什麼錯誤就會退出, 因此實際場景中要監控,更多細節看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_supervision
二、實際場景中要配置一個cron job來壓縮zookeeper的數據和業務日誌。zookeeper本身是不會去壓縮這些的,因此你若是不設置一個cron job, 那麼你很快就會發現磁盤不夠用了,更多細節能夠查看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_maintenance
Component
Storm中,Spout和Bolt都是Component。因此,Storm定義了一個名叫IComponent的總接口
全家普以下:綠色部分是咱們最經常使用、比較簡單的部分。紅色部分是與事務相關的
Spout
Spout是Stream的消息產生源, Spout組件的實現能夠經過繼承BaseRichSpout類或者其餘Spout類來完成,也能夠經過實現IRichSpout接口來實現
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
open()方法 -- 初始化方法
close() -- 在該spout將要關閉時調用。可是不保證其必定被調用,由於在集羣中supervisor節點,可使用kill -9來殺死worker進程。只有當Storm是在本地模式下運行,若是是發送中止命令,能夠保證close的執行
ack(Object msgId) -- 成功處理tuple時回調的方法,一般狀況下,此方法的實現是將消息隊列中的消息移除,防止消息重放
fail(Object msgId) -- 處理tuple失敗時回調的方法,一般狀況下,此方法的實現是將消息放回消息隊列中而後在稍後時間裏重放
nextTuple() -- 這是Spout類中最重要的一個方法。發射一個Tuple到Topology都是經過這個方法來實現的。調用此方法時,storm向spout發出請求,讓spout發出元組(tuple)到輸出器(ouput collector)。這種方法應該是非阻塞的,因此spout若是沒有元組發出,這個方法應該返回。nextTuple、ack 和fail 都在spout任務的同一個線程中被循環調用。 當沒有元組的發射時,應該讓nextTuple睡眠一個很短的時間(如一毫秒),以避免浪費太多的CPU。
繼承了BaseRichSpout後,不用實現close、 activate、 deactivate、 ack、 fail 和 getComponentConfiguration 方法,只關心最基本核心的部分。
一般狀況下(Shell和事務型的除外),實現一個Spout,能夠直接實現接口IRichSpout,若是不想寫多餘的代碼,能夠直接繼承BaseRichSpout
Bolt
Bolt類接收由Spout或者其餘上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現能夠經過繼承BasicRichBolt類或者IRichBolt接口等來完成
prepare方法 -- 此方法和Spout中的open方法相似,在集羣中一個worker中的task初始化時調用。 它提供了bolt執行的環境
declareOutputFields方法 -- 用於聲明當前Bolt發送的Tuple中包含的字段(field),和Spout中相似
cleanup方法 -- 同ISpout的close方法,在關閉前調用。一樣不保證其必定執行。
execute方法 -- 這是Bolt中最關鍵的一個方法,對於Tuple的處理均可以放到此方法中進行。具體的發送是經過emit方法來完成的。execute接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。
Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功。若是你確實要反饋失敗,能夠拋出FailedException
一般狀況下,實現一個Bolt,能夠實現IRichBolt接口或繼承BaseRichBolt,若是不想本身處理結果反饋,能夠實現 IBasicBolt接口或繼承BaseBasicBolt,它實際上至關於自動實現了collector.emit.ack(inputTuple)
Topology運行流程
(1)Storm提交後,會把代碼首先存放到Nimbus節點的inbox目錄下,以後,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化以後的Topology代碼文件
(2) 在設定Topology所關聯的Spouts和Bolts時,能夠同時設置當前Spout和Bolt的executor數目和task數目,默認狀況下,一個Topology的task的總和是和executor的總和一致的。以後,系統根據worker的數目,儘可能平均的分配這些task的執行。worker在哪一個supervisor節點上運行是由storm自己決定的
(3)任務分配好以後,Nimbus節點會將任務的信息提交到zookeeper集羣,同時在zookeeper集羣中會有workerbeats節點,這裏存儲了當前Topology的全部worker進程的心跳信息
(4)Supervisor 節點會不斷的輪詢zookeeper集羣,在zookeeper的assignments節點中保存了全部Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關係等,Supervisor經過輪詢此節點的內容,來領取本身的任務,啓動worker進程運行
(5)一個Topology運行以後,就會不斷的經過Spouts來發送Stream流,經過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。
最後一步會不間斷的執行,除非手動結束Topology。
Topology運行方式
在開始建立項目以前,瞭解Storm的操做模式(operation modes)是很重要的。 Storm有兩種運行方式
本地運行的提交方式,例:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
分佈式提交方式,例:
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
須要注意的是,在Storm代碼編寫完成以後,須要打包成jar包放到Nimbus中運行,打包的時候,不須要把依賴的jar都打迚去,不然若是把依賴的storm.jar包打進去的話,運行時會出現重複的配置文件錯誤致使Topology沒法運行。由於Topology運行以前,會加載本地的 storm.yaml 配置文件。
運行的命令以下: storm jar StormTopology.jar mainclass [args]
storm守護進程的命令
Nimbus: storm nimbus 啓動nimbus守護進程
Supervisor: storm supervisor 啓動supervisor守護迚程
UI:storm ui 這將啓動stormUI的守護進程,爲監測storm集羣提供一個基於web的用戶界面。
DRPC: storm drpc 啓動DRPC的守護進程
storm管理命令
JAR:storm jar topology_jar topology_class [arguments...]
jar命令是用於提交一個集羣拓撲.它運行指定參數的topology_class中的main()方法,上傳topology_jar到nimbus,由nimbus發佈到集羣中。一旦提交,storm將激活拓撲並開始處理topology_class 中的main()方法,main()方法負責調用StormSubmitter.submitTopology()方法,並提供一個惟一的拓撲(集羣)的名。若是一個擁有該名稱的拓撲已經存在於集羣中,jar命令將會失敗。常見的作法是在使用命令行參數來指定拓撲名稱,以便拓撲在提交的時候被命名。
KILL:storm kill topology_name [-w wait_time]
殺死一個拓撲,可使用kill命令。它會以一種安全的方式銷燬一個拓撲,首先停用拓撲,在等待拓撲消息的時間段內容許拓撲完成當前的數據流。執行kill命令時能夠經過-w [等待秒數]指定拓撲停用之後的等待時間。也能夠在Storm UI 界面上實現一樣的功能
Deactivate:storm deactivate topology_name
停用拓撲時,全部已分發的元組都會獲得處理,spouts的nextTuple方法將不會被調用。也能夠在Storm UI 界面上實現一樣的功能
Activate:storm activate topology_name
啓動一個停用的拓撲。也能夠在Storm UI 界面上實現一樣的功能
Rebalance:storm rebalance topology_name [-w wait_time] [-n worker_count] [-e component_name=executer_count]...
rebalance使你從新分配集羣任務。這是個很強大的命令。好比,你向一個運行中的集羣增長了節點。rebalance命令將會停用拓撲,而後在相應超時時間以後重分配worker,並重啓拓撲
例:storm rebalance wordcount-topology -w 15 -n 5 -e sentence-spout=4 -e split-bolt=8
還有其餘管理命令,如:Remoteconfvalue、REPL、Classpath等
新建storm項目注意事項
爲了開發storm項目,你的classpath裏面須要有storm的jar包。最推薦的方式是使用Maven,不使用maven的話你能夠手動把storm發行版裏面的全部的jar包添加到classpath storm-starter項目使用Leiningen做爲build和依賴管理工具,你能夠下載這個腳本(https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein)來安裝Leiningen, 把它加入到你的PATH, 使它可執行。要拉取storm的全部依賴包,簡單地在項目的根目錄執行 lein deps 就能夠了