流式計算解決方案-Storm
html
在Hadoop生態圈中,針對大數據進行批量計算時,一般須要一個或者多個MapReduce做業來完成,但這種批量計算方式是知足不了對實時性要求高的場景。java
Storm是一個開源分佈式實時計算系統,它能夠實時可靠地處理流數據。面試
本章內容:數據庫
1) Storm特色apache
2) Storm基本概念編程
3) Storm分組模式api
4) Storm系統架構數組
5) Storm容錯機制安全
6) 一個簡單的Storm實現微信
在Storm出現以前,進行實時處理是很是痛苦的事情,咱們主要的時間都花在關注往哪裏發消息,從哪裏接收消息,消息如何序列化,真正的業務邏輯只佔了源代碼的一小部分。一個應用程序的邏輯運行在不少worker上,但這些worker須要各自單獨部署,還須要部署消息隊列。最大問題是系統很脆弱,並且不是容錯的:須要本身保證消息隊列和worker進程工做正常。
Storm完整地解決了這些問題。它是爲分佈式場景而生的,抽象了消息傳遞,會自動地在集羣機器上併發地處理流式計算,讓你專一於實時處理的業務邏輯。
Storm有以下特色:
1) 編程簡單:開發人員只須要關注應用邏輯,並且跟Hadoop相似,Storm提供的編程原語也很簡單
2) 高性能,低延遲:能夠應用於廣告搜索引擎這種要求對廣告主的操做進行實時響應的場景。
3) 分佈式:能夠輕鬆應對數據量大,單機搞不定的場景
4) 可擴展:隨着業務發展,數據量和計算量愈來愈大,系統可水平擴展
5) 容錯:單個節點掛了不影響應用
6) 消息不丟失:保證消息處理
不過Storm不是一個完整的解決方案。使用Storm時你須要關注如下幾點:
1) 若是使用的是本身的消息隊列,須要加入消息隊列作數據的來源和產出的代碼
2) 須要考慮如何作故障處理:如何記錄消息處理的進度,應對Storm重啓,掛掉的場景
3) 須要考慮如何作消息的回退:若是某些消息處理一直失敗怎麼辦?
1) 定義及架構
Hadoop是Apache的一個項目,是一個可以對大量數據進行分佈式處理的軟件框架。
Storm是Apache基金會的孵化項目,是應用於流式數據實時處理領域的分佈式計算系統。
Hadoop |
Storm |
|
系統角色 |
JobTracker |
Nimbus |
TaskTracker |
Supervisor |
|
Child |
Worker |
|
應用名稱 |
Job |
Topology |
組件接口 |
Mapper/Reducer |
Spout/Bolt |
2) 應用方面
Hadoop是分佈式批處理計算,強調批處理,經常使用於數據挖掘和分析。
Storm是分佈式實時計算,強調實時性,經常使用於實時性要求較高的地方。
3) 計算處理方式
Hadoop是磁盤級計算,進行計算時,數據在磁盤上,須要讀寫磁盤;Hadoop應用MapReduce的思想,將數據切片計算來處理大量的離線數據。Hadoop處理的數據必須是已經存放在HDFS上或者相似HBase的數據庫中,因此Hadoop實現的時候是經過移動計算到這些存放數據的機器上來提升效率的。
Storm是內存級計算,數據直接經過網絡導入內存。Storm是一個流計算框架,處理的數據是實時消息隊列中的,須要寫好一個Topology邏輯,而後將接收進來的數據進行處理,因此Storm是經過移動數據平均分配到機器資源來得到高效率的。
4) 數據處理方面
數據來源:Hadoop是HDFS上某個文件夾下的數據,數據量可能以TB來計;而Storm則是實時新增的某一筆數據。
處理過程:Hadoop是Map階段到Reduce階段的;Storm是由用戶定義處理流程,流程中能夠包含多個步驟,每一個步驟能夠是數據源(SPOUT),也能夠是處理邏輯(BOLT)。
是否結束:Hadoop最後必需要結束;而Storm沒有結束狀態,到最後一步時,就停在那,直到有新數據進入時再從新開始。
處理速度:Hadoop以處理HDFS上大量數據爲目的,速度慢;Storm只要處理新增的某一筆數據便可,故此它的速度很快。
適用場景:Hadoop主要是處理一批數據,對時效性要求不高,須要處理就提交一個JOB;而Storm主要是處理某一新增數據的,故此時效性要求高。
總結,Hadoop和Storm並無真的優劣之分,它們只是在各自的領域上有着獨特的性能而已,如果真的把它們進行單純的比較,反而是有失公平了。事實上,只有在最合適的方面使用最合適的大數據平臺,纔可以真正體現出它們的價值,也纔可以真正爲咱們的工做提供最爲便捷的助力!
1) Topology
一個Storm拓撲打包了一個實時處理程序的邏輯。一個Storm拓撲跟一個MapReduce的任務(job)是相似的。主要區別是MapReduce任務最終會結束,而拓撲會一直運行(固然直到你殺死它)。一個拓撲是一個經過流分組(Stream Grouping)把Spout和Bolt鏈接到一塊兒的拓撲結構。圖的每條邊表明一個Bolt訂閱了其餘Spout或者Bolt的輸出流。一個拓撲就是一個複雜的多階段的流計算。
2) Tuple
元組是Storm提供的一個輕量級的數據格式,能夠用來包裝你須要實際處理的數據。元組是一次消息傳遞的基本單元。一個元組是一個命名的值列表,其中的每一個值均可以是任意類型的。元組是動態地進行類型轉化的—字段的類型不須要事先聲明。在Storm中編程時,就是在操做和轉換由元組組成的流。一般,元組包含整數,字節,字符串,浮點數,布爾值和字節數組等類型。要想在元組中使用自定義類型,就須要實現本身的序列化方式。
3) Stream
流是Storm中的核心抽象。一個流由無限的元組序列組成,這些元組會被分佈式並行地建立和處理。經過流中元組包含的字段名稱來定義這個流。
每一個流聲明時都被賦予了一個ID。只有一個流的Spout和Bolt很是常見,因此OutputFieldsDeclarer提供了不須要指定ID來聲明一個流的函數(Spout和Bolt都須要聲明輸出的流)。這種狀況下,流的ID是默認的「default」。
4) Spout
Spout(噴嘴,這個名字很形象)是Storm中流的來源。一般Spout從外部數據源,如消息隊列中讀取元組數據並吐到拓撲裏。Spout能夠是可靠的(reliable)或者不可靠(unreliable)的。可靠的Spout可以在一個元組被Storm處理失敗時從新進行處理,而非可靠的Spout只是吐數據到拓撲裏,不關心處理成功仍是失敗了。
Spout能夠一次給多個流吐數據。此時須要經過OutputFieldsDeclarer的declareStream函數來聲明多個流並在調用SpoutOutputCollector提供的emit方法時指定元組吐給哪一個流。
Spout中最主要的函數是nextTuple,Storm框架會不斷調用它去作元組的輪詢。若是沒有新的元組過來,就直接返回,不然把新元組吐到拓撲裏。nextTuple必須是非阻塞的,由於Storm在同一個線程裏執行Spout的函數。
Spout中另外兩個主要的函數是Ack和fail。當Storm檢測到一個從Spout吐出的元組在拓撲中成功處理完時調用Ack,沒有成功處理完時調用Fail。只有可靠型的Spout會調用Ack和Fail函數。
5) Bolt
在拓撲中全部的計算邏輯都是在Bolt中實現的。一個Bolt能夠處理任意數量的輸入流,產生任意數量新的輸出流。Bolt能夠作函數處理,過濾,流的合併,聚合,存儲到數據庫等操做。Bolt就是流水線上的一個處理單元,把數據的計算處理過程合理的拆分到多個Bolt、合理設置Bolt的task數量,可以提升Bolt的處理能力,提高流水線的併發度。
Bolt能夠給多個流吐出元組數據。此時須要使用OutputFieldsDeclarer的declareStream方法來聲明多個流並在使用[OutputColletor](https://storm.apache.org/javadoc/apidocs/backtype/storm/task/OutputCollector.html)的emit方法時指定給哪一個流吐數據。
當你聲明瞭一個Bolt的輸入流,也就訂閱了另一個組件的某個特定的輸出流。若是但願訂閱另外一個組件的全部流,須要單獨挨個訂閱。InputDeclarer有語法糖來訂閱ID爲默認值的流。例如declarer.shuffleGrouping("redBolt")訂閱了redBolt組件上的默認流,跟declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID)是相同的。
在Bolt中最主要的函數是execute函數,它使用一個新的元組看成輸入。Bolt使用OutputCollector對象來吐出新的元組。Bolts必須爲處理的每一個元組調用OutputCollector的ack方法以便於Storm知道元組何時被各個Bolt處理完了(最終就能夠確認Spout吐出的某個元組處理完了)。一般處理一個輸入的元組時,會基於這個元組吐出零個或者多個元組,而後確認(ack)輸入的元組處理完了,Storm提供了IBasicBolt接口來自動完成確認。
必須注意OutputCollector不是線程安全的,因此全部的吐數據(emit)、確認(ack)、通知失敗(fail)必須發生在同一個線程裏。更多信息能夠參照問題定位
6) Task
每一個Spout和Bolt會以多個任務(Task)的形式在集羣上運行。每一個任務對應一個執行線程,流分組定義瞭如何從一組任務(同一個Bolt)發送元組到另一組任務(另一個Bolt)上。能夠在調用TopologyBuilder的setSpout和setBolt函數時設置每一個Spout和Bolt的併發數。
7) Component
組件(component)是對Bolt和Spout的統稱
8) Stream Grouping
定義拓撲的時候,一部分工做是指定每一個Bolt應該消費哪些流。流分組定義了一個流在一個消費它的Bolt內的多個任務(task)之間如何分組。流分組跟計算機網絡中的路由功能是相似的,決定了每一個元組在拓撲中的處理路線。
在Storm中有七個內置的流分組策略,你也能夠經過實現CustomStreamGrouping接口來自定義一個流分組策略:
洗牌分組(Shuffle grouping): 隨機分配元組到Bolt的某個任務上,這樣保證同一個Bolt的每一個任務都可以獲得相同數量的元組。
字段分組(Fields grouping): 按照指定的分組字段來進行流的分組。例如,流是用字段「user-id」來分組的,那有着相同「user-id」的元組就會分到同一個任務裏,可是有不一樣「user-id」的元組就會分到不一樣的任務裏。這是一種很是重要的分組方式,經過這種流分組方式,咱們就能夠作到讓Storm產出的消息在這個」user-id」級別是嚴格有序的,這對一些對時序敏感的應用(例如,計費系統)是很是重要的。
Partial Key grouping: 跟字段分組同樣,流也是用指定的分組字段進行分組的,可是在多個下游Bolt之間是有負載均衡的,這樣當輸入數據有傾斜時能夠更好的利用資源。這篇論文很好的解釋了這是如何工做的,有哪些優點。
All grouping: 流會複製給Bolt的全部任務。當心使用這種分組方式。
Global grouping: 整個流會分配給Bolt的一個任務。具體一點,會分配給有最小ID的任務。
不分組(None grouping): 說明不關心流是如何分組的。目前,None grouping等價於洗牌分組。
Direct grouping:一種特殊的分組。對於這樣分組的流,元組的生產者決定消費者的哪一個任務會接收處理這個元組。只能在聲明作直連的流(direct streams)上聲明Direct groupings分組方式。只能經過使用emitDirect系列函數來吐元組給直連流。一個Bolt能夠經過提供的TopologyContext來得到消費者的任務ID,也能夠經過OutputCollector對象的emit函數(會返回元組被髮送到的任務的ID)來跟蹤消費者的任務ID。
Local or shuffle grouping:若是目標Bolt在同一個worker進程裏有一個或多個任務,元組就會經過洗牌的方式分配到這些同一個進程內的任務裏。不然,就跟普通的洗牌分組同樣。
9) Reliability
Storm保證了拓撲中Spout產生的每一個元組都會被處理。Storm是經過跟蹤每一個Spout所產生的全部元組構成的樹形結構並得知這棵樹什麼時候被完整地處理來達到可靠性。每一個拓撲對這些樹形結構都有一個關聯的「消息超時」。若是在這個超時時間裏Storm檢測到Spout產生的一個元組沒有被成功處理完,那Spout的這個元組就處理失敗了,後續會從新處理一遍。
爲了發揮Storm的可靠性,須要你在建立一個元組樹中的一條邊時告訴Storm,也須要在處理完每一個元組以後告訴Storm。這些都是經過Bolt吐元組數據用的OutputCollector對象來完成的。標記是在emit函數裏完成,完成一個元組後須要使用Ack函數來告訴Storm。
10) Workers
拓撲以一個或多個Worker進程的方式運行。每一個Worker進程是一個物理的Java虛擬機,執行拓撲的一部分任務。例如,若是拓撲的併發設置成了300,分配了50個Worker,那麼每一個Worker執行6個任務(做爲Worker內部的線程)。Storm會盡可能把全部的任務均分到全部的Worker上。
1) 主節點(Nimbus):
在分佈式系統中,調度服務很是重要,它的設計,會直接關係到系統的運行效率,錯誤恢復(fail over),故障檢測(error detection)和水平擴展(scale)的能力。
集羣上任務(task)的調度由一個Master節點來負責。這臺機器上運行的Nimbus進程負責任務的調度。另一個進程是Storm UI,能夠界面上查看集羣和全部的拓撲的運行狀態。
2) 從節點(Supervisor)
Storm集羣上有多個從節點,他們從Nimbus上下載拓撲的代碼,而後去真正執行。Slave上的Supervisor進程是用來監督和管理實際運行業務代碼的進程。在Storm 0.9以後,又多了一個進程Logviewer,能夠用Storm UI來查看Slave節點上的log文件。
3) 協調服務Zookeeper:
ZooKeeper在Storm上不是用來作消息傳輸用的,而是用來提供協調服務(coordination service),同時存儲拓撲的狀態和統計數據。
l Supervisor,Nimbus和worker都在ZooKeeper留下約定好的信息。例如Supervisor啓動時,會在ZooKeeper上註冊,Nimbus就能夠發現Supervisor;Supervisor在ZooKeeper上留下心跳信息,Nimbus經過這些心跳信息來對Supervisor進行健康檢測,檢測出壞節點
l 因爲Storm組件(component)的狀態信息存儲在ZooKeeper上,因此Storm組件就能夠無狀態,能夠 kill -9來殺死
例如:Supervisors/Nimbus的重啓不影響正在運行中的拓撲,由於狀態都在ZooKeeper上,從ZooKeeper上從新加載一下就行了
l 用來作心跳
Worker經過ZooKeeper把孩子executor的狀況以心跳的形式彙報給Nimbus
Supervisor進程經過ZK把本身的狀態也以心跳的形式彙報給Nimbua
l 存儲最近任務的錯誤狀況(拓撲中止時會刪除)
4) 進程Worker
運行具體處理組件邏輯的進程,一個Topology可能會在一個或者多個worker裏面執行,每一個worker是一個物理JVM而且執行整個Topology的一部分
例如:對於並行度是300的topology來講,若是咱們使用50個工做進程來執行,那麼每一個工做進程會處理其中的6個tasks,Storm會盡可能均勻的工做分配給全部的worker
5) Task
Worker中的每個spout/bolt的線程稱爲一個task,每個spout和bolt會被看成不少task在整個集羣裏執行,每個executor對應到一個線程,在這個線程上運行多個task,Stream Grouping則是定義怎麼從一堆task發射tuple到另一堆task,能夠調用TopologyBuilder類的setSpout和setBolt來設置並行度(也就是有多少個task)
Storm的容錯機制包括架構容錯和數據容錯。
1) 架構容錯:
Nimbus和Supervisor進程被設計成快速失敗(fail fast)的(當遇到異常的狀況,進程就會掛掉)而且是無狀態的(狀態都保存在Zookeeper或者在磁盤上)。
最重要的是,worker進程不會由於Nimbus或者Supervisor掛掉而受影響。這跟Hadoop是不同的,當JobTracker掛掉,全部的任務都會沒了。
當Nimbus掛掉會怎樣?
若是Nimbus是以推薦的方式處於進程監管(例如經過supervisord)之下,那它會被重啓,不會有任何影響。
不然當Nimbus掛掉後:
l 已經存在的拓撲能夠繼續正常運行,可是不能提交新拓撲
l 正在運行的worker進程仍然能夠繼續工做。並且當worker掛掉,supervisor會一直重啓worker。
l 失敗的任務不會被分配到其餘機器(是Nimbus的職責)上了
當一個Supervisor(slave節點)掛掉會怎樣?
若是Supervisor是以推薦的方式處於進程監管(例如經過(supervisord)[supervisord.org/])之下,那它會被重啓,不會有任何影響
不然當Supervisor掛掉:分配到這臺機器的全部任務(task)會超時,Nimbus會把這些任務(task)從新分配給其餘機器。
當一個worker掛掉會怎麼樣?
當一個worker掛掉,supervisor會重啓它。若是啓動一直失敗那麼此時worker也就不能和Nimbus保持心跳了,Nimbus會從新分配worker到其餘機器。
Nimbus算是一個單點故障嗎?
若是Nimbus節點掛掉,worker進程仍然能夠繼續工做。並且當worker掛掉,supervisor會一直重啓worker。可是,沒有了Nimbus,當須要的時候(若是worker機器掛掉了)worker就不能被從新分配到其餘機器了。
因此答案是,Nimbus在「某種程度」上屬於單點故障的。在實際中,這種狀況沒什麼大不了的,由於當Nimbus進程掛掉,不會有災難性的事情發生
2) 數據容錯:
Storm中的每個Topology中都包含有一個Acker組件。 Acker組件的任務就是跟蹤從某個task中的Spout流出的每個messageId所綁定的Tuple樹中的全部Tuple的處理狀況。若是在用戶設置的最大超時時間(timetout 能夠經過 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來指定)內這些Tuple沒有被徹底處理,那麼Acker會告訴Spout該消息處理失敗,相反則會告知Spout該消息處理成功,它會分別調用Spout中的fail和ack方法。
實現一個拓撲包括一個spout和兩個bolt。Spout發送單詞。每一個bolt在輸入數據的尾部追加字符串「!!!」。三個節點排成一條線:spout發射給首個bolt,而後,這個bolt再發射給第二個bolt。若是spout發射元組「bob」和「john」,而後,第二個bolt將發射元組「bob!!!!!!」和「john!!!!!!」。
1) 其中Topology代碼以下,定義整個網絡拓撲圖:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1"); |
2) Spout實現:
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } |
3) Bolt實現:
public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } |
1) Config.TOPOLOGY_WORKERS:
這個設置用多少個工做進程來執行這個topology。好比,若是你把它設置成25, 那麼集羣裏面一共會有25個java進程來執行這個topology的全部task。若是你的這個topology裏面全部組件加起來一共有150的並行度,那麼每一個進程裏面會有6個線程(150 / 25 = 6)。
2) Config.TOPOLOGY_ACKERS:
這個配置設置acker任務的並行度。默認的acker任務並行度爲1,當系統中有大量的消息時,應該適當提升acker任務的併發度。設置爲0,經過此方法,當Spout發送一個消息的時候,它的ack方法將馬上被調用;
3) Config.TOPOLOGY_MAX_SPOUT_PENDING:
這個設置一個spout task上面最多有多少個沒有處理的tuple(沒有ack/failed)回覆, 咱們推薦你設置這個配置,以防止tuple隊列爆掉。
4) Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:
這個配置storm的tuple的超時時間 – 超過這個時間的tuple被認爲處理失敗了。這個設置的默認設置是30秒
下一篇,我會介紹推薦系統,也就是大數據的現實應用。
如何用4個月學會Hadoop開發並找到年薪25萬工做?
免費分享一套17年最新Hadoop大數據教程和100道Hadoop大數據必會面試題。
由於連接常常被和諧,須要的朋友請加微信 ganshiyun666 來獲取最新下載連接,註明「51CTO」
教程已幫助300+人成功轉型Hadoop開發,90%起薪超過20K,工資比以前翻了一倍。
百度Hadoop核心架構師親自錄製
內容包括0基礎入門、Hadoop生態系統、真實商業項目實戰3大部分。其中商業案例可讓你接觸真實的生產環境,訓練本身的開發能力。