本文來自網易雲社區
html
做者:汪建偉java
前言
node
前一段時間參與哨兵流式監控功能設計,調研了兩個能夠作流式計算的框架:storm和spark streaming,我負責storm的調研工做。斷斷續續花了一週的時間看了官網上的doc和網絡上的一些資料。我把所學到的總結成一個文檔,發出來給對storm感興趣的同事作入門引導。
數據庫
storm背景
編程
隨着互聯網的更進一步發展,從Portal信息瀏覽型到Search信息搜索型到SNS關係交互傳遞型,以及電子商務、互聯網旅遊生活產品等將生活中的流通環節在線化。對效率的要求讓你們對於實時性的要求進一步提高,而信息的交互和溝通正在從點對點往信息鏈甚至信息網的方向發展,這樣必然帶來數據在各個維度的交叉關聯,數據爆炸已不可避免。所以流式處理加NoSQL產品應運而生,分別解決實時框架和數據大規模存儲計算的問題。
json
2011年twitter對Storm開源。之前互聯網的開發人員在作一個實時應用的時候,除了要關注應用邏輯計算處理自己,還要爲了數據的實時流轉、交互、分佈大傷腦筋。如今開發人員能夠快速的搭建一套健壯、易用的實時流處理框架,配合SQL產品或者NoSQL產品或者MapReduce計算平臺,就能夠低成本的作出不少之前很難想象的實時產品:好比一淘數據部的量子恆道品牌旗下的多個產品就是構建在實時流處理平臺上的。數組
strom語言
服務器
Storm的主要開發語言是clojure,完成核心功能邏輯,輔助開發語言還有Python和java
網絡
strom的特色負載均衡
1. 編程模型簡單
Storm同hadoop同樣,爲大數據的實時計算提供了一些簡單優美的原語,這大大下降了開發並行實時處理的任務的複雜性,幫助你快速、高效的開發應用。
2. 可擴展
在Storm集羣中真正運行topology的主要有三個實體:工做進程、線程和任務。Storm集羣中的每臺機器上均可以運行多個工做進程,每一個工做進程又可建立多個線程,每一個線程能夠執行多個任務,任務是真正進行數據處理的實體,咱們開發的spout、bolt就是做爲一個或者多個任務的方式執行的。 所以,計算任務在多個線程、進程和服務器之間並行進行,支持靈活的水平擴展。
3. 高可靠
Storm能夠保證spout發出的每條消息都能被「徹底處理」。 spout發出的消息後續可能會觸發產生成千上萬條消息,能夠形象的理解爲一棵消息樹,其中spout發出的消息爲樹根,Storm會跟蹤這棵消息樹的處理狀況,只有當這棵消息樹中的全部消息都被處理了,Storm纔會認爲spout發出的這個消息已經被「徹底處理」。若是這棵消息樹中的任何一個消息處理失敗了,或者整棵消息樹在限定的時間內沒有「徹底處理」,那麼spout發出的消息就會重發。 考慮到儘量減小對內存的消耗,Storm並不會跟蹤消息樹中的每一個消息,而是採用了一些特殊的策略,它把消息樹看成一個總體來跟蹤,對消息樹中全部消息的惟一id進行異或計算,經過是否爲零來斷定spout發出的消息是否被「徹底處理」,這極大的節約了內存和簡化了斷定邏輯,後面會在下文對這種機制進行詳細介紹。
這種模式,每發送一個消息,都會同步發送一個ack/fail,對於網絡的帶寬會有必定的消耗,若是對於可靠性要求不高,可經過使用不一樣的emit接口關閉該模式。
上面所說的,Storm保證了每一個消息至少被處理一次,可是對於有些計算場合,會嚴格要求每一個消息只被處理一次,Storm的0.7.0引入了事務性拓撲,解決了這個問題。
4. 高容錯
若是在消息處理過程當中出了一些異常,Storm會從新安排這個出問題的處理單元。Storm保證一個處理單元永遠運行(除非你顯式殺掉這個處理單元)。固然,若是處理單元中存儲了中間狀態,那麼當處理單元從新被Storm啓動的時候,須要應用本身處理中間狀態的恢復。
5. 快速
這裏的快主要是指的時延。storm的網絡直傳、內存計算,其時延必然比hadoop的經過hdfs傳輸低得多;當計算模型比較適合流式時,storm的流式處理,省去了批處理的收集數據的時間;由於storm是服務型的做業,也省去了做業調度的時延。因此從時延上來看,storm要快於hadoop。
說一個典型的場景,幾千個日誌生產方產生日誌文件,須要進行一些ETL操做存入一個數據庫。
假設利用hadoop,則須要先存入hdfs,按每一分鐘切一個文件的粒度來算(這個粒度已經極端的細了,再小的話hdfs上會一堆小文件),hadoop開始計算時,1分鐘已通過去了,而後再開始調度任務又花了一分鐘,而後做業運行起來,假設機器特別多,幾鈔鍾就算完了,而後寫數據庫假設也花了不多的時間,這樣,從數據產生到最後可使用已通過去了至少兩分多鐘。
而流式計算則是數據產生時,則有一個程序去一直監控日誌的產生,產生一行就經過一個傳輸系統發給流式計算系統,而後流式計算系統直接處理,處理完以後直接寫入數據庫,每條數據從產生到寫入數據庫,在資源充足時能夠在毫秒級別完成。
6. 支持多種編程語言
除了用java實現spout和bolt,你還可使用任何你熟悉的編程語言來完成這項工做,這一切得益於Storm所謂的多語言協議。多語言協議是Storm內部的一種特殊協議,容許spout或者bolt使用標準輸入和標準輸出來進行消息傳遞,傳遞的消息爲單行文本或者是json編碼的多行。
7. 支持本地模式
Storm有一種「本地模式」,也就是在進程中模擬一個Storm集羣的全部功能,以本地模式運行topology跟在集羣上運行topology相似,這對於咱們開發和測試來講很是有用。
storm的組成
在Storm的集羣裏面有兩種節點: 控制節點(master node)和工做節點(worker node)。控制節點上面運行一個叫Nimbus後臺程序,它的做用相似Hadoop裏面的JobTracker。Nimbus負責在集羣裏面分發代碼,分配計算任務給機器, 而且監控狀態。
每個工做節點上面運行一個叫作Supervisor的節點。Supervisor會監聽分配給它那臺機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個topology的一個子集;一個運行的topology由運行在不少機器上的不少工做進程組成。
Nimbus和Supervisor之間的全部協調工做都是經過Zookeeper集羣完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。全部的狀態要麼在zookeeper裏面, 要麼在本地磁盤上。這也就意味着你能夠用kill -9來殺死Nimbus和Supervisor進程, 而後再重啓它們,就好像什麼都沒有發生過。這個設計使得Storm異常的穩定。
接下來咱們再來具體看一下這些概念。
Nimbus:負責資源分配和任務調度。
Supervisor:負責接受nimbus分配的任務,啓動和中止屬於本身管理的worker進程。
Worker:運行具體處理組件邏輯的進程。
Task:worker中每個spout/bolt的線程稱爲一個task. 在storm0.8以後,task再也不與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱爲executor。
下面這個圖描述了以上幾個角色之間的關係。
Topology基本原理
Storm集羣和Hadoop集羣表面上看很相似。可是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這二者之間是很是不同的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。
1 拓撲(Topologies)
一個topology是spouts和bolts組成的圖, 經過stream groupings將圖中的spouts和bolts鏈接起來,以下圖:
一個topology會一直運行直到你手動kill掉,Storm自動從新分配執行失敗的任務, 而且Storm能夠保證你不會有數據丟失(若是開啓了高可靠性的話)。若是一些機器意外停機它上面的全部任務會被轉移到其餘機器上。
2 流(Streams)
數據流(Streams)是 Storm 中最核心的抽象概念。一個數據流指的是在分佈式環境中並行建立、處理的一組元組(tuple)的無界序列。數據流能夠由一種可以表述數據流中元組的域(fields)的模式來定義。在默認狀況下,元組(tuple)包含有整型(Integer)數字、長整型(Long)數字、短整型(Short)數字、字節(Byte)、雙精度浮點數(Double)、單精度浮點數(Float)、布爾值以及字節數組等基本類型對象。固然,你也能夠經過定義可序列化的對象來實現自定義的元組類型。
3 數據源(Spouts)
數據源(Spout)是拓撲中數據流的來源。通常 Spout 會從一個外部的數據源讀取元組而後將他們發送到拓撲中。根據需求的不一樣,Spout 既能夠定義爲可靠的數據源,也能夠定義爲不可靠的數據源。一個可靠的 Spout 可以在它發送的元組處理失敗時從新發送該元組,以確保全部的元組都能獲得正確的處理;相對應的,不可靠的 Spout 就不會在元組發送以後對元組進行任何其餘的處理。
一個 Spout 能夠發送多個數據流。爲了實現這個功能,能夠先經過 OutputFieldsDeclarer 的 declareStream 方法來聲明定義不一樣的數據流,而後在發送數據時在 SpoutOutputCollector 的 emit 方法中將數據流 id 做爲參數來實現數據發送的功能。
Spout 中的關鍵方法是 nextTuple。顧名思義,nextTuple 要麼會向拓撲中發送一個新的元組,要麼會在沒有可發送的元組時直接返回。須要特別注意的是,因爲 Storm 是在同一個線程中調用全部的 Spout 方法,nextTuple 不能被 Spout 的任何其餘功能方法所阻塞,不然會直接致使數據流的中斷。
Spout 中另外兩個關鍵方法是 ack 和 fail,他們分別用於在 Storm 檢測到一個發送過的元組已經被成功處理或處理失敗後的進一步處理。注意,ack 和 fail 方法僅僅對上述「可靠的」 Spout 有效。
4 數據流處理組件(Bolts)
拓撲中全部的數據處理均是由 Bolt 完成的。經過數據過濾(filtering)、函數處理(functions)、聚合(aggregations)、聯結(joins)、數據庫交互等功能,Bolt 幾乎可以完成任何一種數據處理需求
一個 Bolt 能夠實現簡單的數據流轉換,而更復雜的數據流變換一般須要使用多個 Bolt 並經過多個步驟完成。例如,將一個微博數據流轉換成一個趨勢圖像的數據流至少包含兩個步驟:其中一個 Bolt 用於對每一個圖片的微博轉發進行滾動計數,另外一個或多個 Bolt 將數據流輸出爲「轉發最多的圖片」結果(相對於使用2個Bolt,若是使用3個 Bolt 你可讓這種轉換具備更好的可擴展性)。
與 Spout 相同,Bolt 也能夠輸出多個數據流。爲了實現這個功能,能夠先經過 OutputFieldsDeclarer 的 declareStream 方法來聲明定義不一樣的數據流,而後在發送數據時在 OutputCollector 的 emit 方法中將數據流 id 做爲參數來實現數據發送的功能。
在定義 Bolt 的輸入數據流時,你須要從其餘的 Storm 組件中訂閱指定的數據流。若是你須要從其餘全部的組件中訂閱數據流,你就必需要在定義 Bolt 時分別註冊每個組件。對於聲明爲默認 id(即上文中提到的「default」——譯者注)的數據流,InputDeclarer支持訂閱此類數據流的語法糖。也就是說,若是須要訂閱來自組件「1」的數據流,declarer.shuffleGrouping("1") 與 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID) 兩種聲明方式是等價的。
Bolt 的關鍵方法是 execute 方法。execute 方法負責接收一個元組做爲輸入,而且使用 OutputCollector 對象發送新的元組。若是有消息可靠性保障的需求,Bolt 必須爲它所處理的每一個元組調用 OutputCollector 的 ack 方法,以便 Storm 可以瞭解元組是否處理完成(而且最終決定是否能夠響應最初的 Spout 輸出元組樹)。通常狀況下,對於每一個輸入元組,在處理以後能夠根據須要選擇不發送仍是發送多個新元組,而後再響應(ack)輸入元組。IBasicBolt 接口可以實現元組的自動應答。
5 數據流分組(Stream groupings)
爲拓撲中的每一個 Bolt 的肯定輸入數據流是定義一個拓撲的重要環節。數據流分組定義了在 Bolt 的不一樣任務(tasks)中劃分數據流的方式。
在 Storm 中有八種內置的數據流分組方式,並且你還能夠經過CustomStreamGrouping 接口實現自定義的數據流分組模型。這八種分組分時分別爲:
1. 隨機分組(Shuffle grouping):這種方式下元組會被儘量隨機地分配到 Bolt 的不一樣任務(tasks)中,使得每一個任務所處理元組數量可以可以保持基本一致,以確保集羣的負載均衡。
2. 域分組(Fields grouping):這種方式下數據流根據定義的「域」來進行分組。例如,若是某個數據流是基於一個名爲「user-id」的域進行分組的,那麼全部包含相同的「user-id」的元組都會被分配到同一個任務中,這樣就能夠確保消息處理的一致性。
3. 部分關鍵字分組(Partial Key grouping):這種方式與域分組很類似,根據定義的域來對數據流進行分組,不一樣的是,這種方式會考慮下游 Bolt 數據處理的均衡性問題,在輸入數據源關鍵字不平衡時會有更好的性能1。感興趣的讀者能夠參考這篇論文,其中詳細解釋了這種分組方式的工做原理以及它的優勢。
4. 徹底分組(All grouping):這種方式下數據流會被同時發送到 Bolt 的全部任務中(也就是說同一個元組會被複制多份而後被全部的任務處理),使用這種分組方式要特別當心。
5. 全局分組(Global grouping):這種方式下全部的數據流都會被髮送到 Bolt 的同一個任務中,也就是 id 最小的那個任務。
6. 非分組(None grouping):使用這種方式說明你不關心數據流如何分組。目前這種方式的結果與隨機分組徹底等效,不過將來 Storm 社區可能會考慮經過非分組方式來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個線程中執行。
7. 直接分組(Direct grouping):這是一種特殊的分組方式。使用這種方式意味着元組的發送者能夠指定下游的哪一個任務能夠接收這個元組。只有在數據流被聲明爲直接數據流時纔可以使用直接分組方式。使用直接數據流發送元組須要使用 OutputCollector 的其中一個 emitDirect 方法。Bolt 能夠經過 TopologyContext 來獲取它的下游消費者的任務 id,也能夠經過跟蹤 OutputCollector 的 emit 方法(該方法會返回它所發送元組的目標任務的 id)的數據來獲取任務 id。
8. 本地或隨機分組(Local or shuffle grouping):若是在源組件的 worker 進程裏目標 Bolt 有一個或更多的任務線程,元組會被隨機分配到那些同進程的任務中。換句話說,這與隨機分組的方式具備類似的效果。
6 任務(Tasks)
在 Storm 集羣中每一個 Spout 和 Bolt 都由若干個任務(tasks)來執行。每一個任務都與一個執行線程相對應。數據流分組能夠決定如何由一組任務向另外一組任務發送元組。你能夠在 TopologyBuilder 的 setSpout 方法和 setBolt 方法中設置 Spout/Bolt 的並行度。
7 工做進程(Workers)
拓撲是在一個或多個工做進程(worker processes)中運行的。每一個工做進程都是一個實際的 JVM 進程,而且執行拓撲的一個子集。例如,若是拓撲的並行度定義爲300,工做進程數定義爲50,那麼每一個工做進程就會執行6個任務(進程內部的線程)。Storm 會在全部的 worker 中分散任務,以便實現集羣的負載均衡。
相關閱讀: 流式處理框架storm淺析(下篇)
網易雲免費體驗館,0成本體驗20+款雲產品!
更多網易研發、產品、運營經驗分享請訪問網易雲社區。
相關文章:
【推薦】 一篇文章看懂Facebook和新浪微博的智能FEED
【推薦】 【kudu pk parquet】TPC-H Query2對比解析
【推薦】 Dubbo與HadoopRPC的區別