storm入門 第四章 消息的可靠處理

4.1 簡介

storm能夠確保spout發送出來的每一個消息都會被完整的處理。本章將會描述storm體系是如何達到這個目標的,並將會詳述開發者應該如何使用storm的這些機制來實現數據的可靠處理。 html

4.2 理解消息被完整處理

一個消息(tuple)從spout發送出來,可能會致使成百上千的消息基於此消息被建立。 git

咱們來思考一下流式的「單詞統計」的例子: github

storm任務從數據源(Kestrel queue)每次讀取一個完整的英文句子;將這個句子分解爲獨立的單詞,最後,實時的輸出每一個單詞以及它出現過的次數。 算法

本例中,每一個從spout發送出來的消息(每一個英文句子)都會觸發不少的消息被建立,那些從句子中分隔出來的單詞就是被建立出來的新消息。 併發

這些消息構成一個樹狀結構,咱們稱之爲「tuple tree」,看起來如圖1所示: 性能

圖1 示例tuple tree spa

在什麼條件下,Storm纔會認爲一個從spout發送出來的消息被完整處理呢?答案就是下面的條件同時被知足: 設計

  • tuple tree再也不生長
  • 樹中的任何消息被標識爲「已處理」

若是在指定的時間內,一個消息衍生出來的tuple tree未被徹底處理成功,則認爲此消息未被完整處理。這個超時值能夠經過任務級參數Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 進行配置,默認超時值爲30秒。 orm

4.3 消息的生命週期

若是消息被完整處理或者未被完整處理,Storm會如何進行接下來的操做呢?爲了弄清這個問題,咱們來研究一下從spout發出來的消息的生命週期。這裏列出了spout應該實現的接口: htm

首先, Storm使用spout實例的nextTuple()方法從spout請求一個消息(tuple)。 收到請求之後,spout使用open方法中提供的SpoutOutputCollector向它的輸出流發送一個或多個消息。每發送一個消息,Spout會給這個消息提供一個message ID,它將會被用來標識這個消息。

假設咱們從kestrel隊列中讀取消息,Spout會將kestrel 隊列爲這個消息設置的ID做爲此消息的message ID。 向SpoutOutputCollector中發送消息格式以下:

接來下,這些消息會被髮送到後續業務處理的bolts, 而且Storm會跟蹤由此消息產生出來的新消息。當檢測到一個消息衍生出來的tuple tree被完整處理後,Storm會調用Spout中的ack方法,並將此消息的messageID做爲參數傳入。同理,若是某消息處理超時,則此消息對應的Spout的fail方法會被調用,調用時此消息的messageID會被做爲參數傳入。

注意:一個消息只會由發送它的那個spout任務來調用ack或fail。若是系統中某個spout由多個任務運行,消息也只會由建立它的spout任務來應答(ack或fail),決不會由其餘的spout任務來應答。

咱們繼續使用從kestrel隊列中讀取消息的例子來闡述高可靠性下spout須要作些什麼(假設這個spout的名字是KestrelSpout)。

咱們先簡述一下kestrel消息隊列:

當KestrelSpout從kestrel隊列中讀取一個消息,表示它「打開」了隊列中某個消息。這意味着,此消息並未從隊列中真正的刪除,而是將此消息設置爲「pending」狀態,它等待來自客戶端的應答,被應答之後,此消息纔會被真正的從隊列中刪除。處於「pending」狀態的消息不會被其餘的客戶端看到。另外,若是一個客戶端意外的斷開鏈接,則由此客戶端「打開」的全部消息都會被從新加入到隊列中。當消息被「打開」的時候,kestrel隊列同時會爲這個消息提供一個惟一的標識。

KestrelSpout就是使用這個惟一的標識做爲這個tuple的messageID的。稍後當ack或fail被調用的時候,KestrelSpout會把ack或者fail連同messageID一塊兒發送給kestrel隊列,kestrel會將消息從隊列中真正刪除或者將它從新放回隊列中。

4.4 可靠相關的API

爲了使用Storm提供的可靠處理特性,咱們須要作兩件事情:

  1. 不管什麼時候在tuple tree中建立了一個新的節點,咱們須要明確的通知Storm;
  2. 當處理完一個單獨的消息時,咱們須要告訴Storm 這棵tuple tree的變化狀態。

經過上面的兩步,storm就能夠檢測到一個tuple tree什麼時候被徹底處理了,而且會調用相關的ack或fail方法。Storm提供了簡單明瞭的方法來完成上述兩步。

爲tuple tree中指定的節點增長一個新的節點,咱們稱之爲錨定(anchoring)。錨定是在咱們發送消息的同時進行的。爲了更容易說明問題,咱們使用下面代碼做爲例子。本示例的bolt將包含整句話的消息分解爲一系列的子消息,每一個子消息包含一個單詞。

每一個消息都經過這種方式被錨定:把輸入消息做爲emit方法的第一個參數。由於word消息被錨定在了輸入消息上,這個輸入消息是spout發送過來的tuple tree的根節點,若是任意一個word消息處理失敗,派生這個tuple tree那個spout 消息將會被從新發送。

與此相反,咱們來看看使用下面的方式emit消息時,Storm會如何處理:

若是以這種方式發送消息,將會致使這個消息不會被錨定。若是此tuple tree中的消息處理失敗,派生此tuple tree的根消息不會被從新發送。根據任務的容錯級別,有時候很適合發送一個非錨定的消息。

一個輸出消息能夠被錨定在一個或者多個輸入消息上,這在作join或聚合的時候是頗有用的。一個被多重錨定的消息處理失敗,會致使與之關聯的多個spout消息被從新發送。多重錨定經過在emit方法中指定多個輸入消息來實現:

多重錨定會將被錨定的消息加到多棵tuple tree上。

注意:多重綁定可能會破壞傳統的樹形結構,從而構成一個DAGs(有向無環圖),如圖2所示:

圖2 多重錨定構成的鑽石型結構

Storm的實現能夠像處理樹那樣來處理DAGs。

錨定代表瞭如何將一個消息加入到指定的tuple tree中,高可靠處理API的接下來部分將向您描述當處理完tuple tree中一個單獨的消息時咱們該作些什麼。這些是經過OutputCollector 的ack和fail方法來實現的。回頭看一下例子SplitSentence,能夠發現當全部的word消息被髮送完成後,輸入的表示句子的消息會被應答(acked)。

每一個被處理的消息必須代表成功或失敗(acked 或者failed)。Storm是使用內存來跟蹤每一個消息的處理狀況的,若是被處理的消息沒有應答的話,早晚內存會被耗盡!>

不少bolt遵循特定的處理流程: 讀取一個消息、發送它派生出來的子消息、在execute結尾處應答此消息。通常的過濾器(filter)或者是簡單的處理功能都是這類的應用。Storm有一個BasicBolt接口封裝了上述的流程。示例SplitSentence可使用BasicBolt來重寫:

使用這種方式,代碼比以前稍微簡單了一些,可是實現的功能是同樣的。發送到BasicOutputCollector的消息會被自動的錨定到輸入消息,而且,當execute執行完畢的時候,會自動的應答輸入消息。

不少狀況下,一個消息須要延遲應答,例如聚合或者是join。只有根據一組輸入消息獲得一個結果以後,纔會應答以前全部的輸入消息。而且聚合和join大部分時候對輸出消息都是多重錨定。然而,這些特性不是IBasicBolt所能處理的。

4.5 高效的實現tuple tree

Storm 系統中有一組叫作「acker」的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每一個消息。每當發現一個DAG被徹底處理,它就向建立這個根消息的spout任務發送一個信號。拓撲中acker任務的並行度能夠經過配置參數Config.TOPOLOGY_ACKERS來設置。默認的acker任務並行度爲1,當系統中有大量的消息時,應該適當提升acker任務的併發度。

爲了理解Storm可靠性處理機制,咱們從研究一個消息的生命週期和tuple tree的管理入手。當一個消息被建立的時候(不管是在spout仍是bolt中),系統都爲該消息分配一個64bit的隨機值做爲id。這些隨機的id是acker用來跟蹤由spout消息派生出來的tuple tree的。

每一個消息都知道它所在的tuple tree對應的根消息的id。每當bolt新生成一個消息,對應tuple tree中的根消息的messageId就拷貝到這個消息中。當這個消息被應答的時候,它就把關於tuple tree變化的信息發送給跟蹤這棵樹的acker。例如,他會告訴acker:本消息已經處理完畢,可是我派生出了一些新的消息,幫忙跟蹤一下吧。

舉個例子,假設消息D和E是由消息C派生出來的,這裏演示了消息C被應答時,tuple tree是如何變化的。

由於在C被從樹中移除的同時D和E會被加入到tuple tree中,所以tuple tree不會被過早的認爲已徹底處理。

關於Storm如何跟蹤tuple tree,咱們再深刻的探討一下。前面說過系統中能夠有任意個數的acker,那麼,每當一個消息被建立或應答的時候,它怎麼知道應該通知哪一個acker呢?

系統使用一種哈希算法來根據spout消息的messageId肯定由哪一個acker跟蹤此消息派生出來的tuple tree。由於每一個消息都知道與之對應的根消息的messageId,所以它知道應該與哪一個acker通訊。

當spout發送一個消息的時候,它就通知對應的acker一個新的根消息產生了,這時acker就會建立一個新的tuple tree。當acker發現這棵樹被徹底處理以後,他就會通知對應的spout任務。

tuple是如何被跟蹤的呢?系統中有成千上萬的消息,若是爲每一個spout發送的消息都構建一棵樹的話,很快內存就會耗盡。因此,必須採用不一樣的策略來跟蹤每一個消息。因爲使用了新的跟蹤算法,Storm只須要固定的內存(大約20字節)就能夠跟蹤一棵樹。這個算法是storm正確運行的核心,也是storm最大的突破。

acker任務保存了spout消息id到一對值的映射。第一個值就是spout的任務id,經過這個id,acker就知道消息處理完成時該通知哪一個spout任務。第二個值是一個64bit的數字,咱們稱之爲「ack val」, 它是樹中全部消息的隨機id的異或結果。ack val表示了整棵樹的的狀態,不管這棵樹多大,只須要這個固定大小的數字就能夠跟蹤整棵樹。當消息被建立和被應答的時候都會有相同的消息id發送過來作異或。

每當acker發現一棵樹的ack val值爲0的時候,它就知道這棵樹已經被徹底處理了。由於消息的隨機ID是一個64bit的值,所以ack val在樹處理完以前被置爲0的機率很是小。假設你每秒鐘發送一萬個消息,從機率上說,至少須要50,000,000年纔會有機會發生一次錯誤。即便如此,也只有在這個消息確實處理失敗的狀況下才會有數據的丟失!

4.6 選擇合適的可靠性級別

Acker任務是輕量級的,因此在拓撲中並不須要太多的acker存在。能夠經過Storm UI來觀察acker任務的吞吐量,若是看上去吞吐量不夠的話,說明須要添加額外的acker。

若是你並不要求每一個消息必須被處理(你容許在處理過程當中丟失一些信息),那麼能夠關閉消息的可靠處理機制,從而能夠獲取較好的性能。關閉消息的可靠處理機制意味着系統中的消息數會減半(每一個消息不須要應答了)。另外,關閉消息的可靠處理能夠減小消息的大小(不須要每一個tuple記錄它的根id了),從而節省帶寬。

有三種方法能夠關係消息的可靠處理機制:

  • 將參數Config.TOPOLOGY_ACKERS設置爲0,經過此方法,當Spout發送一個消息的時候,它的ack方法將馬上被調用;
  • 第二個方法是Spout發送一個消息時,不指定此消息的messageID。當須要關閉特定消息可靠性的時候,可使用此方法;
  • 最後,若是你不在乎某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要作錨定,即在emit方法中不指定輸入消息。由於這些子孫消息沒有被錨定在任何tuple tree中,所以他們的失敗不會引發任何spout從新發送消息。

4.7 集羣的各級容錯

到如今爲止,你們已經理解了Storm的可靠性機制,而且知道了如何選擇不一樣的可靠性級別來知足需求。接下來咱們研究一下Storm如何保證在各類狀況下確保數據不丟失。

3.7.1 任務級失敗

  • 由於bolt任務crash引發的消息未被應答。此時,acker中全部與此bolt任務關聯的消息都會由於超時而失敗,對應spout的fail方法將被調用。
  • acker任務失敗。若是acker任務自己失敗了,它在失敗以前持有的全部消息都將會由於超時而失敗。Spout的fail方法將被調用。
  • Spout任務失敗。這種狀況下,Spout任務對接的外部設備(如MQ)負責消息的完整性。例如當客戶端異常的狀況下,kestrel隊列會將處於pending狀態的全部的消息從新放回到隊列中。

4.7.2  任務槽(slot) 故障

  • worker失敗。每一個worker中包含數個bolt(或spout)任務。supervisor負責監控這些任務,當worker失敗後,supervisor會嘗試在本機重啓它。
  • supervisor失敗。supervisor是無狀態的,所以supervisor的失敗不會影響當前正在運行的任務,只要及時的將它從新啓動便可。supervisor不是自舉的,須要外部監控來及時重啓。
  • nimbus失敗。nimbus是無狀態的,所以nimbus的失敗不會影響當前正在運行的任務(nimbus失敗時,沒法提交新的任務),只要及時的將它從新啓動便可。nimbus不是自舉的,須要外部監控來及時重啓。

4.7.3.  集羣節點(機器)故障

  • storm集羣中的節點故障。此時nimbus會將此機器上全部正在運行的任務轉移到其餘可用的機器上運行。
  • zookeeper集羣中的節點故障。zookeeper保證少於半數的機器宕機仍可正常運行,及時修復故障機器便可。

4.8 小結

本章介紹了storm集羣如何實現數據的可靠處理。藉助於創新性的tuple tree跟蹤技術,storm高效的經過數據的應答機制來保證數據不丟失。

storm集羣中除nimbus外,沒有單點存在,任何節點均可以出故障而保證數據不會丟失。nimbus被設計爲無狀態的,只要能夠及時重啓,就不會影響正在運行的任務。

相關文章
相關標籤/搜索