storm原理剖析

爲何用Storm

storm是一個免費、開源的分佈式實時計算框架。它讓你更方便、可靠的處理實時發送的消息。若是你以前瞭解過hadoop,應該知道hadoop能很快速、方便的幫你完成批量數據處理,而storm能夠認爲是實時數據處理領域的hadoop。storm簡單,雖然他是用jvm之上的clojure編寫的,可是一樣支持非jvm語言。html

若是你不知道是否該使用storm,你能夠先看看你有沒有過這些需求:java

  1. 實時數據分析
  2. 在線機器學習
  3. 實時計算
  4. 分佈式rpc框架

若是你有其中某項需求,那麼恭喜你,storm能夠幫到你。storm性能好、可伸縮性強、容錯能力好,而且能保證消息的可靠性。這些特色足以你擁有使用storm的理由。git

介紹

要了解storm,首先須要瞭解這些概念:github

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers

Topologies

Topology

storm中的實時處理的應用會被打包成topology,這個topology由一系列stream(數據流)、spout(數據流生產者)、bolt(數據處理邏輯)組成。相似hadoop中作mapreduce的job,有個區別就是mapreduce job會結束,而topology只要你不手動kill掉,它永遠也不會結束。apache

Streams

Stream是數據流,有無窮無盡的tuple組成,而tuple則包含了用戶發送的具體數據,好比整數、小數、字符串等,也能夠包含自定義的數據類型,前提是你要爲它實現序列化。api

Spouts

Spout是數據流Stream的生產者。一般spout會從外部數據源(kafka等)讀取數據tuple,並將它emit(發送)到topology中。網絡

Spout中最主要的方法是nextTuple。nextTuple一般會生成一個新的tuple,而後emit到topology。因爲storm會在一個線程中調用全部spout的nextTuple方法,因此千萬不要讓這個方法阻塞掉。儘可能保持spout只處理數據的發送,不要讓它處理業務邏輯。數據結構

Bolts

Bolt處理topology中全部的運算、業務邏輯,若是邏輯複雜,一般使用多個bolt也能很好的解決。bolt會訂閱spout或者其餘bolt發送的tuple,而整個應用可能會有多個spout和bolt,他們組成一塊兒就會造成一個圖裝結構,也就是topology。併發

bolt中最主要的方法是execute,它會從訂閱的spout或者bolt獲取tuple,從tuple從取出數據,作響應的邏輯處理,而後生成新的tuple給emit出去。若是這個bolt是topology中最後一個bolt節點,就沒有必要繼續emit,而是本身來處理數據的歸屬。負載均衡

Stream groupings

Stream groupings就是數據流分組,它定義了tuple該如何分發給bolt中不一樣的task。好比,一個topology中有ASpout和訂閱了Apout的Abolt,爲了保持併發量,給Abolt設置了4個task。數據流分組會決定Aspout發送出來的tuple,會怎樣分配到4個task中

目前storm定義了八種不一樣的分組方式:

  1. Shuffle grouping:隨機分組。隨機分配給不一樣的task,保證最後每一個task接受到的tuple數量均等
  2. Fields grouping:按字段分組。好比tuple中存在名爲user-id的字段,那麼全部該字段所在的tuple都會被分配到同一個task上。
  3. Partial Key grouping:部分key分組。同字段分組,惟一的區別是,它會在不一樣的task之間作負載均衡,保證tuple均勻分配。
  4. All grouping:全複製分組。將tuple複製後發給全部訂閱的bolt,這種會致使網絡傳輸量較大,當心使用。
  5. Global grouping:全局分組。將tuple發送給id最小的task。
  6. None grouping:不分組。目前實現上等同隨機分組。
  7. Direct grouping:指向型分組。經過emitDirect(id,tuple)發給指定id的task。
  8. Local or shuffle grouping:本地或者隨機分組。若是同一個work內有目標bolt的task,會在這幾個task中作隨機分發。其餘狀況下,採用隨機分組方式。這種分組實現的目的是減小網絡傳輸,儘可能選擇本地的task作隨機分發,若是沒有再選擇遠程task。

Reliability

storm提供了可靠的和不可靠的實時處理方式,須要本身經過api指定。經過追蹤tuple樹中的消息傳遞,spout能夠保證一旦消息丟失或者傳送超時,就會重發。具體能夠參見後面描述。

Tasks

每一個spout和bolt均可以被分解成多個task,運行在不一樣的線程中,經過併發執行保持高效。對應的api爲TopologyBuilder的setSpout和setBolt方法。

Workers

topology能夠運行在多個worker進程中,每一個worker進程都是一個獨立的jvm,每一個進程裏面運行着不少task

storm如何序列化

Storm採用Kryo做爲序列化框架。默認狀況下,Storm支持基本數據類型, strings, byte arrays, ArrayList, HashMap, HashSet,以及Clojure的集合類型。若是你但願在tuple中存儲自定義數據類型,保證它能在topology中傳遞,你就須要註冊自定義數據類型。

官網提供了兩種註冊方式。

  1. 配置文件
  2. Config對象的registerSerialization方法

任選一種方式,將自定義數據結構註冊進去,就能使用FieldsSerializer來序列化(kryo提供),不然就要本身提供序列化方式。

若是tuple中存儲的數據沒有註冊過,就會採用默認java序列化方案,若是它沒法沒java序列化方案處理,storm會拋出異常。爲了性能考慮,若是存在自定義數據,最好使用storm提供的方案註冊,採用kryo等優秀的序列化方案。不然,java序列化的性能開銷很是大。

storm併發機制

要理解storm的併發機制,首先得理解下面幾個概念:

  1. Workers:每一個woker是topology中獨立的jvm進程
  2. Executors:executors是woker中運行的線程,執行具體task
  3. Tasks:每一個task至關於spout或者bolt實例

併發topology

如圖所示,該topology設置了2個wokers,10個executors,12個task。這些資源會被平均分配。

其中,executors數量不能多於task數量,這樣就保證了每一個executor至少會分配到一個task。默認狀況executors數量等於task數量。

若是用戶但願改變task並行能力,能夠經過改變executors數量來實現。之因此沒有僅僅使用task來表明線程,而是引入executor,主要是考慮到在一個運行的topology中,task數量沒法改變,由於一旦改變,可能致使Fields grouping這種分組方式出現bug。

舉個例子,用戶但願消息A分配到某個task,而且之後都由這個task接收,那採用Fields grouping能夠將獲取消息id,取n = hash(id) % task數量,n就是A要去的task編號。若是運行期間能夠修改task數量,那麼n可能會發生變化,帶來的影響就是A會跑去另一個task。很明顯,這是不容許的。

隨着executor的引入,用戶能夠根據本身的需求,在topology運行時調整task的並行能力,更加自由靈活。(不過jstorm取消了executor這個語義,轉而採用task來表明任務和線程,主要考慮到storm這種模型的實現複雜性與收益微小性,每每大部分人採用默認配置)

storm可靠性保證

storm提供了三個級別的消息處理保障機制:

  1. 儘可能保障消息發送
  2. 保障消息至少發送一次
  3. 保障消息發送而且僅發一次

儘可能保障消息發送

這是最簡單的模式,就是發送消息,丟了就不作處理。

保障消息至少發送一次

storm提供了一種api保證每一個tuple都會被完整的處理。要保證storm的消息可靠性,就得保證spout和bolt兩個角色的可靠性。

  • spout可靠性:在nextTuple中經過SpoutOutputCollector來emit消息的時候加上消息id,如
_collector.emit(new Values("field1", "field2", 3) , msgId);

加上msgId就將emit出去的tuple打上了標識,一旦tuple在timeout(默認爲30s)時間範圍內被徹底處理,系統就會調用ack(msgId),表示表示已經被完整的處理過了,不然就調用fail(msgId)作重發處理。(ack和fail方法都得本身實現,一般ack回覆數據來源消息已經被處理,fail作重發操做

  • bolt可靠性:一般bolt都會讀入tuple,接着取出數據,最後emit新的tuple。bolt要保證可靠性,首先 須要在emit時錨定讀入的tuple和新生成的tuple。

    _collector.emit(tuple, new Values(word));

    接着根據消息處理成功或者失敗的狀況分別作ack或者fail調用。

    //_collector是SpoutOutputCollector的對象
    	if(success){
    		_collector.ack(tuple);
    	}else{
    		_collector.ack(tuple);
    	}

那麼,storm是怎樣保障消息的可靠性的呢?

要了解實現原理,首先得有tuple樹的背景知識,經過下面一幅圖來看看tuple樹的處理流程。

tuple樹

假設圖中A是Aspout發送的tuple,BC是Bbolt發送的tuple,DE是Cbolt發送的tuple,Bbolt訂閱Aspout,Cbolt訂閱Bbolt。

  1. 當Aspout emit A時,會將A加入tuple樹,當Bbolt接收到A,錨定新生成的tuple B和C時,會將B和C加入tuple樹,最後ack(A),因而A在tuple樹中標記爲已處理。目前的狀態就是上圖左邊所示。
  2. 當Cbolt接收到C之後,通過處理,會錨定新生成的D和E,接着ack(C),因而C也被標記爲已處理。目前的狀態就是上圖右邊所示。

storm的topology中會運行名爲acker的任務,acker會監控這個tuple樹,當發現tuple樹中全部的tuple都被完整的處理過了,而且沒有新的tuple生成,就會調用spout的ack方法,表示消息被成功處理。不然就調用fail方法。

當topology越大,tuple樹也就越大,完整的在內存生成這個tuple樹並跟蹤它是不現實的。storm經過一個很巧妙的方式,來實現對tuple樹的監控。 acker存儲了一個64位的數字,名爲ack val。當新加入一個tuple時,會生成一個64位隨機數字做爲id,讓tuple被emit時,會將更新ack val爲ack val xor id。當ack(tuple)時,也會按一樣的方法更新ack val。根據xor的原理,若是acker發現最後ack val的值變爲0,則說明全部生成的tuple都被ack,也就是tuple樹中全部的tuple都已經被處理了。經過這種方式,storm保證了acker能高效的識別消息是否被完整的處理。

保障消息發送而且僅發一次

由storm的高級api Trident來保證消息不會丟失,而且不會多發。具體信息本章不會描述,會在後面繼續補充。

storm高可用性(HA)

  • 若是woker掛了,supervisor會從新建立
  • 若是機器節點掛了,nimbus會把該節點上的task轉移到其餘節點
  • 若是nimbus或者supervisor掛了,重啓就好了。nimbus和supervisor被設計成無狀態,狀態都被存到zookeeper裏面了
  • 爲防止nimbus掛掉,worker節點也掛掉,致使任務沒法被nimbus轉移到其餘機器。nimbus也被設計成HA的,利用主從結構保證主節點掛了以後從節點同樣能服務

學習資料

相關文章
相關標籤/搜索