1.1簡介數據庫
流處理是針對流式數據的實時計算。它具備實時持續、來源衆多、不關注存儲等特色。典型的應用場景有互聯網業務的日誌數據處理、金融領域的銀行股票數據處理等。編程
1.2 處理流程網絡
傳統數據處理流程是用戶發起查詢請求,請求被翻譯成數據庫查詢語句,最終經過數據戶將查詢結果返回給用戶。此時用戶是主動的,DBMS是被動的併發
流處理數據處理流程是數據實時採集、實時計算、實時接受查詢服務。用戶接收流處理後的結果。此時用戶是被動的,DBMS是主動的。框架
傳統數據處理流程機器學習
流處理數據處理流程編程語言
Storm (event processor)是一個分佈式計算框架,主要由Clojure編程語言編寫。最初是由Nathan Marz及其團隊建立於BackType,該項目在被Twitter取得後開源。它經過使用者定義的「噴嘴(spouts)」和「閥門(bolts)」來定義數據源和相應的操做來實現批量、分佈式處理流式數據。最第一版本發佈於2011年9月17日,目前最新穩定版本爲2016年8月10日的1.0.2版本。分佈式
Storm應用被設計稱爲一個拓撲結構,該拓撲結構是一個有向無環圖(Directed Acyclic Graph,DAG)。有向無環圖的頂點是「噴嘴」和「閥門」,邊是數據流。一個簡單的拓撲結構如圖2-1所示:工具
圖2-1 Storm流式計算拓撲oop
下面對Storm中的重要概念進行簡要介紹。
Storm對於流Stream的抽象:流是一個不間斷的無界的連續Tuple(元組,是元素有序列表)。Stream消息流,是一個沒有邊界的Tuple序列,這些Tuples會被以一種分佈式的方式並行地建立和處理。如圖2-2所示。
圖2-2 Streams抽象
Storm認爲每一個Stream都有一個源頭,它將這個源頭抽象爲Spouts。Spouts流數據源,它會從外部讀取流數據併發出Tuple。如圖2-3所示。
圖2-3 Spouts
Storm將流的中間狀態轉換抽象爲Bolts,Bolts能夠處理Tuples,同時它也能夠發送新的流給其餘Bolts使用。Bolts消息處理者,全部的消息處理邏輯被封裝在Bolts裏面,處理輸入的數據流併產生輸出的新數據流,可執行過濾,聚合,查詢數據庫等操做。如圖2-4所示。
圖2-4 Bolts
爲了提升效率,在Spout源接上多個Bolts處理器。Storm將這樣的有向無環圖抽象爲Topology(拓撲)。Topology是Storm中最高層次的抽象概念,它能夠被提交到Storm集羣執行,一個拓撲就是一個流轉換圖。圖中的邊表示Bolt訂閱了哪些流。當Spout或者Bolt發送元組到流時,它就發送元組到每一個訂閱了該流的Bolt上進行處理。如圖2-5所示:
2-5 Topology
定義一個 Topology 的其中一步是定義每一個Bolt 接收什麼樣的流做爲輸入。Stream Grouping 就是用來定義一個Stream 應該如何分配給Bolts 上面的多個Tasks。Storm 裏面有6 種類型的Stream Grouping。
● Shuffle Grouping:隨機分組,隨機派發Stream 裏面的tuple,保證每一個Bolt 接收到的tuple 數目相同。
● Fields Grouping:按字段分組,好比按userid 來分組,具備一樣userid 的tuple 會被分到相同的Bolts,而不一樣的userid 則會被分配到不一樣的Bolts。
● All Grouping:廣播發送,對於每個tuple,全部的Bolts 都會收到。
● Global Grouping: 全局分組,這個tuple 被分配到Storm 中一個Bolt 的其中一個Task。再具體一點就是分配給id 值最低的那個Task。
● Non Grouping:不分組,這個分組的意思是Stream 不關心到底誰會收到它的tuple。目前這種分組和Shuffle Grouping 是同樣的效果,有一點不一樣的是Storm 會把這個Bolt 放到此Bolt 的訂閱者同一個線程裏面去執行。
● Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪一個Task 處理這個消息。只有被聲明爲Direct Stream 的消息流能夠聲明這種分組方法。並且這種消息tuple 必須使用emitDirect 方法來發送。消息處理者能夠經過TopologyContext 來獲取處理它的消息的taskid(OutputCollector.emit 方法也會返回taskid)。
Storm 能夠保證每一個消息tuple 會被Topology 完整地處理,Storm 會追蹤每一個從Spout 發送出的消息tuple 在後續處理過程當中產生的消息樹(Bolt 接收到的消息完成處理後又能夠產生0 個或多個消息,這樣反覆進行下去,就會造成一棵消息樹),Storm 會確保這棵消息樹被成功地執行。Storm 對每一個消息都設置了一個超時時間,若是在設定的時間內,Storm 沒有檢測到某個從Spout 發送的tuple 是否執行成功,Storm 會假設該tuple 執行失敗,所以會從新發送該tuple。這樣就保證了每條消息都被正確地完整地執行。
Storm 保證消息的可靠性是經過在發送一個tuple 和處理完一個tuple 的時候都須要像Storm 同樣返回確認信息來實現的,這一切是由OutputCollector 來完成的。經過它的emit 方法來通知一個新的tuple 產生,經過它的ack 方法通知一個tuple 處理完成。
在 Storm 集羣上,每一個Spout 和Bolt 都是由不少個Task 組成的,每一個Task對應一個線程,流分組策略就是定義如何從一堆Task 發送tuple 到另外一堆Task。在實現本身的Topology 時能夠調用TopologyBuilder.setSpout() 和TopBuilder.setBolt()方法來設置並行度,也就是有多少個Task。
一個 Topology 可能會在一個或者多個工做進程裏面執行,每一個工做進程執行整個Topology 的一部分。好比,對於並行度是300 的Topology 來講,若是咱們使用50 個工做進程來執行,那麼每一個工做進程會處理其中的6 個Tasks(其實就是每一個工做進程裏面分配6 個線程)。Storm 會盡可能均勻地把工做分配給全部的工做進程。
在 Storm 裏面能夠經過配置大量的參數來調整Nimbus、Supervisor 以及正在運行的Topology 的行爲,一些配置是系統級別的,一些配置是Topology 級別的。全部有默認值的配置的默認配置是配置在default.xml 裏面的,用戶能夠經過定義一個storm.xml 在classpath 裏來覆蓋這些默認配置。而且也可使用Storm Submitter 在代碼裏面設置一些Topology 相關的配置信息。固然,這些配置的優先級是default.xml<storm.xml<TOPOLOGY-SPECIFIC 配置。
Storm集羣表面相似Hadoop集羣:
最終會結束,而一個Topology永遠處理消息(或直到kill它)。
發代碼、爲worker分配任務和故障監測。
所在機器的工做,基於Nimbus分配給它的事情來決定啓動或中止工做者進程。
Storm工做流程如圖2-6所示:
圖2-6 Storm工做流程
Storm 在官方網站中列舉了它的幾大關鍵特徵。
● 適用場景廣:Storm 能夠用來處理消息和更新數據庫(消息的流處理),對一個數據量進行持續的查詢並將結果返回給客戶端(連續計算),對於耗費資源的查詢進行並行化處理(分佈式方法調用),Storm 提供的計算原語能夠知足諸如以上所述的大量場景。
● 可伸縮性強:Storm 的可伸縮性可讓Storm 每秒處理的消息量達到很高,如100 萬。實現計算任務的擴展,只須要在集羣中添加機器,而後提升計算任務的並行度設置。Storm 網站上給出了一個具備伸縮性的例子,一個Storm應用在一個包含10 個節點的集羣上每秒處理1 000 000 個消息,其中包括每秒100 屢次的數據庫調用。Storm 使用Apache ZooKeeper 來協調集羣中各類配置的同步,這樣Storm 集羣能夠很容易地進行擴展。
● 保證數據不丟失:實時計算系統的關鍵就是保證數據被正確處理,丟失數據的系統使用場景會很窄,而Storm 能夠保證每一條消息都會被處理,這是Storm 區別於S4(Yahoo 開發的實時計算系統)系統的關鍵特徵。
● 健壯性強:不像Hadoop 集羣很難進行管理,它須要管理人員掌握不少Hadoop 的配置、維護、調優的知識。而Storm 集羣很容易進行管理,容易管理是Storm 的設計目標之一。
● 高容錯:Storm 能夠對消息的處理過程進行容錯處理,若是一條消息在處理過程當中失敗,那麼Storm 會從新安排出錯的處理邏輯。Storm 能夠保證一個處理邏輯永遠運行。
● 語言無關性:Storm 應用不該該只能使用一種編程平臺,Storm 雖然是使用Clojure 語言開發實現,可是,Storm 的處理邏輯和消息處理組件均可以使用任何語言來進行定義,這就是說任何語言的開發者均可以使用Storm。默認支持Clojure、Java、Ruby和Python。要增長對其餘語言的支持,只須要實現一個簡單的額Storm通訊協議便可。
Storm 的容錯分爲以下幾種類型。
(1)工做進程worker 失效:若是一個節點的工做進程worker「死掉」,supervisor 進程會嘗試重啓該worker。若是連續重啓worker 失敗或者worker 不能按期向Nimbus 報告「心跳」,Nimbus 會分配該任務到集羣其餘的節點上執行。
(2)集羣節點失效:若是集羣中某個節點失效,分配給該節點的全部任務會因超時而失敗,Nimbus 會將分配給該節點的全部任務從新分配給集羣中的其餘節點。
(3)Nimbus 或者supervisor 守護進程失敗:Nimbus 和supervisor 都被設計成快速失敗(遇到未知錯誤時迅速自我失敗)和無狀態的(全部的狀態信息都保存在Zookeeper 上或者是磁盤上)。Nimbus 和supervisor 守護進程必須在一些監控工具(例如,daemontools 或者monitor)的輔助下運行,一旦Nimbus 或者supervisor 失敗,能夠馬上重啓它們,整個集羣就好像什麼事情也沒發生。最重要的是,沒有工做進程worker 會由於Nimbus 或supervisor 的失敗而受到影響,Storm 的這個特性和Hadoop 造成了鮮明的對比,若是JobTracker 失效,全部的任務都會失敗。
(4)Nimbus 所在的節點失效:若是Nimbus 守護進程駐留的節點失敗,工做節點上的工做進程worker 會繼續執行計算任務,並且,若是worker 進程失敗,supervisor 進程會在該節點上重啓失敗的worker 任務。可是,沒有Nimbus的影響時,全部worker 任務不會分配到其餘的工做節點機器上,即便該worker所在的機器失效。
Storm 有許多應用領域,包括實時分析、在線機器學習、信息流處理(例如,可使用Storm 處理新的數據和快速更新數據庫)、連續性的計算(例如,使用Storm 連續查詢,而後將結果返回給客戶端,如將微博上的熱門話題轉發給用戶)、分佈式RPC(遠過程調用協議,經過網絡從遠程計算機程序上請求服務)、ETL(Extraction Transformation Loading,數據抽取、轉換和加載)等。
Storm 的處理速度驚人,經測試,每一個節點每秒能夠處理100 萬個數據元組。Storm 可擴展且具備容錯功能,很容易設置和操做。Storm 集成了隊列和數據庫技術,Storm 拓撲網絡經過綜合的方法,將數據流在每一個數據平臺間進行從新分配。