Storm's reliability API: how Storm guarantees that every message coming off a spout will be fully processed.html
(storm的可靠性API: storm如何保證spout發出的每個tuple都被完整處理。)java
本文導讀: node
1、簡介 2、理解消息被完整處理 3、消息的生命週期 4、可靠相關的API 5、高效的實現tuple tree 6、選擇合適的可靠性級別 7、集羣的各級容錯性 7.1 任務級失敗 7.2任務槽(slot)故障 7.3集羣節點(機器)故障 8、小結 附:官網文檔guaranteeing message processing 譯文
storm能夠確保spout發送出來的每一個消息都會被完整的處理。本章將會描述storm體系是如何達到這個目標的,並將會詳述開發者應該如何使用storm的這些機制來實現數據的可靠處理。git
一個消息(tuple)從spout發送出來,可能會致使成百上千的消息基於此消息被建立。github
咱們來思考一下流式的「單詞統計」的例子:算法
storm任務從數據源(Kestrel queue)每次讀取一個完整的英文句子;將這個句子分解爲獨立的單詞,最後,實時的輸出每一個單詞以及它出現過的次數。apache
本例中,每一個從spout發送出來的消息(每一個英文句子)都會觸發不少的消息被建立,那些從句子中分隔出來的單詞就是被建立出來的新消息。api
這些消息構成一個樹狀結構,咱們稱之爲「tuple tree」,看起來如圖1所示:併發
圖1 示例tuple treeapp
在什麼條件下,Storm纔會認爲一個從spout發送出來的消息被完整處理呢?答案就是下面的條件同時被知足:
若是在指定的時間內,一個消息衍生出來的tuple tree未被徹底處理成功,則認爲此消息未被完整處理。這個超時值能夠經過任務級參數Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 進行配置,默認超時值爲30秒。
若是消息被完整處理或者未被完整處理,Storm會如何進行接下來的操做呢?爲了弄清這個問題,咱們來研究一下從spout發出來的消息的生命週期。這裏列出了spout應該實現的接口:
首先, Storm使用spout實例的nextTuple()方法從spout請求一個消息(tuple)。 收到請求之後,spout使用open方法中提供的SpoutOutputCollector向它的輸出流發送一個或多個消息。每發送一個消息,Spout會給這個消息提供一個message ID,它將會被用來標識這個消息。
假設咱們從kestrel隊列中讀取消息,Spout會將kestrel 隊列爲這個消息設置的ID做爲此消息的message ID。 向SpoutOutputCollector中發送消息格式以下:
接來下,這些消息會被髮送到後續業務處理的bolts, 而且Storm會跟蹤由此消息產生出來的新消息。當檢測到一個消息衍生出來的tuple tree被完整處理後,Storm會調用Spout中的ack方法,並將此消息的messageID做爲參數傳入。同理,若是某消息處理超時,則此消息對應的Spout的fail方法會被調用,調用時此消息的messageID會被做爲參數傳入。
注意:一個消息只會由發送它的那個spout任務來調用ack或fail。若是系統中某個spout由多個任務運行,消息也只會由建立它的spout任務來應答(ack或fail),決不會由其餘的spout任務來應答。
咱們繼續使用從kestrel隊列中讀取消息的例子來闡述高可靠性下spout須要作些什麼(假設這個spout的名字是KestrelSpout)。
咱們先簡述一下kestrel消息隊列:
當KestrelSpout從kestrel隊列中讀取一個消息,表示它「打開」了隊列中某個消息。這意味着,此消息並未從隊列中真正的刪除,而是將此消息設置爲「pending」狀態,它等待來自客戶端的應答,被應答之後,此消息纔會被真正的從隊列中刪除。處於「pending」狀態的消息不會被其餘的客戶端看到。另外,若是一個客戶端意外的斷開鏈接,則由此客戶端「打開」的全部消息都會被從新加入到隊列中。當消息被「打開」的時候,kestrel隊列同時會爲這個消息提供一個惟一的標識。
KestrelSpout就是使用這個惟一的標識做爲這個tuple的messageID的。稍後當ack或fail被調用的時候,KestrelSpout會把ack或者fail連同messageID一塊兒發送給kestrel隊列,kestrel會將消息從隊列中真正刪除或者將它從新放回隊列中。
爲了使用Storm提供的可靠處理特性,咱們須要作兩件事情:
經過上面的兩步,storm就能夠檢測到一個tuple tree什麼時候被徹底處理了,而且會調用相關的ack或fail方法。Storm提供了簡單明瞭的方法來完成上述兩步。
爲tuple tree中指定的節點增長一個新的節點,咱們稱之爲錨定(anchoring)。錨定是在咱們發送消息的同時進行的。爲了更容易說明問題,咱們使用下面代碼做爲例子。本示例的bolt將包含整句話的消息分解爲一系列的子消息,每一個子消息包含一個單詞。
每一個消息都經過這種方式被錨定:把輸入消息做爲emit方法的第一個參數。由於word消息被錨定在了輸入消息上,這個輸入消息是spout發送過來的tuple tree的根節點,若是任意一個word消息處理失敗,派生這個tuple tree那個spout 消息將會被從新發送。
與此相反,咱們來看看使用下面的方式emit消息時,Storm會如何處理:
若是以這種方式發送消息,將會致使這個消息不會被錨定。若是此tuple tree中的消息處理失敗,派生此tuple tree的根消息不會被從新發送。根據任務的容錯級別,有時候很適合發送一個非錨定的消息。
一個輸出消息能夠被錨定在一個或者多個輸入消息上,這在作join或聚合的時候是頗有用的。一個被多重錨定的消息處理失敗,會致使與之關聯的多個spout消息被從新發送。多重錨定經過在emit方法中指定多個輸入消息來實現:
多重錨定會將被錨定的消息加到多棵tuple tree上。
注意:多重綁定可能會破壞傳統的樹形結構,從而構成一個DAGs(有向無環圖),如圖2所示:
圖2 多重錨定構成的鑽石型結構
Storm的實現能夠像處理樹那樣來處理DAGs。
錨定代表瞭如何將一個消息加入到指定的tuple tree中,高可靠處理API的接下來部分將向您描述當處理完tuple tree中一個單獨的消息時咱們該作些什麼。這些是經過OutputCollector 的ack和fail方法來實現的。回頭看一下例子SplitSentence,能夠發現當全部的word消息被髮送完成後,輸入的表示句子的消息會被應答(acked)。
每一個被處理的消息必須代表成功或失敗(acked 或者failed)。Storm是使用內存來跟蹤每一個消息的處理狀況的,若是被處理的消息沒有應答的話,早晚內存會被耗盡!>
不少bolt遵循特定的處理流程: 讀取一個消息、發送它派生出來的子消息、在execute結尾處應答此消息。通常的過濾器(filter)或者是簡單的處理功能都是這類的應用。Storm有一個BasicBolt接口封裝了上述的流程。示例SplitSentence可使用BasicBolt來重寫:
使用這種方式,代碼比以前稍微簡單了一些,可是實現的功能是同樣的。發送到BasicOutputCollector的消息會被自動的錨定到輸入消息,而且,當execute執行完畢的時候,會自動的應答輸入消息。
不少狀況下,一個消息須要延遲應答,例如聚合或者是join。只有根據一組輸入消息獲得一個結果以後,纔會應答以前全部的輸入消息。而且聚合和join大部分時候對輸出消息都是多重錨定。然而,這些特性不是IBasicBolt所能處理的。
Storm 系統中有一組叫作「acker」的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每一個消息。每當發現一個DAG被徹底處理,它就向建立這個根消息的spout任務發送一個信號。拓撲中acker任務的並行度能夠經過配置參數Config.TOPOLOGY_ACKERS來設置。默認的acker任務並行度爲1,當系統中有大量的消息時,應該適當提升acker任務的併發度。
爲了理解Storm可靠性處理機制,咱們從研究一個消息的生命週期和tuple tree的管理入手。當一個消息被建立的時候(不管是在spout仍是bolt中),系統都爲該消息分配一個64bit的隨機值做爲id。這些隨機的id是acker用來跟蹤由spout消息派生出來的tuple tree的。
每一個消息都知道它所在的tuple tree對應的根消息的id。每當bolt新生成一個消息,對應tuple tree中的根消息的messageId就拷貝到這個消息中。當這個消息被應答的時候,它就把關於tuple tree變化的信息發送給跟蹤這棵樹的acker。例如,他會告訴acker:本消息已經處理完畢,可是我派生出了一些新的消息,幫忙跟蹤一下吧。
舉個例子,假設消息D和E是由消息C派生出來的,這裏演示了消息C被應答時,tuple tree是如何變化的。
由於在C被從樹中移除的同時D和E會被加入到tuple tree中,所以tuple tree不會被過早的認爲已徹底處理。
關於Storm如何跟蹤tuple tree,咱們再深刻的探討一下。前面說過系統中能夠有任意個數的acker,那麼,每當一個消息被建立或應答的時候,它怎麼知道應該通知哪一個acker呢?
系統使用一種哈希算法來根據spout消息的messageId肯定由哪一個acker跟蹤此消息派生出來的tuple tree。由於每一個消息都知道與之對應的根消息的messageId,所以它知道應該與哪一個acker通訊。
當spout發送一個消息的時候,它就通知對應的acker一個新的根消息產生了,這時acker就會建立一個新的tuple tree。當acker發現這棵樹被徹底處理以後,他就會通知對應的spout任務。
tuple是如何被跟蹤的呢?系統中有成千上萬的消息,若是爲每一個spout發送的消息都構建一棵樹的話,很快內存就會耗盡。因此,必須採用不一樣的策略來跟蹤每一個消息。因爲使用了新的跟蹤算法,Storm只須要固定的內存(大約20字節)就能夠跟蹤一棵樹。這個算法是storm正確運行的核心,也是storm最大的突破。
acker任務保存了spout消息id到一對值的映射。第一個值就是spout的任務id,經過這個id,acker就知道消息處理完成時該通知哪一個spout任務。第二個值是一個64bit的數字,咱們稱之爲「ack val」, 它是樹中全部消息的隨機id的異或結果。ack val表示了整棵樹的的狀態,不管這棵樹多大,只須要這個固定大小的數字就能夠跟蹤整棵樹。當消息被建立和被應答的時候都會有相同的消息id發送過來作異或。
每當acker發現一棵樹的ack val值爲0的時候,它就知道這棵樹已經被徹底處理了。由於消息的隨機ID是一個64bit的值,所以ack val在樹處理完以前被置爲0的機率很是小。假設你每秒鐘發送一萬個消息,從機率上說,至少須要50,000,000年纔會有機會發生一次錯誤。即便如此,也只有在這個消息確實處理失敗的狀況下才會有數據的丟失!
Acker任務是輕量級的,因此在拓撲中並不須要太多的acker存在。能夠經過Storm UI來觀察acker任務的吞吐量,若是看上去吞吐量不夠的話,說明須要添加額外的acker。
若是你並不要求每一個消息必須被處理(你容許在處理過程當中丟失一些信息),那麼能夠關閉消息的可靠處理機制,從而能夠獲取較好的性能。關閉消息的可靠處理機制意味着系統中的消息數會減半(每一個消息不須要應答了)。另外,關閉消息的可靠處理能夠減小消息的大小(不須要每一個tuple記錄它的根id了),從而節省帶寬。
有三種方法能夠關係消息的可靠處理機制:
到如今爲止,你們已經理解了Storm的可靠性機制,而且知道了如何選擇不一樣的可靠性級別來知足需求。接下來咱們研究一下Storm如何保證在各類狀況下確保數據不丟失。
本章介紹了storm集羣如何實現數據的可靠處理。藉助於創新性的tuple tree跟蹤技術,storm高效的經過數據的應答機制來保證數據不丟失。
storm集羣中除nimbus外,沒有單點存在,任何節點均可以出故障而保證數據不會丟失。nimbus被設計爲無狀態的,只要能夠及時重啓,就不會影響正在運行的任務。
參考連接:
1、storm官方文檔:Guaranteeing message processing
Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed. A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout. This timeout can be configured on a topology-specific basis using theConfig.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.
If Storm detects that a tuple is fully processed, Storm will call the ack
method on the originating Spout
task with the message id that the Spout
provided to Storm. Likewise, if the tuple times-out Storm will call the fail
method on the Spout
.
Note that a tuple will be acked or failed by the exact same Spout
task that created it. So if a Spout
is executing as many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.
There's two things you have to do as a user to benefit from Storm's reliability capabilities. First, you need to tell Storm whenever you're creating a new link in the tree of tuples. Second, you need to tell Storm when you have finished processing an individual tuple. By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately. Storm's API provides a concise way of doing both of these tasks.
Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.
Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.
As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here.
A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple. When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message. You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages.
問:when a tuple is acked in the topology, how does it know to which acker task to send that information?(選 acker task)—— 哈希 spout-tuple-id對應acker
答:Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with.
問:how the acker tasks track which spout tasks are responsible for each spout tuple they're tracking.(選 spout task)—— taskid-tupleid的對應關係
答:When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.
問:Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. (tracking algorithm跟蹤算法) —— a spout tuple id mapping (task id 到 ack val) \ 異或(XOR)
答:Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree.
When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.
Let's go over all the failure cases and see how in each case Storm avoids data loss:(失敗場景,避免數據丟失)—— 超時、從新處理
As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.
六、調整可靠性:
Acker tasks are lightweight. You can track their performance through the Storm UI (component id "__acker").(據此調整acker數量)
If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires fewer ids to be kept in each downstream tuple, reducing bandwidth usage.(減小帶寬佔用)
注:There are three ways to remove reliability.(去掉可靠性的三種方式)
The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack
method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.(設置ackers爲0)
The second way is to remove reliability on a message by message basis. You can turn off tracking for an individual spout tuple by omitting a message id in the SpoutOutputCollector.emit
method.(發射tuple的時候不指定messageid)
Finally, if you don't care if a particular subset of the tuples downstream in the topology fail to be processed, you can emit them as unanchored tuples. Since they're not anchored to any spout tuples, they won't cause any spout tuples to fail if they aren't acked.(不anchor綁定,不跟蹤)