摘要:隨着數據體積的愈來愈大,實時處理成爲了許多機構須要面對的首要挑戰。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上結合了汽車超速監視,爲咱們演示了使用Storm進行實時大數據分析。CSDN在此編譯、整理。html
簡單和明瞭,Storm讓大數據分析變得輕鬆加愉快。java
當今世界,公司的平常運營常常會生成TB級別的數據。數據來源囊括了互聯網裝置能夠捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中建立的數據。考慮到數據的生成量,實時處理成爲了許多機構須要面對的首要挑戰。咱們常常用的一個很是有效的開源實時計算工具就是Storm —— Twitter開發,一般被比做「實時的Hadoop」。然而Storm遠比Hadoop來的簡單,由於用它處理大數據不會帶來新老技術的交替。node
Shruthi Kumar、Siddharth Patankar共同效力於Infosys,分別從事技術分析和研發工做。本文詳述了Storm的使用方法,例子中的項目名稱爲「超速報警系統(Speeding Alert System)」。咱們想實現的功能是:實時分析過往車輛的數據,一旦車輛數據超過預設的臨界值 —— 便觸發一個trigger並把相關的數據存入數據庫。mysql
1. Storm是什麼git
全量數據處理使用的大可能是鼎鼎大名的hadoop或者hive,做爲一個批處理系統,hadoop以其吞吐量大、自動容錯等優勢,在海量數據處理上獲得了普遍的使用。github
Hadoop下的Map/Reduce框架對於數據的處理流程是:redis
一、 將要處理的數據上傳到Hadoop的文件系統HDFS中。sql
二、 Map階段數據庫
a) Master對Map的預處理:對於大量的數據進行切分,劃分爲M個16~64M的數據分片(可經過參數自定義分片大小)設計模式
b) 調用Mapper函數:Master爲Worker分配Map任務,每一個分片都對應一個Worker進行處理。各個Worker讀取並調用用戶定義的Mapper函數 處理數據,並將結果存入HDFS,返回存儲位置給Master。
一個Worker在Map階段完成時,在HDFS中,生成一個排好序的Key-values組成的文件。並將位置信息彙報給Master。
三、 Reduce階段
a) Master對Reduce的預處理:Master爲Worker分配Reduce任務,他會將全部Mapper產生的數據進行映射,將相同key的任務分配給某個Worker。
b) 調用Reduce函數:各個Worker將分配到的數據集進行排序(使用工具類Merg),並調用用戶自定義的Reduce函數,並將結果寫入HDFS。
每一個Worker的Reduce任務完成後,都會在HDFS中生成一個輸出文件。Hadoop並不將這些文件合併,由於這些文件每每會做爲另外一個Map/reduce程序的輸入。
以上的流程,粗略歸納,就是從HDFS中獲取數據,將其按照大小分片,進行分佈式處理,最終輸出結果。從流程來看,Hadoop框架進行數據處理有如下要求:
一、 數據已經存在在HDFS當中。
二、 數據間是少關聯的。各個任務執行器在執行負責的數據時,無需考慮對其餘數據的影響,數據之間應儘量是無聯繫、不會影響的。
使用Hadoop,適合大批量的數據處理,這是他所擅長的。因爲基於Map/Reduce這種單級的數據處理模型進行,所以,若是數據間的關聯繫較大,須要進行數據的多級交互處理(某個階段的處理數據依賴於上一個階段),須要進行屢次map/reduce。又因爲map/reduce每次執行都須要遍歷整個數據集,對於數據的實時計算並不合適,因而有了storm。
對比Hadoop的批處理,Storm是個實時的、分佈式以及具有高容錯的計算系統。同Hadoop同樣Storm也能夠處理大批量的數據,然而Storm在保證高可靠性的前提下還可讓處理進行的更加實時;也就是說,全部的信息都會被處理。Storm一樣還具有容錯和分佈計算這些特性,這就讓Storm能夠擴展到不一樣的機器上進行大批量的數據處理。他一樣還有如下的這些特性:
在線實時流處理模型
對於處理大批量數據的Map/reduce程序,在任務完成以後就中止了,但Storm是用於實時計算的,因此,相應的處理程序會一直執行(等待任務,有任務則執行)直至手動中止。
對於Storm,他是實時處理模型,與hadoop的不一樣是,他是針對在線業務而存在的計算平臺,如統計某用戶的交易量、生成爲某個用戶的推薦列表等實時性高的需求。他是一個「流處理」框架。何謂流處理?storm將數據以Stream的方式,並按照Topology的順序,依次處理並最終生成結果。
固然爲了更好的理解文章,你首先須要安裝和設置Storm。須要經過如下幾個簡單的步驟:
Storm集羣和Hadoop集羣表面上看很相似。可是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這二者之間是很是不同的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。
Storm集羣主要由一個主節點(Nimbus後臺程序)和一羣工做節點(worker node)Supervisor的節點組成,經過 Zookeeper進行協調。Nimbus相似Hadoop裏面的JobTracker。Nimbus負責在集羣裏面分發代碼,分配計算任務給機器, 而且監控狀態。
每個工做節點上面運行一個叫作Supervisor的節點。Supervisor會監聽分配給它那臺機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個topology的一個子集;一個運行的topology由運行在不少機器上的不少工做進程組成。
一、 Nimbus主節點:
主節點一般運行一個後臺程序 —— Nimbus,用於響應分佈在集羣中的節點,分配任務和監測故障。這個很相似於Hadoop中的Job Tracker。
二、Supervisor工做節點:
工做節點一樣會運行一個後臺程序 —— Supervisor,用於收聽工做指派並基於要求運行工做進程。每一個工做節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則經過Zookeeper系統或者集羣。
三、Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的「topology」。topology則是一組由Spouts(數據源)和Bolts(數據操做)經過Stream Groupings進行鏈接的圖。下面對出現的術語進行更深入的解析。
四、Worker:
運行具體處理組件邏輯的進程。
五、Task:
worker中每個spout/bolt的線程稱爲一個task. 在storm0.8以後,task再也不與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱爲executor。
六、Topology(拓撲):
storm中運行的一個實時應用程序,由於各個組件間的消息流動造成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖, 經過stream groupings將圖中的spouts和bolts鏈接起來,以下圖:
一個topology會一直運行直到你手動kill掉,Storm自動從新分配執行失敗的任務, 而且Storm能夠保證你不會有數據丟失(若是開啓了高可靠性的話)。若是一些機器意外停機它上面的全部任務會被轉移到其餘機器上。
運行一個topology很簡單。首先,把你全部的代碼以及所依賴的jar打進一個jar包。而後運行相似下面的這個命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1, arg2。這個類的main函數定義這個topology而且把它提交給Nimbus。storm jar負責鏈接到Nimbus而且上傳jar包。
Topology的定義是一個Thrift結構,而且Nimbus就是一個Thrift服務, 你能夠提交由任何語言建立的topology。上面的方面是用JVM-based語言提交的最簡單的方法。
七、Spout:
消息源spout是Storm裏面一個topology裏面的消息生產者。簡而言之,Spout歷來源處讀取數據並放入topology。Spout分紅可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。
消息源能夠發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,而後使用SpoutOutputCollector來發射指定的stream。
而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,若是沒有新tuple發射則會簡單的返回。
要注意的是nextTuple方法不能阻塞,由於storm在同一個線程上面調用全部消息源spout的方法。
另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,不然調用fail。storm只對可靠的spout調用ack和fail。
八、Bolt:
Topology中全部的處理都由Bolt完成。即全部的消息處理邏輯被封裝在bolts裏面。Bolt能夠完成任何事,好比:鏈接的過濾、聚合、訪問文件/數據庫、等等。
Bolt從Spout中接收數據並進行處理,若是遇到複雜流的處理也可能將tuple發送給另外一個Bolt進行處理。即須要通過不少blots。好比算出一堆圖片裏面被轉發最多的圖片就至少須要兩步:第一步算出每一個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(若是要把這個過程作得更具備擴展性那麼可能須要更多的步驟)。
Bolts能夠發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。
而Bolt中最重要的方法是execute(),以新的tuple做爲參數接收。無論是Spout仍是Bolt,若是將tuple發射成多個流,這些流均可以經過declareStream()來聲明。
bolts使用OutputCollector來發射tuple,bolts必需要爲它處理的每個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 通常的流程是: bolts處理一個輸入tuple, 發射0個或者多個tuple, 而後調用ack通知storm本身已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。
九、Tuple:
一次消息傳遞的基本單元。原本應該是一個key-value的map,可是因爲各個組件間傳遞的tuple的字段名稱已經事先定義好,因此tuple中只要按序填入各個value就好了,因此就是一個value list.
十、Stream:
源源不斷傳遞的tuple就組成了stream。消息流stream是storm裏的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分佈式的方式並行地建立和處理。經過對stream中tuple序列中每一個字段命名來定義stream。在默認的狀況下,tuple的字段類型能夠是:integer,long,short, byte,string,double,float,boolean和byte array。你也能夠自定義類型(只要實現相應的序列化器)。
每一個消息流在定義的時候會被分配給一個id,由於單向消息流使用的至關廣泛, OutputFieldsDeclarer定義了一些方法讓你能夠定義一個stream而不用指定這個id。在這種狀況下這個stream會分配個值爲‘default’默認的id 。
Storm提供的最基本的處理stream的原語是spout和bolt。你能夠實現spout和bolt提供的接口來處理你的業務邏輯。
十一、Stream Groupings:
Stream Grouping定義了一個流在Bolt任務間該如何被切分。這裏有Storm提供的6個Stream Grouping類型:
1). 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每一個任務得到相等數量的tuple。
2). 字段分組(Fields grouping):根據指定字段分割數據流,並分組。例如,根據「user-id」字段,相同「user-id」的元組老是分發到同一個任務,不一樣「user-id」的元組可能分發到不一樣的任務。
3). 所有分組(All grouping):tuple被複制到bolt的全部任務。這種類型須要謹慎使用。
4). 全局分組(Global grouping):所有流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
5). 無分組(None grouping):你不須要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(若是可能)。
6). 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪一個元組處理者任務接收。
固然還能夠實現CustomStreamGroupimg接口來定製本身須要的分組。
storm 和hadoop的對比來了解storm中的基本概念。
Hadoop | Storm | |
系統角色 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
Child | Worker | |
應用名稱 | Job | Topology |
組件接口 | Mapper/Reducer | Spout/Bolt |
3. Storm應用場景
Storm 與其餘大數據解決方案的不一樣之處在於它的處理方式。Hadoop 在本質上是一個批處理系統。數據被引入 Hadoop 文件系統 (HDFS) 並分發到各個節點進行處理。當處理完成時,結果數據返回到 HDFS 供始發者使用。Storm 支持建立拓撲結構來轉換沒有終點的數據流。不一樣於 Hadoop 做業,這些轉換從不中止,它們會持續處理到達的數據。
Twitter列舉了Storm的三大類應用:
1. 信息流處理{Stream processing}
Storm可用來實時處理新數據和更新數據庫,兼具容錯性和可擴展性。即Storm能夠用來處理源源不斷流進來的消息,處理以後將結果寫入到某個存儲中去。
2. 連續計算{Continuous computation}
Storm可進行連續查詢並把結果即時反饋給客戶端。好比把Twitter上的熱門話題發送到瀏覽器中。
3. 分佈式遠程程序調用{Distributed RPC}
Storm可用來並行處理密集查詢。Storm的拓撲結構是一個等待調用信息的分佈函數,當它收到一條調用信息後,會對查詢進行計算,並返回查詢結果。舉個例子Distributed RPC能夠作並行搜索或者處理大集合的數據。
經過配置drpc服務器,將storm的topology發佈爲drpc服務。客戶端程序能夠調用drpc服務將數據發送到storm集羣中,並接收處理結果的反饋。這種方式須要drpc服務器進行轉發,其中drpc服務器底層經過thrift實現。適合的業務場景主要是實時計算。而且擴展性良好,能夠增長每一個節點的工做worker數量來動態擴展。
4. 項目實施,構建Topology
當下狀況咱們須要給Spout和Bolt設計一種可以處理大量數據(日誌文件)的topology,當一個特定數據值超過預設的臨界值時促發警報。使用Storm的topology,逐行讀入日誌文件而且監視輸入數據。在Storm組件方面,Spout負責讀入輸入數據。它不只從現有的文件中讀入數據,同時還監視着新文件。文件一旦被修改Spout會讀入新的版本而且覆蓋以前的tuple(能夠被Bolt讀入的格式),將tuple發射給Bolt進行臨界分析,這樣就能夠發現全部可能超臨界的記錄。
下一節將對用例進行詳細介紹。
臨界分析
這一節,將主要聚焦於臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。
Listing One顯示了咱們將使用的一個類型日誌,其中包含的車輛數據信息有:車牌號、車輛行駛的速度以及數據獲取的位置。
AB 123 | 60 | North city |
BC 123 | 70 | South city |
CD 234 | 40 | South city |
DE 123 | 40 | East city |
EF 123 | 90 | South city |
GH 123 | 50 | West city |
這裏將建立一個對應的XML文件,這將包含引入數據的模式。這個XML將用於日誌文件的解析。XML的設計模式和對應的說明請見下表。
XML文件和日誌文件都存放在Spout能夠隨時監測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。
Figure 1:Storm中創建的topology,用以實現數據實時處理
如圖所示:FilelistenerSpout接收輸入日誌並進行逐行的讀入,接着將數據發射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數據將發送給DBWriterBolt,而後由DBWriterBolt存入給數據庫。下面將對這個過程的實現進行詳細的解析。
Spout的實現
Spout以日誌文件和XML描述文件做爲接收對象。XML文件包含了與日誌一致的設計模式。不妨設想一下一個示例日誌文件,包含了車輛的車牌號、行駛速度、以及數據的捕獲位置。(看下圖)
Figure2:數據從日誌文件到Spout的流程圖
Listing Two顯示了tuple對應的XML,其中指定了字段、將日誌文件切割成字段的定界符以及字段的類型。XML文件以及數據都被保存到Spout指定的路徑。
Listing Two:用以描述日誌文件的XML文件。
經過構造函數及它的參數Directory、PathSpout和TupleInfo對象建立Spout對象。TupleInfo儲存了日誌文件的字段、定界符、字段的類型這些很必要的信息。這個對象經過XSTream序列化XML時創建。
Spout的實現步驟:
Spout的具體編碼在Listing Three中顯示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。
declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就能夠用相似的方法將tuple譯碼。Spout持續對日誌文件的數據的變動進行監聽,一旦有添加Spout就會進行讀入而且發送給Bolt進行處理。
Bolt的實現
Spout的輸出結果將給予Bolt進行更深一步的處理。通過對用例的思考,咱們的topology中須要如Figure 3中的兩個Bolt。
Figure 3:Spout到Bolt的數據流程。
ThresholdCalculatorBolt
Spout將tuple發出,由ThresholdCalculatorBolt接收並進行臨界值處理。在這裏,它將接收好幾項輸入進行檢查;分別是:
臨界值檢查
Listing Four中的類,定義用來保存這些值。
Listing Four:ThresholdInfo類
Listing Five:臨界值檢測代碼段
經由Bolt發送的的tuple將會傳遞到下一個對應的Bolt,在咱們的用例中是DBWriterBolt。
DBWriterBolt
通過處理的tuple必須被持久化以便於觸發tigger或者更深層次的使用。DBWiterBolt作了這個持久化的工做並把tuple存入了數據庫。表的創建由prepare()函數完成,這也將是topology調用的第一個方法。方法的編碼如Listing Six所示。
Listing Six:建表編碼。
數據分批次的插入數據庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現可能存在不一樣類型輸入的解析。
Listing Seven:數據插入的代碼部分。
一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會創建topology並準備執行。下面就來看一下執行步驟。
在本地集羣上運行和測試topology
Listing Eight:創建和執行topology。
topology被創建後將被提交到本地集羣。一旦topology被提交,除非被取締或者集羣關閉,它將一直保持運行不須要作任何的修改。這也是Storm的另外一大特點之一。
這個簡單的例子體現了當你掌握了topology、spout和bolt的概念,將能夠輕鬆的使用Storm進行實時處理。若是你既想處理大數據又不想遍歷Hadoop的話,不難發現使用Storm將是個很好的選擇。
5. storm常見問題解答
1、我有一個數據文件,或者我有一個系統裏面有數據,怎麼導入storm作計算?
你須要實現一個Spout,Spout負責將數據emit到storm系統裏,交給bolts計算。怎麼實現spout能夠參考官方的kestrel spout實現:
https://github.com/nathanmarz/storm-kestrel
若是你的數據源不支持事務性消費,那麼就沒法獲得storm提供的可靠處理的保證,也不必實現ISpout接口中的ack和fail方法。
2、Storm爲了保證tuple的可靠處理,須要保存tuple信息,這會不會致使內存OOM?
Storm爲了保證tuple的可靠處理,acker會保存該節點建立的tuple id的xor值,這稱爲ack value,那麼每ack一次,就將tuple id和ack value作異或(xor)。當全部產生的tuple都被ack的時候, ack value必定爲0。這是個很簡單的策略,對於每個tuple也只要佔用約20個字節的內存。對於100萬tuple,也才20M左右。關於可靠處理看這個:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
3、Storm計算後的結果保存在哪裏?能夠保存在外部存儲嗎?
Storm不處理計算結果的保存,這是應用代碼須要負責的事情,若是數據不大,你能夠簡單地保存在內存裏,也能夠每次都更新數據庫,也能夠採用NoSQL存儲。storm並無像s4那樣提供一個Persist API,根據時間或者容量來作存儲輸出。這部分事情徹底交給用戶。
數據存儲以後的展示,也是你須要本身處理的,storm UI只提供對topology的監控和統計。
4、Storm怎麼處理重複的tuple?
由於Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail並從新發送該tuple,那麼就會有tuple重複計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。由於實時計算一般並不要求很高的精確度,後續的批處理計算會更正實時計算的偏差。
(2)使用第三方集中存儲來過濾,好比利用mysql,memcached或者redis根據邏輯主鍵來去重。
(3)使用bloom filter作過濾,簡單高效。
5、Storm的動態增刪節點
我在storm和s4裏比較裏談到的動態增刪節點,是指storm能夠動態地添加和減小supervisor節點。對於減小節點來講,被移除的supervisor上的worker會被nimbus從新負載均衡到其餘supervisor節點上。在storm 0.6.1之前的版本,增長supervisor節點不會影響現有的topology,也就是現有的topology不會從新負載均衡到新的節點上,在擴展集羣的時候很不方便,須要從新提交topology。所以我在storm的郵件列表裏提了這個問題,storm的開發者nathanmarz建立了一個issue 54並在0.6.1提供了rebalance命令來讓正在運行的topology從新負載均衡,具體見:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的變動:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246
storm並不提供機制來動態調整worker和task數目。
6、Storm UI裏spout統計的complete latency的具體含義是什麼?爲何emit的數目會是acked的兩倍?
這個事實上是storm郵件列表裏的一個問題。Storm做者marz的解答: