storm能夠確保spout發送出來的每一個消息都會被完整的處理。本章將會描述storm體系是如何達到這個目標的,並將會詳述開發者應該如何使用storm的這些機制來實現數據的可靠處理。 html
一個消息(tuple)從spout發送出來,可能會致使成百上千的消息基於此消息被建立。 git
咱們來思考一下流式的「單詞統計」的例子: github
storm任務從數據源(Kestrel queue)每次讀取一個完整的英文句子;將這個句子分解爲獨立的單詞,最後,實時的輸出每一個單詞以及它出現過的次數。 算法
本例中,每一個從spout發送出來的消息(每一個英文句子)都會觸發不少的消息被建立,那些從句子中分隔出來的單詞就是被建立出來的新消息。 併發
這些消息構成一個樹狀結構,咱們稱之爲「tuple tree」,看起來如圖1所示: 性能
圖1 示例tuple tree spa
在什麼條件下,Storm纔會認爲一個從spout發送出來的消息被完整處理呢?答案就是下面的條件同時被知足: 設計
若是在指定的時間內,一個消息衍生出來的tuple tree未被徹底處理成功,則認爲此消息未被完整處理。這個超時值能夠經過任務級參數Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 進行配置,默認超時值爲30秒。 orm
若是消息被完整處理或者未被完整處理,Storm會如何進行接下來的操做呢?爲了弄清這個問題,咱們來研究一下從spout發出來的消息的生命週期。這裏列出了spout應該實現的接口: htm
首先, Storm使用spout實例的nextTuple()方法從spout請求一個消息(tuple)。 收到請求之後,spout使用open方法中提供的SpoutOutputCollector向它的輸出流發送一個或多個消息。每發送一個消息,Spout會給這個消息提供一個message ID,它將會被用來標識這個消息。
假設咱們從kestrel隊列中讀取消息,Spout會將kestrel 隊列爲這個消息設置的ID做爲此消息的message ID。 向SpoutOutputCollector中發送消息格式以下:
接來下,這些消息會被髮送到後續業務處理的bolts, 而且Storm會跟蹤由此消息產生出來的新消息。當檢測到一個消息衍生出來的tuple tree被完整處理後,Storm會調用Spout中的ack方法,並將此消息的messageID做爲參數傳入。同理,若是某消息處理超時,則此消息對應的Spout的fail方法會被調用,調用時此消息的messageID會被做爲參數傳入。
注意:一個消息只會由發送它的那個spout任務來調用ack或fail。若是系統中某個spout由多個任務運行,消息也只會由建立它的spout任務來應答(ack或fail),決不會由其餘的spout任務來應答。
咱們繼續使用從kestrel隊列中讀取消息的例子來闡述高可靠性下spout須要作些什麼(假設這個spout的名字是KestrelSpout)。
咱們先簡述一下kestrel消息隊列:
當KestrelSpout從kestrel隊列中讀取一個消息,表示它「打開」了隊列中某個消息。這意味着,此消息並未從隊列中真正的刪除,而是將此消息設置爲「pending」狀態,它等待來自客戶端的應答,被應答之後,此消息纔會被真正的從隊列中刪除。處於「pending」狀態的消息不會被其餘的客戶端看到。另外,若是一個客戶端意外的斷開鏈接,則由此客戶端「打開」的全部消息都會被從新加入到隊列中。當消息被「打開」的時候,kestrel隊列同時會爲這個消息提供一個惟一的標識。
KestrelSpout就是使用這個惟一的標識做爲這個tuple的messageID的。稍後當ack或fail被調用的時候,KestrelSpout會把ack或者fail連同messageID一塊兒發送給kestrel隊列,kestrel會將消息從隊列中真正刪除或者將它從新放回隊列中。
爲了使用Storm提供的可靠處理特性,咱們須要作兩件事情:
經過上面的兩步,storm就能夠檢測到一個tuple tree什麼時候被徹底處理了,而且會調用相關的ack或fail方法。Storm提供了簡單明瞭的方法來完成上述兩步。
爲tuple tree中指定的節點增長一個新的節點,咱們稱之爲錨定(anchoring)。錨定是在咱們發送消息的同時進行的。爲了更容易說明問題,咱們使用下面代碼做爲例子。本示例的bolt將包含整句話的消息分解爲一系列的子消息,每一個子消息包含一個單詞。
每一個消息都經過這種方式被錨定:把輸入消息做爲emit方法的第一個參數。由於word消息被錨定在了輸入消息上,這個輸入消息是spout發送過來的tuple tree的根節點,若是任意一個word消息處理失敗,派生這個tuple tree那個spout 消息將會被從新發送。
與此相反,咱們來看看使用下面的方式emit消息時,Storm會如何處理:
若是以這種方式發送消息,將會致使這個消息不會被錨定。若是此tuple tree中的消息處理失敗,派生此tuple tree的根消息不會被從新發送。根據任務的容錯級別,有時候很適合發送一個非錨定的消息。
一個輸出消息能夠被錨定在一個或者多個輸入消息上,這在作join或聚合的時候是頗有用的。一個被多重錨定的消息處理失敗,會致使與之關聯的多個spout消息被從新發送。多重錨定經過在emit方法中指定多個輸入消息來實現:
多重錨定會將被錨定的消息加到多棵tuple tree上。
注意:多重綁定可能會破壞傳統的樹形結構,從而構成一個DAGs(有向無環圖),如圖2所示:
圖2 多重錨定構成的鑽石型結構
Storm的實現能夠像處理樹那樣來處理DAGs。
錨定代表瞭如何將一個消息加入到指定的tuple tree中,高可靠處理API的接下來部分將向您描述當處理完tuple tree中一個單獨的消息時咱們該作些什麼。這些是經過OutputCollector 的ack和fail方法來實現的。回頭看一下例子SplitSentence,能夠發現當全部的word消息被髮送完成後,輸入的表示句子的消息會被應答(acked)。
每一個被處理的消息必須代表成功或失敗(acked 或者failed)。Storm是使用內存來跟蹤每一個消息的處理狀況的,若是被處理的消息沒有應答的話,早晚內存會被耗盡!>
不少bolt遵循特定的處理流程: 讀取一個消息、發送它派生出來的子消息、在execute結尾處應答此消息。通常的過濾器(filter)或者是簡單的處理功能都是這類的應用。Storm有一個BasicBolt接口封裝了上述的流程。示例SplitSentence可使用BasicBolt來重寫:
使用這種方式,代碼比以前稍微簡單了一些,可是實現的功能是同樣的。發送到BasicOutputCollector的消息會被自動的錨定到輸入消息,而且,當execute執行完畢的時候,會自動的應答輸入消息。
不少狀況下,一個消息須要延遲應答,例如聚合或者是join。只有根據一組輸入消息獲得一個結果以後,纔會應答以前全部的輸入消息。而且聚合和join大部分時候對輸出消息都是多重錨定。然而,這些特性不是IBasicBolt所能處理的。
Storm 系統中有一組叫作「acker」的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每一個消息。每當發現一個DAG被徹底處理,它就向建立這個根消息的spout任務發送一個信號。拓撲中acker任務的並行度能夠經過配置參數Config.TOPOLOGY_ACKERS來設置。默認的acker任務並行度爲1,當系統中有大量的消息時,應該適當提升acker任務的併發度。
爲了理解Storm可靠性處理機制,咱們從研究一個消息的生命週期和tuple tree的管理入手。當一個消息被建立的時候(不管是在spout仍是bolt中),系統都爲該消息分配一個64bit的隨機值做爲id。這些隨機的id是acker用來跟蹤由spout消息派生出來的tuple tree的。
每一個消息都知道它所在的tuple tree對應的根消息的id。每當bolt新生成一個消息,對應tuple tree中的根消息的messageId就拷貝到這個消息中。當這個消息被應答的時候,它就把關於tuple tree變化的信息發送給跟蹤這棵樹的acker。例如,他會告訴acker:本消息已經處理完畢,可是我派生出了一些新的消息,幫忙跟蹤一下吧。
舉個例子,假設消息D和E是由消息C派生出來的,這裏演示了消息C被應答時,tuple tree是如何變化的。
由於在C被從樹中移除的同時D和E會被加入到tuple tree中,所以tuple tree不會被過早的認爲已徹底處理。
關於Storm如何跟蹤tuple tree,咱們再深刻的探討一下。前面說過系統中能夠有任意個數的acker,那麼,每當一個消息被建立或應答的時候,它怎麼知道應該通知哪一個acker呢?
系統使用一種哈希算法來根據spout消息的messageId肯定由哪一個acker跟蹤此消息派生出來的tuple tree。由於每一個消息都知道與之對應的根消息的messageId,所以它知道應該與哪一個acker通訊。
當spout發送一個消息的時候,它就通知對應的acker一個新的根消息產生了,這時acker就會建立一個新的tuple tree。當acker發現這棵樹被徹底處理以後,他就會通知對應的spout任務。
tuple是如何被跟蹤的呢?系統中有成千上萬的消息,若是爲每一個spout發送的消息都構建一棵樹的話,很快內存就會耗盡。因此,必須採用不一樣的策略來跟蹤每一個消息。因爲使用了新的跟蹤算法,Storm只須要固定的內存(大約20字節)就能夠跟蹤一棵樹。這個算法是storm正確運行的核心,也是storm最大的突破。
acker任務保存了spout消息id到一對值的映射。第一個值就是spout的任務id,經過這個id,acker就知道消息處理完成時該通知哪一個spout任務。第二個值是一個64bit的數字,咱們稱之爲「ack val」, 它是樹中全部消息的隨機id的異或結果。ack val表示了整棵樹的的狀態,不管這棵樹多大,只須要這個固定大小的數字就能夠跟蹤整棵樹。當消息被建立和被應答的時候都會有相同的消息id發送過來作異或。
每當acker發現一棵樹的ack val值爲0的時候,它就知道這棵樹已經被徹底處理了。由於消息的隨機ID是一個64bit的值,所以ack val在樹處理完以前被置爲0的機率很是小。假設你每秒鐘發送一萬個消息,從機率上說,至少須要50,000,000年纔會有機會發生一次錯誤。即便如此,也只有在這個消息確實處理失敗的狀況下才會有數據的丟失!
Acker任務是輕量級的,因此在拓撲中並不須要太多的acker存在。能夠經過Storm UI來觀察acker任務的吞吐量,若是看上去吞吐量不夠的話,說明須要添加額外的acker。
若是你並不要求每一個消息必須被處理(你容許在處理過程當中丟失一些信息),那麼能夠關閉消息的可靠處理機制,從而能夠獲取較好的性能。關閉消息的可靠處理機制意味着系統中的消息數會減半(每一個消息不須要應答了)。另外,關閉消息的可靠處理能夠減小消息的大小(不須要每一個tuple記錄它的根id了),從而節省帶寬。
有三種方法能夠關係消息的可靠處理機制:
到如今爲止,你們已經理解了Storm的可靠性機制,而且知道了如何選擇不一樣的可靠性級別來知足需求。接下來咱們研究一下Storm如何保證在各類狀況下確保數據不丟失。
本章介紹了storm集羣如何實現數據的可靠處理。藉助於創新性的tuple tree跟蹤技術,storm高效的經過數據的應答機制來保證數據不丟失。
storm集羣中除nimbus外,沒有單點存在,任何節點均可以出故障而保證數據不會丟失。nimbus被設計爲無狀態的,只要能夠及時重啓,就不會影響正在運行的任務。