轉載自http://www.cnblogs.com/Jack47/p/guaranteeing-message-processing-in-storm.htmljavascript
Storm能夠保證從Spout發出的每一個消息都能被徹底處理。Storm的可靠性機制是徹底分佈式的(distributed),可伸縮的(scalable),容錯的(fault-tolerant)。本文介紹了Storm如何保證可靠性以及做爲Storm使用者,咱們須要怎麼作,才能充分利用Storm的可靠性。理解一些實現細節,也可以幫助咱們領悟Storm的設計理念。html
考慮以下的流式計算文章中單詞個數的拓撲:java
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22133, "sentence_queue", new StringScheme())); builder.setBolt("split", new SplitStentence(), 10).shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20).fieldsGrouping("split", new Fields("word"));
這個拓撲由3個處理單元組成:一個叫"sentences"的Spout,負責從Kestrel隊列中讀取句子並做爲新的Spout元組發送出去。名稱爲"split"的Bolt是Spout元組的下游消費方,它把接收到句子切分紅單詞併發送出去。名稱爲"count"的Bolt是"split" Bolt的下游消費方,它使用HashMap<String, Interger>
存儲了每一個任務中每一個單詞出現的次數,每次讀取到新的單詞元組就讓該單詞的計數加一。"count" Bolt接收"split" Bolt發出的消息時,是使用元組中的"word"(單詞)字段來做爲路由策略,因此相同的單詞元組會被路由到相同的任務(task)裏,這樣就可以計數了。git
在下游的Bolt中會基於某個Spout元組發射出不少新的元組:句子中的每一個單詞會生成一個新元組(在split Bolt完成),每一個單詞的計數更新後(在count Bolt完成)也會觸發一個新的元組。某個Spout元組觸發的消息樹以下圖:github
一個Spout元組觸發的消息樹算法
能夠看到這棵消息樹的根節點是Spout產生的句子內容爲"the cow jumped over the moon"的元組。這個Spout元組在"split"這個Bolt裏被切分爲6個單詞,觸發了6個單詞元組,"count" Bolt接收到這6個單詞元組後,更新了每一個單詞的計數併爲之產生了一個新的元組。數據庫
一條消息被「完整處理」apache
指一個從Spout發出的元組所觸發的消息樹中全部的消息都被Storm處理了。若是在指定的超時時間裏,這個Spout元組觸發的消息樹中有任何一個消息沒有處理完,就認爲這個Spout元組處理失敗了。這個超時時間是經過每一個拓撲的Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置項來進行配置的,默認是30秒。api
在前面消息樹的例子裏,只有消息樹中全部的消息(包含一條Spout消息,六條split Bolt消息,六條count Bolt消息)都被Storm處理完了,纔算是這條Spout消息被完整處理了。markdown
當消息沒有被完整處理或者處理失敗了會怎麼樣?爲了理解這個問題,應該首先看一下Spout發出的一個元組的生命週期。Spout須要實現的接口(接口文檔見這裏)以下:
public interface ISpout extends Serializable { void open(Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
首先,Storm經過調用Spout的nextTuple
函數來從Spout請求一個元組。Spout任務使用open
函數入參中提供的SpoutOutputCollector
來給Spout任務的某個輸出流發射一個新元組。當發射一個元組時,Spout
提供了一個"消息標識"(message-id),用來後續識別這個元組。例如,上面的例子裏,sentence Spout從Kestrel隊列中讀取一條消息,而後把Kestrel提供的這個消息的message-id做爲"消息標識"來發送出去。向SpoutOutputCollector
中發送消息的例子以下:
_collector.emit(new Values("the cow jumped over the moon"), msgId);
接下來,元組就被髮送到下游的Bolt進行消費,Storm會負責跟蹤這個Spout元組建立的消息樹。若是Storm檢測到一個元組被完整地處理了,Storm會調用產生這個元組的Spout任務(Spout Bolt有多個任務來運行)的ack
函數,參數是Spout以前發送這個消息時提供給Storm的message-id。相似的,當元組處理超時或處理失敗時,Storm會在元組對應的Spout任務上調用fail
函數,參數是以前Spout發送這個消息時提供給Storm的message-id。這樣應用程序經過實現Spout Bolt中的ack
接口和fail
接口來處理消息處理成功和失敗的狀況。例如當消息處理成功時記錄當前處理的進度,當處理失敗時,從新發送消息來對這個消息進行從新處理。但在本文的例子裏fail
函數中不須要作任何處理,由於這些元組不會從Kestrel隊列中去掉,下次從隊列取消息,仍然會取到這些消息,只有處理成功後,纔會從Kestrel隊列中摘除這些消息。
做爲Storm用戶,若是想利用Storm的可靠性,須要作兩件事:
1. 建立一個元組時(消息樹上建立一個新節點)須要通知Storm 2. 處理完一個元組,須要通知Storm
經過這兩個操做,當消息樹被徹底處理完,Storm就能夠當即檢測到,從而能夠正確地確認這個Spout元組處理成功或者失敗。Storm的API提供了一套簡潔地處理這些操做的方法。
在Storm消息樹(元組樹)中添加一個子結點的操做叫作錨定
(anchoring)。在應用程序發送一個新元組時候,Storm會在幕後作錨定。仍是以前的流式計算單詞個數的例子,請看以下的代碼片斷:
public class SplitSentence extends BaseRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector){ _collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { _collector.emit(tuple, new Values(word)); } _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
每一個單詞元組是經過把輸入的元組做爲emit
函數中的第一個參數來作錨定的。經過錨定,Storm就可以獲得元組之間的關聯關係(輸入元組觸發了新的元組),繼而構建出Spout元組觸發的整個消息樹。因此當下遊處理失敗時,就能夠通知Spout當前消息樹根節點的Spout元組處理失敗,讓Spout從新處理。相反,若是在emit
的時候沒有指定輸入的元組,叫作不錨定
:
_collector.emit(new Values(word));
這樣發射單詞元組,會致使這個元組不被錨定(unanchored)
,這樣Storm就不能獲得這個元組的消息樹,繼而不能跟蹤消息樹是否被完整處理。這樣下游處理失敗,不能通知到上游的Spout任務。不一樣的應用的有不一樣的容錯處理方式,有時候須要這樣不錨定的場景。
一個輸出的元組能夠被錨定到多個輸入元組上,叫作多錨定(multi-anchoring)
。這在作流的合併或者聚合的時候很是有用。一個多錨定的元組處理失敗,會致使Spout上從新處理對應的多個輸入元組。多錨定是經過指定一個多個輸入元組的列表而不是單個元組來完成的。例如:
List<Tuple> anchors = new ArrayList<Tuple>(); anchors.add(tuple1); anchors.add(tuple2); _collector.emit(anchors, new Values(word));
多錨定會把這個新輸出的元組添加到多棵消息樹上。注意多錨定可能會打破消息的樹形結構,變成有向無環圖(DAG),Storm的實現既支持樹形結構,也支持有向無環圖(DAG)。在本文中,提到的消息樹跟有向無環圖是等價的。消息之間的關係是有向無環圖的例子見下圖:
消息造成的有向無環圖
Spout元組A觸發了B和C兩個元組,而這兩個元組做爲輸入,共同做用後觸發D元組。
錨定
的做用就是指定元組樹的結構--下一步是當元組樹中某個元組已經處理完成時,通知Storm。通知是經過OutputCollector
中的ack
和fail
函數來完成的。例如上面流式計算單詞個數例子中的split Bolt
的實現SplitSentence類,能夠看到句子被切分紅單詞後,當全部的單詞元組都被髮射後,會確認(ack)輸入的元組處理完成。
能夠利用OutputCollector
的fail
函數來當即通知Storm,當前消息樹的根元組處理失敗了。例如,應用程序可能捕捉到了數據庫客戶端的一個異常,就顯示地通知Storm輸入元組處理失敗。經過顯示地通知Storm元組處理失敗,這個Spout元組就不用等待超時而能更快地被從新處理。
Storm須要佔用內存來跟蹤每一個元組,因此每一個被處理的元組都必須被確認。由於若是不對每一個元組進行確認,任務最終會耗光可用的內存。
作聚合或者合併操做的Bolt可能會延遲確認一個元組,直到根據一堆元組計算出了一個結果後,纔會確認。聚合或者合併操做的Bolt,一般也會對他們的輸出元組進行多錨定。
Storm 0.7.0引入了「事務拓撲」(transactional topologies)的特性,它讓你在大多數場景下可以獲得徹底容錯的只被處理一次的消息語義。更多關於事物拓撲的介紹見這裏
一個Storm拓撲有一組特殊的"acker"任務,它們負責跟蹤由每一個Spout元組觸發的消息的處理狀態。當一個"acker"看到一個Spout元組產生的有向無環圖中的消息被徹底處理,就通知當初建立這個Spout元組的Spout任務,這個元組被成功處理。能夠經過拓撲配置項Config.TOPOLOGY_ACKER_EXECUTORS來設置一個拓撲中acker任務executor
的數量。Storm默認TOPOLOGY_ACKER_EXECUTORS
和拓撲中配置的Worker的數量相同(關於executor和Worker的介紹,參見理解Storm併發一文)--對於須要處理大量消息的拓撲來講,須要增大acker executor的數量。
理解Storm的可靠性實現方式的最好方法是查看元組的生命週期和元組構成的有向無環圖。當拓撲的Spout或者Bolt中建立一個元組時,都會被賦予一個隨機的64比特的標識(message id)。acker任務使用這些id來跟蹤每一個Spout元組產生的有向無環圖的處理狀態。在Bolt中產生一個新的元組時,會從錨定的一個或多個輸入元組中拷貝全部Spout元組的message-id,因此每一個元組都攜帶了本身所在元組樹的根節點Spout元組的message-id。當確認一個元組處理成功了,Storm就會給對應的acker任務發送特定的消息--通知acker當前這個Spout元組產生的消息樹中某個消息處理完了,並且這個特定消息在消息樹中又產生了一個新消息(新消息錨定的輸入是這個特定的消息)。
舉個例子,假設"D"元組和"E"元組是基於「C」元組產生的,那麼下圖描述了確認「C」元組成功處理後,元組樹的變化。圖中虛線框表示的元組表明已經在消息樹上被刪除了:
確認元組成功處理後消息樹的變化
因爲在「C」從消息樹中刪除(經過acker函數確認成功處理)的同時,「D」和「E」也被添加到(經過emit函數來錨定的)元組樹中,因此這棵樹歷來不會被提前處理完。
正如上面已經提到的,在一個拓撲中,能夠有任意數量的acker任務。這致使了以下的兩個問題:
Storm採用對元組中攜帶的Spout元組message-id哈希取模的方法來把一個元組映射到一個acker任務上(因此同一個消息樹裏的全部消息都會映射到同一個acker任務)。由於每一個元組攜帶了本身所處的元組樹中根節點Spout元組(可能有多個)的標識,因此Storm就能決定通知哪一個acker任務。
當一個Spout任務產出一個新的元組,僅須要簡單的發送一個消息給對應的acker(Spout元組message-id哈希取模)來告知Spout的任務標示(task id),以此來通知acker當前這個Spout任務負責這個消息。當acker看到一個消息樹被徹底處理完,它就能根據處理的元組中攜帶的Spout元組message-id來肯定產生這個Spout元組的task id,而後通知這個Spout任務消息樹處理完成(調用 Spout任務的ack
函數)。
對於擁有上萬節點(或者更多)的巨大的元組樹,跟蹤全部的元組樹會耗盡acker使用的內存。acker任務不顯示地(記錄完整的樹型結構)跟蹤元組樹,相反它使用了一種每一個Spout元組只佔用固定大小空間(大約20字節)的策略。這個跟蹤算法是Storm工做的關鍵,並且是重大突破之一。
一個acker任務存儲了從一個Spout元組message-id到一對值的映射關係spout-message-id--><spout-task-id, ack-val>
。第一個值是建立了這個Spout元組的任務id,用來後續處理完成時通知到這個Spout任務。第二個值是一個64比特的叫作「ack val」的數值。它是簡單的把消息樹中全部被建立或者被確認的元組message-id異或起來的值。每一個消息建立和被確認處理後都會異或到"ack val"上,A xor A = 0
,因此當一個「ack val」變成了0,說明整個元組樹都徹底被處理了。不管是很大的仍是很小的元組樹,"ack val"值都表明了整個元組樹中消息的處理狀態。因爲元組message-id是隨機的64比特的整數,因此同一個元組樹中不一樣元組message-id發生撞車的可能性特別小,所以「ack val」意外的變成0的可能性很是小。若是真的發生了這種狀況,而剛好這個元組也處理失敗了,那僅僅會致使這個元組的數據丟失。
使用異或操做來跟蹤消息樹處理狀態的想法很是有才。由於消息的數量可能有成千上萬條,每一個都單獨跟蹤(讀者能夠思考下怎麼搞)是很是低效並且不可水平擴展的。並且採用異或的方式後,就不依賴於acker接收到消息的順序了。
搞明白了可靠性的算法,讓咱們看看全部失敗的場景下Storm如何避免數據丟失:
acker任務是輕量級的,因此在一個拓撲中不須要太多的acker任務。能夠經過Storm UI(id爲"__acker"的組件)來觀察acker任務的性能。若是吞吐量看起來不正常,就須要添加更多的acker任務。
若是可靠性可有可無--例如你不關心元組失敗場景下的消息丟失--那麼你能夠經過不跟蹤元組的處理過程來提升性能。不跟蹤一個元組樹會讓傳遞的消息數量減半,由於正常狀況下,元組樹中的每一個元組都會有一個確認消息。另外,這也能減小每一個元組須要存儲的id的數量(指每一個元組存儲的Spout message-id),減小了帶寬的使用。
有三種方法來去掉可靠性:
Config.TOPOLOGY_ACKERS
爲0。這種狀況下,Storm會在Spout吐出一個元組後立馬調用Spout的ack
函數。這個元組樹不會被跟蹤。emit
函數的時候經過忽略消息message-id參數來關閉這個元組的跟蹤機制。emit
的時候不要使用錨定。因爲它們沒有被錨定到某個Spout元組上,因此當它們沒有被成功處理,不會致使Spout元組處理失敗。