storm是一個免費、開源的分佈式實時計算框架。它讓你更方便、可靠的處理實時發送的消息。若是你以前瞭解過hadoop,應該知道hadoop能很快速、方便的幫你完成批量數據處理,而storm能夠認爲是實時數據處理領域的hadoop。storm簡單,雖然他是用jvm之上的clojure編寫的,可是一樣支持非jvm語言。html
若是你不知道是否該使用storm,你能夠先看看你有沒有過這些需求:java
若是你有其中某項需求,那麼恭喜你,storm能夠幫到你。storm性能好、可伸縮性強、容錯能力好,而且能保證消息的可靠性。這些特色足以你擁有使用storm的理由。git
要了解storm,首先須要瞭解這些概念:github
storm中的實時處理的應用會被打包成topology,這個topology由一系列stream(數據流)、spout(數據流生產者)、bolt(數據處理邏輯)組成。相似hadoop中作mapreduce的job,有個區別就是mapreduce job會結束,而topology只要你不手動kill掉,它永遠也不會結束。apache
Stream是數據流,有無窮無盡的tuple組成,而tuple則包含了用戶發送的具體數據,好比整數、小數、字符串等,也能夠包含自定義的數據類型,前提是你要爲它實現序列化。api
Spout是數據流Stream的生產者。一般spout會從外部數據源(kafka等)讀取數據tuple,並將它emit(發送)到topology中。網絡
Spout中最主要的方法是nextTuple。nextTuple一般會生成一個新的tuple,而後emit到topology。因爲storm會在一個線程中調用全部spout的nextTuple方法,因此千萬不要讓這個方法阻塞掉。儘可能保持spout只處理數據的發送,不要讓它處理業務邏輯。數據結構
Bolt處理topology中全部的運算、業務邏輯,若是邏輯複雜,一般使用多個bolt也能很好的解決。bolt會訂閱spout或者其餘bolt發送的tuple,而整個應用可能會有多個spout和bolt,他們組成一塊兒就會造成一個圖裝結構,也就是topology。併發
bolt中最主要的方法是execute,它會從訂閱的spout或者bolt獲取tuple,從tuple從取出數據,作響應的邏輯處理,而後生成新的tuple給emit出去。若是這個bolt是topology中最後一個bolt節點,就沒有必要繼續emit,而是本身來處理數據的歸屬。負載均衡
Stream groupings就是數據流分組,它定義了tuple該如何分發給bolt中不一樣的task。好比,一個topology中有ASpout和訂閱了Apout的Abolt,爲了保持併發量,給Abolt設置了4個task。數據流分組會決定Aspout發送出來的tuple,會怎樣分配到4個task中
目前storm定義了八種不一樣的分組方式:
storm提供了可靠的和不可靠的實時處理方式,須要本身經過api指定。經過追蹤tuple樹中的消息傳遞,spout能夠保證一旦消息丟失或者傳送超時,就會重發。具體能夠參見後面描述。
每一個spout和bolt均可以被分解成多個task,運行在不一樣的線程中,經過併發執行保持高效。對應的api爲TopologyBuilder的setSpout和setBolt方法。
topology能夠運行在多個worker進程中,每一個worker進程都是一個獨立的jvm,每一個進程裏面運行着不少task
Storm採用Kryo做爲序列化框架。默認狀況下,Storm支持基本數據類型, strings, byte arrays, ArrayList, HashMap, HashSet,以及Clojure的集合類型。若是你但願在tuple中存儲自定義數據類型,保證它能在topology中傳遞,你就須要註冊自定義數據類型。
官網提供了兩種註冊方式。
任選一種方式,將自定義數據結構註冊進去,就能使用FieldsSerializer來序列化(kryo提供),不然就要本身提供序列化方式。
若是tuple中存儲的數據沒有註冊過,就會採用默認java序列化方案,若是它沒法沒java序列化方案處理,storm會拋出異常。爲了性能考慮,若是存在自定義數據,最好使用storm提供的方案註冊,採用kryo等優秀的序列化方案。不然,java序列化的性能開銷很是大。
要理解storm的併發機制,首先得理解下面幾個概念:
如圖所示,該topology設置了2個wokers,10個executors,12個task。這些資源會被平均分配。
其中,executors數量不能多於task數量,這樣就保證了每一個executor至少會分配到一個task。默認狀況executors數量等於task數量。
若是用戶但願改變task並行能力,能夠經過改變executors數量來實現。之因此沒有僅僅使用task來表明線程,而是引入executor,主要是考慮到在一個運行的topology中,task數量沒法改變,由於一旦改變,可能致使Fields grouping這種分組方式出現bug。
舉個例子,用戶但願消息A分配到某個task,而且之後都由這個task接收,那採用Fields grouping能夠將獲取消息id,取n = hash(id) % task數量
,n就是A要去的task編號。若是運行期間能夠修改task數量,那麼n可能會發生變化,帶來的影響就是A會跑去另一個task。很明顯,這是不容許的。
隨着executor的引入,用戶能夠根據本身的需求,在topology運行時調整task的並行能力,更加自由靈活。(不過jstorm取消了executor這個語義,轉而採用task來表明任務和線程,主要考慮到storm這種模型的實現複雜性與收益微小性,每每大部分人採用默認配置)
storm提供了三個級別的消息處理保障機制:
這是最簡單的模式,就是發送消息,丟了就不作處理。
storm提供了一種api保證每一個tuple都會被完整的處理。要保證storm的消息可靠性,就得保證spout和bolt兩個角色的可靠性。
_collector.emit(new Values("field1", "field2", 3) , msgId);
加上msgId就將emit出去的tuple打上了標識,一旦tuple在timeout(默認爲30s)時間範圍內被徹底處理,系統就會調用ack(msgId),表示表示已經被完整的處理過了,不然就調用fail(msgId)作重發處理。(ack和fail方法都得本身實現,一般ack回覆數據來源消息已經被處理,fail作重發操做)
bolt可靠性:一般bolt都會讀入tuple,接着取出數據,最後emit新的tuple。bolt要保證可靠性,首先 須要在emit時錨定讀入的tuple和新生成的tuple。
_collector.emit(tuple, new Values(word));
接着根據消息處理成功或者失敗的狀況分別作ack或者fail調用。
//_collector是SpoutOutputCollector的對象 if(success){ _collector.ack(tuple); }else{ _collector.ack(tuple); }
那麼,storm是怎樣保障消息的可靠性的呢?
要了解實現原理,首先得有tuple樹的背景知識,經過下面一幅圖來看看tuple樹的處理流程。
假設圖中A是Aspout發送的tuple,BC是Bbolt發送的tuple,DE是Cbolt發送的tuple,Bbolt訂閱Aspout,Cbolt訂閱Bbolt。
storm的topology中會運行名爲acker的任務,acker會監控這個tuple樹,當發現tuple樹中全部的tuple都被完整的處理過了,而且沒有新的tuple生成,就會調用spout的ack方法,表示消息被成功處理。不然就調用fail方法。
當topology越大,tuple樹也就越大,完整的在內存生成這個tuple樹並跟蹤它是不現實的。storm經過一個很巧妙的方式,來實現對tuple樹的監控。 acker存儲了一個64位的數字,名爲ack val。當新加入一個tuple時,會生成一個64位隨機數字做爲id,讓tuple被emit時,會將更新ack val爲ack val xor id。當ack(tuple)時,也會按一樣的方法更新ack val。根據xor的原理,若是acker發現最後ack val的值變爲0,則說明全部生成的tuple都被ack,也就是tuple樹中全部的tuple都已經被處理了。經過這種方式,storm保證了acker能高效的識別消息是否被完整的處理。
由storm的高級api Trident來保證消息不會丟失,而且不會多發。具體信息本章不會描述,會在後面繼續補充。