Storm的acker機制,可以保證消息至少被處理一次(at least once)。也就是說,可以保證不丟消息。這裏就詳細解析一下acker的實現原理。算法
假設咱們有一個簡單的topology,結構爲spout -> bolt。 spout emit了一條消息,發送至bolt。bolt做爲最後一個處理者,沒有再向下游emit消息。框架
從 上圖能夠看到,全部的ack消息都會發送到acker,acker會根據算法計算從特定spout發射出來的tuple tree是否被徹底處理。若是成功處理,則發送__acker_ack消息給spout,不然發送__acker_fail消息給spout。而後 spout中能夠作相應的邏輯如重發消息等。dom
在JStorm中,acker是一種bolt,所以它的處理、消息發送跟正常的bolt是同樣的。只不過,acker是JStorm框架建立的bolt,用戶不能自行建立。若是用戶在代碼中使用:spa
Config.setNumAckers(conf, 1);
就會自動建立並行度爲1的acker bolt;若是爲0,則就沒有acker bolt了。code
acker的算法很是巧妙,它利用了數學上的異或操做來實現對整個tuple tree的判斷。在一個topology中的一條消息造成的tuple tree中,全部的消息,都會有一個MessageId,它內部其實就是一個map:orm
Map<Long, Long> _anchorsToIds;
存儲的是anchor和anchor value。而anchor其實就是root_id,它在spout中生成,而且一路透傳到全部的bolt中,屬於同一個tuple tree中的消息都會有相同的root_id,它能夠惟一標識spout發出來的這條消息(以及從下游bolt根據這個tuple衍生髮出的消息)。數學
下面是一個tuple的ack流程:it
<root_id, random()>
,即爲這個root_id對應一個隨機數值,而後隨着消息自己發送到下游bolt中。假設有2個bolt,生成的隨機數對分別爲:<root_id, r1>
, <root_id, r2>
。<root_id, r1 ^ r2>
(即全部task產生的隨機數列表的異或值)。當前值 ^ 新生成的隨機數
。以此類推。咱們以一個稍微複雜一點的topology爲例,描述一下它的整個過程。 假設咱們的topology結構爲: spout -> bolt1/bolt2 -> bolt3
即spout同時向bolt1和bolt2發送消息,它們處理完後,都向bolt3發送消息。bolt3沒有後續處理節點。ast
1). spout發射一條消息,生成root_id,因爲這個值不變,咱們就用root_id來標識。 spout -> bolt1的MessageId = <root_id, 1>
spout -> bolt2的MessageId = <root_id, 2>
spout -> acker的MessageId = <root_id, 1^2>
原理
2). bolt1收到消息後,生成以下消息: bolt1 -> bolt3的MessageId = <root_id, 3>
bolt1 -> acker的MessageId = <root_id, 1^3>
3). 一樣,bolt2收到消息後,生成以下消息: bolt2 -> bolt3的MessageId = <root_id, 4>
bolt2 -> acker的MessageId = <root_id, 2^4>
4). bolt3收到消息後,生成以下消息: bolt3 -> acker的MessageId = <root_id, 3>
bolt3 -> acker的MessageId = <root_id, 4>
5). acker中總共收到如下消息: <root_id, 1^2>
<root_id, 1^3>
<root_id, 2^4>
<root_id, 3>
<root_id, 4>
全部的值進行異或以後,即爲1^2^1^3^2^4^3^4
= 0。