JStorm如何保證消息不丟失

Storm的acker機制,可以保證消息至少被處理一次(at least once)。也就是說,可以保證不丟消息。這裏就詳細解析一下acker的實現原理。算法

消息流

假設咱們有一個簡單的topology,結構爲spout -> bolt。 spout emit了一條消息,發送至bolt。bolt做爲最後一個處理者,沒有再向下游emit消息。框架

 

從 上圖能夠看到,全部的ack消息都會發送到acker,acker會根據算法計算從特定spout發射出來的tuple tree是否被徹底處理。若是成功處理,則發送__acker_ack消息給spout,不然發送__acker_fail消息給spout。而後 spout中能夠作相應的邏輯如重發消息等。dom

 

在JStorm中,acker是一種bolt,所以它的處理、消息發送跟正常的bolt是同樣的。只不過,acker是JStorm框架建立的bolt,用戶不能自行建立。若是用戶在代碼中使用:spa

Config.setNumAckers(conf, 1);

就會自動建立並行度爲1的acker bolt;若是爲0,則就沒有acker bolt了。code

如何判斷消息是否被成功處理?

acker的算法很是巧妙,它利用了數學上的異或操做來實現對整個tuple tree的判斷。在一個topology中的一條消息造成的tuple tree中,全部的消息,都會有一個MessageId,它內部其實就是一個map:orm

Map<Long, Long> _anchorsToIds;

存儲的是anchor和anchor value。而anchor其實就是root_id,它在spout中生成,而且一路透傳到全部的bolt中,屬於同一個tuple tree中的消息都會有相同的root_id,它能夠惟一標識spout發出來的這條消息(以及從下游bolt根據這個tuple衍生髮出的消息)。數學

下面是一個tuple的ack流程:it

  1. spout發送消息時,先生成root_id。
  2. 對每個目標bolt task,生成<root_id, random()>,即爲這個root_id對應一個隨機數值,而後隨着消息自己發送到下游bolt中。假設有2個bolt,生成的隨機數對分別爲:<root_id, r1>, <root_id, r2>
  3. spout向acker發送ack_init消息,它的MessageId = <root_id, r1 ^ r2>(即全部task產生的隨機數列表的異或值)。
  4. bolt收到spout或上游bolt發送過來的tuple以後,首先它會向acker發送ack消息,MessageId即爲收到的值。同時,若是bolt下游還有bolt,則跟步驟2相似,會對每個bolt,生成隨機數對,root_id相同,可是值變爲當前值 ^ 新生成的隨機數。以此類推。
  5. acker收到消息後,會對root_id下全部的值作異或操做,若是算出來的值爲0,表示整個tuple tree被成功處理;不然就會一直等待,直到超時,則tuple tree處理失敗。
  6. acker通知spout消息處理成功或失敗。

咱們以一個稍微複雜一點的topology爲例,描述一下它的整個過程。 假設咱們的topology結構爲: spout -> bolt1/bolt2 -> bolt3即spout同時向bolt1和bolt2發送消息,它們處理完後,都向bolt3發送消息。bolt3沒有後續處理節點。ast

 

1). spout發射一條消息,生成root_id,因爲這個值不變,咱們就用root_id來標識。 spout -> bolt1的MessageId = <root_id, 1> spout -> bolt2的MessageId = <root_id, 2> spout -> acker的MessageId = <root_id, 1^2>原理

 

2). bolt1收到消息後,生成以下消息: bolt1 -> bolt3的MessageId = <root_id, 3> bolt1 -> acker的MessageId = <root_id, 1^3>

3). 一樣,bolt2收到消息後,生成以下消息: bolt2 -> bolt3的MessageId = <root_id, 4> bolt2 -> acker的MessageId = <root_id, 2^4>

4). bolt3收到消息後,生成以下消息: bolt3 -> acker的MessageId = <root_id, 3> bolt3 -> acker的MessageId = <root_id, 4>

5). acker中總共收到如下消息: <root_id, 1^2> <root_id, 1^3> <root_id, 2^4> <root_id, 3> <root_id, 4> 全部的值進行異或以後,即爲1^2^1^3^2^4^3^4 = 0。

相關文章
相關標籤/搜索