storm學習

 

一 storm起源html

     storm做爲一個流式處理框架,它與hadoop的根本區別在於hadoop的輸入不是持續的,而storm的輸入是持續的。storm是一個開源的,分佈式的流式的計算系統。隨着有些公司數據量增加很是快和和數據量特別大就出現了分佈式的須要,把一個計算任務拆解成多個計算機同時運行。Google發表的三篇論文,Google File System,BigTable,MapReduce奠基了分佈式的理論基礎,原Yahoo的Doug Cutting根據這些學術論文研究出hadoop。基於hadoop改造的系統就如雨後春筍般的出現了,HBase,Drill,Hive,Tez,Pig,Dremel,Mahout,等造成了一整套生態系統。 可是hadoop只適用於批處理,不適用於流式處理,流式處理有時候是很是必須和重要的,批處理每每須要收集一部分時間數據而後在計算,流式處理是相對動態的,好比用戶出廣告費使他的搜索靠前,若是第二名出不了這麼多錢,就能夠惡意點擊位於他前面的廣告商使費用很快用完,若是批處理hadoop就可能招來廣告商的埋怨,若是用流式處理框架就能夠比較實時的計算是否是惡意點擊。因而就產生了分佈式流式計算系統,比較有名的有流失系統有Yahoo的S4,IBM的StreamBase,Amazon的kinesis,Spark的Streaming,Google的Millwheel .java

批量計算和流式計算的比較:node

       Storm的很大一部分實現都是Clojure代碼。同時storm在設計之初就考慮到了兼容多語言開發。Nimbus是一個thrift服務,topologies被定義爲Thrift結構體。Thrift的運用使得Storm能夠被任意開發語言使用。sql

二 storm組件數據庫

     Storm的術語包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。
apache

      Storm的一個做業是一個拓撲--Topology,包含了許多數據節點和計算節點,以及這些節點之間的邊,數據源節點稱爲spout,計算節點稱爲bolt,點之間的邊稱爲Stream,數據流中的每一條記錄稱爲tuple。拓撲的每一個節點都要說明它所發射出的元組的字段的name,其餘節點只須要訂閱該name就能夠接收處理。segmentfault

   

     Topologyapi

     在storm上須要你本身去定義和創建topology,從而進行實時的計算。topoloy裏面的每個節點都是並行運行的,它會一直永久運行,直到你顯示的關閉進程。topology的定義是thrift結構而且nimbus是一個thrift服務,能夠用其它語言建立而且提交topology.若是是java可使用TopologyBuilder定義拓撲,而且指定spout和bolt和分組方式,可使用stormsubmitter向集羣提交拓撲名稱,拓撲配置信息和自己的topology做爲參數運行一個拓撲。可使用storm kill {topologyname}來中止一個拓撲,其中 topologyname 就是你提交拓撲時使用的拓撲名稱。不過,在執行該命令後 Storm 不會立刻 kill 掉該拓撲。Storm 會先中止全部 spouts 的活動,使得他們不能繼續發送 tuple,而後 Storm 會等待 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 參數表示的一段時間,而後纔會結束全部的 worker 進程。這能夠保證拓撲在被 kill 以前能夠有足夠的時間完成已有的 tuple 的處理。數組

      Streams框架

      數據流式storm中最核心的抽象概念,指的是在分佈式環境中並行建立、處理的一組元組(tuple)的無界序列。在聲明數據流的時候能夠定義一個有效的id。可是大部分都是單一數據流的,不須要ID進行區分, 能夠直接使用OutputFieldsDeclarer   申明無id的數據流。實際上,系統默認會給這種數據流定義一個名爲「default」的 id。

      Spouts

      spouts是拓撲中的數據源,負責讀入數據。spouts能夠定義爲可靠的和不可靠的數據源,可靠的數據源能夠在tuple發送失敗的時候從新發送,不可靠的則不處理。nextTuple方法就是向拓撲中發送一個新的tuple.須要注意nextTuple方法不能被其它方法阻塞,不然會致使數據流的中斷。(可參考阿里的JStorm)spouts還有兩個比較重要的act和fail方法,只對可靠的spouts起做用,在發送tuple失敗和成功時可作進一步處理。

      Bolts

      bolts是topology的計算節點,能夠進行數據過濾,函數處理,聚合,聯結,數據庫交互等功能實現數據流的轉化。你須要從其餘的storm組件中訂閱指定的數據流,而且在定義bolt的時候把註冊相應的組件。對於申明默認id 的數據流,InputDeclarer的shuffleGrouping("1") 與shuffleGrouping("1", DEFAULT_STREAM_ID) 兩種聲明方式是等價的,都是訂閱來自組件「1」的數據流。bolt也能夠定義多個數據流,能夠經過OutputFieldsDeclarer的declareStream方法來申明定義不一樣的數據源,而後發送數據時在outputCollector的emit方法中將數據流id做爲參數來實現數據發送的功能。Bolt的execute方法負責接受一個元祖做爲一個輸入,而且使用outputCollector對象發送新的元組。若是有消息可靠性保障的需求,Bolt必須爲它所處理的每一個元組調用OutputCollector的 ack 方法,以便Storm可以瞭解元組是否處理完成。

      Tuple

      tuple也叫元組,是數據流中的一個基本處理單元,能夠由Integer,Long,Short,Byte,Double,Float,Boolean,Char基本類型、字符串、字節數組、ArrayList、HashMap、HashSet 以及 Clojure 的集合類型的序列化。若是你須要在 tuple 中使用其餘的對象類型,你就須要註冊一個自定義的序列化器。若是 Storm 發現了一個沒有註冊序列化器的類型,它會使用 Java 自帶的序列化器。若是這個對象沒法序列化,Storm 就會拋出異常。Java 自身的序列化機制很是耗費資源,並且無論在 CPU 的性能上或者序列化對象的大小上都沒有優點。建議在生產環境中運行拓撲的時候註冊一個自定義的序列化器。

      Tasks和Workers

      tasks是線程級別的,而workers是進程級別的,每一個工做進程即Worker是一個實際的JVM進程,tasks和workers都是執行topology中的spout和bolt.能夠經過topologyBuilder的setSpout 方法和 setBolt 方法中設置相應 的並行度。好比,topology的並行度定義爲100,workers數量爲20,那麼每一個worker就會有6個tasks任務。

      Stream Grouping

      隨機分組(Shuffle grouping):元組會被隨機地分配到 Bolt 的不一樣任務(tasks)中,儘量使得每一個任務所處理元組數量保持基本相同。

      域分組(Fields grouping):數據流根據定義的「域」來進行分組。好比能夠基於一個名爲「id」的域進行分組,包含相同的「id」的元組會分配到同一個任務中。  

      部分關鍵字分組(Partial Key grouping):這種方式與域分組很類似,根據定義的域來對數據流進行分組,不一樣的是,這種方式會考慮下游 Bolt 數據處理的均衡性問題,在輸入數據源關鍵字不平衡時會有更好的性能1。感興趣的讀者能夠參考這篇論文,其中詳細解釋了這種分組方式的工做原理以及它的優勢。

      徹底分組(All grouping):至關於廣播方式,同一個元組會被複制多份而後被全部的任務處理。

      全局分組(Global grouping):這種方式下全部的數據流都會被髮送到 Bolt 的同一個任務中,也就是 id 最小的那個任務。

      非分組(None grouping):目前和隨機分組等效。

      直接分組(Direct grouping):元組的發送者能夠指定下游的哪一個任務能夠接收這個元組。只有在數據流被聲明爲直接數據流時纔可以使用直接分組方式。使用直接數據流發送元組須要使用OutputCollector 的其中一個 emitDirect 方法。Bolt 能夠經過 TopologyContext 來獲取它的下游消費者的任務 id,也能夠經過跟蹤 OutputCollector 的 emit 方法(該方法會返回它所發送元組的目標任務的 id)的數據來獲取任務 id。

      本地或隨機分組(Local or shuffle grouping):若是在源組件的 worker 進程裏目標 Bolt 有一個或更多的任務線程,元組會被隨機分配到那些同進程的任務中。換句話說,這與隨機分組的方式具備類似的效果。

 三 Storm的可用性

      storm集羣

      採用主從結構,主節點稱爲Nimbus,管理整個集羣的運行狀態,從節點稱爲Supervisor,維護每一臺機器的狀態。

      worker 掛掉

      supervisor 會從新啓動工做進程。若是仍然一直失敗,在必定時間內沒法向 Nimbus 發送心跳,Nimbus 就會將這個任務從新分配到其餘的worker上面。

      非主節點故障

      非主節點發生故障時,該節點上全部的任務(tasks)都會超時,而後 Nimbus 在檢測到超時後會將全部的這些任務從新分配到其餘機器上去。

      Nimbus 或者 Supervisor進程掛掉

      Zookeeper管理着Nimbus和Supervisor後臺進程的狀態,Niubus和Supervisor的後臺進程會在監控工具的監控下運行,若是掛掉,會靜默的自動啓動。 與Hadoop不一樣,JobTracker的故障會致使全部正在運行的job是失敗,Nimbus或者supervisor不會影響任何的工做進程。

      Nimbus單點故障

      Nimbus節點故障會致使worker進程不會在必要的時候從新分配到不一樣的機器中,看上去好像丟失了一個worker,這就是惟一的影響,此外,集羣中的worker仍然會繼續運行,supervisor也會監控而且啓動正在運行的機器。

      storm在zookeeper中的結構(資料來源:http://segmentfault.com/a/1190000000653595)

 

/-{storm-zk-root}           -- storm在zookeeper上的根目錄(默認爲/storm) | |-/assignments -- topology的任務分配信息 | | | |-/{topology-id} -- 這個目錄保存的是每一個topology的assignments信息包括:對應的nimbus上 | -- 的代碼目錄,全部task的啓動時間,每一個task與機器、端口的映射。操做爲 | -- (assignments)來獲取全部assignments的值;以及(assignment-info storm-id) | -- 來獲得給定的storm-id對應的AssignmentInfo信息 | -- 在AssignmentInfo中存儲的內容有: | -- :executor->node+port :executor->start-time-secs :node->host | -- 具體定義在common.clj中的 | -- (defrecord Assignment[master-code-dir node->host executor->node+port executor->start-time-secs]) | |-/storms -- 這個目錄保存全部正在運行的topology的id | | | | | |-/{topology-id} -- 這個文件保存這個topology的一些信息,包括topology的名字,topology開始運行 | -- 的時間以及這個topology的狀態。操做(active-storms),得到當前路徑活躍的下 | -- topology數據。保存的內容參考類StormBase;(storm-base storm-id)獲得給定的 | -- storm-id下的StormBase數據,具體定義在common.clj中的 | -- (defrecord StormBase [storm-name launch-time-secs status num-workers component->executors]) | |-/supervisors -- 這個目錄保存全部的supervisor的心跳信息 | | | | | |-/{supervisor-id} -- 這個文件保存supervisor的心跳信息包括:心跳時間,主機名,這個supervisor上 | -- worker的端口號,運行時間(具體看SupervisorInfo類)。操做(supervisors)獲得 | -- 全部的supervisors節點;(supervisor-info supervisor-id)獲得給定的 | -- supervisor-id對應的SupervisorInfo信息;具體定義在common.clj中的 | | -- (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs]) | |-/workerbeats -- 全部worker的心跳 | | | |-/{topology-id} -- 這個目錄保存這個topology的全部的worker的心跳信息 | | | |-/{supervisorId-port} -- worker的心跳信息,包括心跳的時間,worker運行時間以及一些統計信息 | | -- 操做(heartbeat-storms)獲得全部有心跳數據的topology, | -- (get-worker-heartbeat storm-id node port)獲得具體一個topology下 | -- 的某個worker(node:port)的心跳情況, | -- (executor-beats storm-id executor->node+port)獲得一個executor的心跳情況 | |-/errors -- 全部產生的error信息 | |-/{topology-id} -- 這個目錄保存這個topology下面的錯誤信息。操做(error-topologies)獲得出錯 | -- 的topology;(errors storm-id component-id)獲得 | -- 給定的storm-id component-id下的出錯信息 |-/{component-id}
相關文章
相關標籤/搜索