storm可靠機制

一  可靠性 簡介

       Storm的可靠性是指Storm會告知用戶每個消息單元是否在一個指定的時間(timeout)內被徹底處理。 徹底處理的意思是該MessageId綁定的源Tuple以及由該源Tuple衍生的全部Tuple都通過了Topology中每個應該到達的Bolt的處理。html

注:  timetout 能夠經過 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS   來指定java

       Storm中的每個Topology中都包含有一個Acker組件。 Acker組件的任務就是跟蹤從某個task中的Spout流出的每個messageId所綁定的Tuple樹中的全部Tuple的處理狀況。 若是在用戶設置的最大超時時間內這些Tuple沒有被徹底處理,那麼Acker會告訴Spout該消息處理失敗,相反則會告知Spout該消息處理成功,它會分別調用Spout中的fail和ack方法。git

       Storm容許用戶在Spout中發射一個新的源Tuple時爲其指定一個MessageId,這個MessageId能夠是任意的Object對象。 多個源Tuple能夠共用同一個MessageId,表示這多個源Tuple對用戶來講是同一個消息單元,它們會被放到同一棵tuple樹中 ,以下圖所示:github

                                                    

                                                                          Tuple 樹web

       在Spout中由message 1綁定的tuple1和tuple2分別通過bolt1和bolt2的處理,而後生成了兩個新的Tuple,並最終流向了bolt3。當bolt3處理完以後,稱message 1被徹底處理了。算法

二 Acker  原理分析數據庫

       storm裏面有一類 特殊的task稱爲acker (acker bolt) ,  負責跟蹤spout發出的每個tuple的tuple樹 。當acker發現一個tuple樹已經處理完成了。它會發送一個消息給產生這個tuple的那個task。你能夠經過 Config.TOPOLOGY_ACKERS 來設置一個topology裏面的acker的數量, 默認值是1。 若是你的topology裏面的tuple比較多的話, 那麼把acker的數量設置多一點,效率會高一點。編程

       理解storm的可靠性的最好的方法是來看看tuple和tuple樹的生命週期, 當一個tuple被建立, 不論是spout仍是bolt建立的,  它會被賦予一個64位的id ,而acker就是利用這個id去跟蹤全部的tuple的。 每一個tuple知道它的祖宗的id(從spout發出來的那個tuple的id), 每當你新發射一個tuple, 它的祖宗id都會傳給這個新的tuple。 因此當一個tuple被ack的時候,它會發一個消息給acker,告訴它這個tuple樹發生了怎麼樣的變化。具體來講就是它告訴acker:  我已經完成了, 我有這些兒子tuple, 你跟蹤一下他們吧。api

                                  (spout-tuple-id, tmp-ack-val)app

                 tmp-ark-val =  tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )

   tmp-ack-val是要ack的tuple的id與由它新建立的全部的tuple的id異或的結果

        當一個tuple須要ack的時候,它到底選擇哪一個acker來發送這個信息呢?

          storm使用一致性哈希來把一個spout-tuple-id對應到acker, 由於每個tuple知道它全部的祖宗的tuple-id, 因此它天然能夠算出要通知哪一個acker來ack。

         注:一個tuple可能存在於多個tuple樹,全部可能存在多個祖宗的tuple-id

          acker是怎麼知道每個spout tuple應該交給哪一個task來處理?

       當一個spout發射一個新的tuple, 它會簡單的發一個消息給一個合適的acker,而且告訴acker它本身的id(taskid), 這樣storm就有了taskid-tupleid的對應關係。 當acker發現一個樹完成處理了, 它知道給哪一個task發送成功的消息。

Acker的高效性

           acker task並不顯式的跟蹤tuple樹。 對於那些有成千上萬個節點的tuple樹,把這麼多的tuple信息都跟蹤起來會耗費太多的內存。相反,  acker用了一種不一樣的方式, 使得對於每一個spout tuple所須要的內存量是恆定的(20 bytes) .  這個跟蹤算法是storm如何工做的關鍵,而且也是它的主要突破。

        一個acker task存儲了一個spout-tuple-id到一對值的一個mapping。這個對子的第一個值是建立這個tuple的taskid, 這個是用來在完成處理tuple的時候發送消息用的。 第二個值是一個64位的數字稱做:ack val , ack val是整個tuple樹的狀態的一個表示,無論這棵樹多大。它只是簡單地把這棵樹上的全部建立的tupleid/ack的tupleid一塊兒異或(XOR)。

            當一個acker task 發現一個 ack val變成0了, 它知道這棵樹已經處理完成了。

 例以下圖是一個簡單的Topology。

                         

                                                                          一 個簡單的 Topology

        ack_val的初值爲0,varl_x表示新產生的tuple id ,它們通過Spout,Bolt1,Bolt2,Bolt3 處理,並與arv_val異或,最終arv_val變爲0,表示tuple1被成功處理。

   下面看一個稍微複雜一點的例子:

        注:紅色虛線框表示的是Acker組件,ack_val表示acker value的值,它的初值爲0

        msg1綁定了兩個源tuple,它們的id分別爲1001和1010.在通過Bolt1處理後新生成了tuple id爲1110,新生成的tuple與傳入的tuple 1001進行異或獲得的值爲0111,而後Bolt1經過spout-tuple-id映射到指定的Acker組件,向它發送消息,Acker組件將Bolt1傳過來的值與ack_val異或,更新ack_val的值變爲了0100。與此相同通過Bolt2處理後,ack_val的值變爲0001。最後經Bolt3處理後ack_val的值變爲了0,說明此時由msg1標識的Tuple處理成功,此時Acker組件會經過事先綁定的task id映射找到對應的Spout,而後調用該Spout的ack方法。

            其流程以下圖所示:

     注: 1. Acker (ack bolt)組件由系統自動產生,通常來講一個topology只有一個ack bolt(固然能夠經過配置參數指定多個),當bolt處理並下發完tuple給下一跳的bolt時,會發送一個ack給ack bolt。ack bolt經過簡單的異或原理(即同一個數與本身異或結果爲零)來斷定從spout發出的某一個Tuple是否已經被徹底處理完畢。若是結果爲真,ack bolt發送消息給spout,spout中的ack函數被調用並執行。若是超時,則發送fail消息給spout,spout中的fail函數被調用並執行,spout中的ack和fail的處理邏輯由用戶自行填寫。

          2.  Acker對於每一個Spout-tuple保存一個ack-val的校驗值,它的初始值是0, 而後每發射一個tuple 就ack一個tuple,那麼tuple的id都要跟這個校驗值異或一下,而且把獲得的值更新爲ack-val的新值。那麼假設每一個發射出去的tuple都被ack了, 那麼最後ack-val必定是0(由於一個數字跟本身異或獲得的值是0)。

            A xor A = 0.

          A xor B…xor B xor A = 0,其中每個操做數出現且僅出現兩次。

        3.  tupleid是隨機的64位數字, ack val碰巧變成0(例如:ark_val = 1 ^ 2  ^ 3 = 0)而不是由於全部建立的tuple都完成了,這樣的機率極小。 算一下就知道了, 就算每秒發生10000個ack, 那麼須要50000000萬年纔可能碰到一個錯誤。並且就算碰到了一個錯誤, 也只有在這個tuple失敗的時候纔會形成數據丟失。 

      看看storm在每種異常狀況下是怎麼避免數據丟失的:

         1. 因爲對應的task掛掉了,一個tuple沒有被ack:  storm的超時機制在超時以後會把這個tuple標記爲失敗,從而能夠從新處理。

         2. Acker掛掉了:  這種狀況下由這個acker所跟蹤的全部spout tuple都會超時,也就會被從新處理。

          3. Spout掛掉了:  在這種狀況下給spout發送消息的消息源負責從新發送這些消息。好比Kestrel和RabbitMQ在一個客戶端斷開以後會把全部」處理中「的消息放回隊列。

就像你看到的那樣, storm的可靠性機制是徹底分佈式的, 可伸縮的而且是高度容錯的。

四Acker 工做流程

咱們來看看acker的工做流程:

1. Spout在初始化時會產生一個tasksId;

2. Spout中建立新的Tuple,其id是一個64位的隨機數;

3. Spout將新建的Tuple發送出去(給出了messageId來開啓Tuple的追蹤), 同時會發送一個消息到某個acker,要求acker進行追蹤。該消息包含兩部分

  • Spout的taskId:用戶acker在整個tuple樹被徹底處理後找到原始的Spout進行回調ack或fail

  • 一個64位的ack val值: 標誌該tuple是否被徹底處理。初始值爲0。

4. 一個Bolt在處理完Tuple後,若是發射了一個新的anchor tuple,Storm會維護anchor tuple的列表;

5. 該Bolt調用OutputCollector.ack()時,Storm會作以下操做:

  • anchor tuple列表中每一個已經ack過的和新建立的Tuple的id作異或(XOR)。假定Spout發出的TupleID是tuple-id-0,該Bolt新生成的TupleID爲tuple-id-1,那麼,tuple-id-0XORtuple-id-0XORtuple-id-1

  • Storm根據該原始TupleID進行一致性hash算法,找到最開始Spout發送的那個acker,而後把上面異或後得出的ack val值發送給acker

6. acker收到新的ack val值後,與保存的原始的Tuple的id進行異或,若是爲0,表示該Tuple已被徹底處理,則根據其taskId找到原始的Spout,回調其ack()方法。


fail的機制相似,在發現fail後直接回調Spout的fail方法。

四 Acker 編程接口

        在Spout中,Storm系統會爲用戶指定的MessageId生成一個對應的64位的整數,做爲整個Tuple Tree的RootId。RootId會被傳遞給Acker以及後續的Bolt來做爲該消息單元的惟一標識。同時,不管Spout仍是Bolt每次新生成一個Tuple時,都會賦予該Tuple一個惟一的64位整數的Id。

      當Spout發射完某個MessageId對應的源Tuple以後,它會告訴Acker本身發射的RootId以及生成的那些源Tuple的Id。而當Bolt處理完一個輸入Tuple併產生出新的Tuple時,也會告知Acker本身處理的輸入Tuple的Id以及新生成的那些Tuple的Id。Acker只須要對這些Id進行異或運算,就能判斷出該RootId對應的消息單元是否成功處理完成了。

   下面這個是spout要實現的接口:

public interface ISpout extends Serializable {
     void open(Map conf, TopologyContext context,SpoutOutputCollector collector);      
     void close();      
     void nextTuple();      
     void ack(Object msgId);     
     void fail(Object msgId); 
    }

        首先 storm經過調用spout的nextTuple方法來獲取下一個tuple, Spout經過open方法參數裏面提供的SpoutOutputCollector來發射新tuple到它的其中一個輸出消息流, 發射tuple的時候spout會提供一個message-id, 後面經過這個message-id來追蹤這個tuple。

this.collector.emit(new Values("hello world"),msgId);

          注:

msgId是提供給Acker組件使用的,Acker組件使用msgId來跟蹤Tuple樹

       接下來, 這個發射的tuple被傳送到消息處理者bolt那裏, storm會跟蹤由此所產生的這課tuple樹。若是storm檢測到一個tuple被徹底處理了, 那麼storm會以最開始的那個message-id做爲參數去調用消息源的ack方法;反之storm會調用spout的fail方法。值得注意的是, storm調用ack或者fail的task始終是產生這個tuple的那個task。因此若是一個spout被分紅不少個task來執行, 消息執行的成功失敗與否始終會通知最開始發出tuple的那個task。

         做爲storm的使用者,有兩件事情要作以更好的利用storm的可靠性特徵。 首先,在你生成一個新的tuple的時候要通知storm; 其次,完成處理一個tuple以後要通知storm。 這樣storm就能夠檢測整個tuple樹有沒有完成處理,而且通知源spout處理結果。storm提供了一些簡潔的api來作這些事情。

         由一個tuple產生一個新的tuple稱爲:anchoring。 你發射一個新tuple的同時也就完成了一次anchoring。看下面這個例子: 這個bolt把一個包含一個句子的tuple分割成每一個單詞一個tuple。  

public class SplitSentence implements IRichBolt {  
    OutputCollector _collector; 
    public void prepare(Map conf,TopologyContext context,OutputCollector collector) {        
        _collector = collector;  
        } 
    public void execute(Tuple tuple) {    
        String sentence = tuple.getString(0);    
        for(String word: sentence.split(" ")) {          
            _collector.emit(tuple,new Values(word));    
            }       
         _collector.ack(tuple);  
         }
 
       public void cleanup() {}
       public void declareOutputFields(OutputFieldsDeclarer declarer) {      
           declarer.declare(newFields("word"));
       }
   }

        看一下這個execute方法, emit的第一個參數是輸入tuple, 第二個參數則是輸出tuple, 這其實就是經過輸入tuple anchoring了一個新的輸出tuple。由於這個「單詞tuple」被anchoring在「句子tuple」一塊兒, 若是其中一個單詞處理出錯,那麼這整個句子會被從新處理。做爲對比, 咱們看看若是經過下面這行代碼來發射一個新的tuple的話會有什麼結果。

     _collector.emit(new Values(word));

        用這種方法發射會致使新發射的這個tuple脫離原來的tuple樹(unanchoring), 若是這個tuple處理失敗了, 整個句子不會被從新處理。一個輸出tuple能夠被anchoring到多個輸入tuple。這種方式在stream合併或者stream聚合的時候頗有用。一個多入口tuple處理失敗的話,那麼它對應的全部輸入tuple都要從新執行。看看下面演示怎麼指定多個輸入tuple:

 List<Tuple> anchors = new ArrayList<Tuple>();

anchors.add(tuple1);

anchors.add(tuple2);

_collector.emit(anchors,new Values(1,2,3));

         咱們經過anchoring來構造這個tuple樹,最後一件要作的事情是在你處理完這個tuple的時候告訴storm,  經過OutputCollector類的ack和fail方法來作,若是你回過頭來看看 SplitSentence 的例子, 你能夠看到「句子tuple」在全部「單詞tuple」被髮出以後調用了ack。

       你能夠調用 OutputCollector  的fail方法去當即將從消息源頭髮出的那個tuple標記爲fail, 好比你查詢了數據庫,發現一個錯誤,你能夠立刻fail那個輸入tuple, 這樣可讓這個tuple被快速的從新處理, 由於你不須要等那個timeout時間來讓它自動fail。

        每一個你處理的tuple, 必須被ack或者fail。由於storm追蹤每一個tuple要佔用內存。因此若是你不ack/fail每個tuple, 那麼最終你會看到OutOfMemory錯誤。

       大多數Bolt遵循這樣的規律:讀取一個tuple;發射一些新的tuple;在execute的結束的時候ack這個tuple。這些Bolt每每是一些過濾器或者簡單函數。Storm爲這類規律封裝了一個BasicBolt類。若是用BasicBolt來作, 上面那個SplitSentence能夠改寫成這樣:

public class SplitSentence implements IBasicBolt {  
    public void prepare(Map conf,TopologyContext context) {} 
    public void execute(Tuple tuple,BasicOutputCollector collector) {      
        String sentence = tuple.getString(0);      
        for(String word: sentence.split(" ")) {        
            collector.emit(newValues(word));      
            }
       } 
    public void cleanup() {} 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {      
        declarer.declare(newFields("word"));  
        }
    }

         這個實現比以前的實現簡單多了, 可是功能上是同樣的,發送到BasicOutputCollector的tuple會自動和輸入tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack的。

          做爲對比,處理聚合和合並的bolt每每要處理一大堆的tuple以後才能被ack, 而這類tuple一般都是多輸入的tuple, 因此這個已經不是IBasicBolt能夠罩得住的了。

  注:當一個Tuple處理失敗的時候,storm不會自動的重發該tuple,須要用戶本身來編寫邏輯從新處理fail掉的Tuple,能夠將其 放入一個列表中,在nextTuple()中獲取這些失敗的tuple,從新發射。

五 調整可靠性 

       acker task是很是輕量級的, 因此一個topology裏面不須要不少acker。你能夠經過Strom UI(id: -1)來跟蹤它的性能。 若是它的吞吐量看起來不正常,那麼你就須要多加點acker了。

若是可靠性對你來講不是那麼重要 — 你不太在乎在一些失敗的狀況下損失一些數據, 那麼你能夠經過不跟蹤這些tuple樹來獲取更好的性能。不去跟蹤消息的話會使得系統裏面的消息數量減小一半, 由於對於每個tuple都要發送一個ack消息。而且它須要更少的id來保存下游的tuple, 減小帶寬佔用。

  有三種方法能夠去掉可靠性:

       第一是把Config.TOPOLOGY_ACKERS 設置成 0. 在這種狀況下, storm會在spout發射一個tuple以後立刻調用spout的ack方法。也就是說這個tuple樹不會被跟蹤。

第二個方法是在tuple層面去掉可靠性。 你能夠在發射tuple的時候不指定messageid來達到不跟蹤某個特定的spout tuple的目的。

最後一個方法是若是你對於一個tuple樹裏面的某一部分到底成不成功不是很關心,那麼能夠在發射這些tuple的時候unanchor它們。 這樣這些tuple就不在tuple樹裏面, 也就不會被跟蹤了。

總結

spout裏的msgId(也就是tupleId,系統自動生成64位)和bolt裏的錨點有什麼相關性呢?

msgId用來跟蹤一個tuple,可是一個tuple可能要被N個bolt進行處理,你須要保證每一個bolt都處理正常,這時候你在任何一個bolt調用ack(tuple)的話,spout都會認定完成,這種狀況明顯不符合要求。 錨定就是對每一個數據的路徑負責,只有當路徑執行到底,而且正確的狀況下,反饋信息到錨定節點,錨定節點纔會確認數據正常。

相關文章
相關標籤/搜索