storm 可靠性和非可靠性

    咱們知道Storm有一個很重要的特性,那就是Storm API可以保證它的一個Tuple可以被徹底處理,這一點尤其重要,其實storm中的可靠性是由spout和bolt組件共同完成的,下面就從spout和bolt兩個方便給你們介紹一下storm中的可靠性,最後會給出一個實現了可靠性的例子。緩存

1.Spout的可靠性保證

     在Storm中,消息處理可靠性從Spout開始的。storm爲了保證數據能正確的被處理, 對於spout產生的每個tuple,storm都可以進行跟蹤,這裏面涉及到了ack/fail的處理, 若是一個tuple被處理成功,那麼spout便會調用其ack方法,若是失敗,則會調用fail方法。而topology中處理tuple的每個bolt都會經過OutputCollector來告知storm,當前bolt處理是否成功。dom

     咱們知道spout必須可以追蹤它發射的全部tuples或其子tuples,而且在這些tuples處理失敗時可以重發。那麼spout如何追蹤tuple呢?storm是經過一個簡單的anchor機制來實現的(在下面的bolt可靠性中會講到)。this

       spout發射根tuple,根tuple產生子tuples。這就造成一個TupleTree。在這個tree中,全部的bolt都會ack或fail一個tuple,若是tree中全部的bolt都ack了通過它的tuple,那麼Spout的ack方法就會被調用,表示整個消息被處理完成。若是tree中的任何一個bolt fail一個tuple,或者整個處理過程超時,則Spout的fail方法便會被調用。spa

     另一點, storm只是經過ack/fail機制來告訴應用方bolt中間的處理狀況, 對於成功/失敗該如何處理, 必須由應用本身來決定, 由於storm內部也沒有保存失敗的具體數據, 可是也有辦法知道失敗記錄,由於spout的ack/fail方法會附帶一個msgId對象, 咱們能夠在最初發射tuple的時候將將msgId設置爲tuple, 而後在ack/fail中對該tuple進行處理。這裏其實有個問題, 就是每一個bolt執行完以後要顯式的調用ack/fail,不然會出現tuple不釋放致使oom. 不知道storm在最初設計的時候,爲何不將bolt的ack設置爲默認調用。設計

     Storm的ISpout接口定義了三個與可靠性有關的方法:nextTuple,ack和fail。code

public interface ISpout extends Serializable {
           void open( Map conf, TopologyContext context, SpoutOutputCollector collector);
           void close();
           void nextTuple();
           void ack(Object msgId);
           void fail(Object msgId);
    }

    咱們知道,當Storm的Spout發射一個Tuple後,他便會調用nextTuple()方法,在這個過程當中,保證可靠性處理的第一步就是爲發射出的Tuple分配一個惟一的ID,並把這個ID傳給emit()方法:orm

collector.emit( new Values("value1" , "value2") , msgId );

    爲Tuple分配一個惟一ID的目的就是爲了告訴Storm,Spout但願這個Tuple產生的Tuple tree在處理完成或失敗後告知它,若是Tuple被處理成功,Spout的ack()方法就會被調用,相反若是處理失敗,Spout的fail()方法就會被調用,Tuple的ID也都會傳入這兩個方法中。對象

     須要注意的是,雖然spout有可靠性機制,但這個機制是否啓用由咱們控制的。IBasicBolt在emit一個tuple後自動調用ack()方法,用來實現比較簡單的計算,這個是不可靠的。若是是IRichBolt的話,若是想要實現anchor,必須本身調用ack方法,這個保證可靠性。繼承

2.Bolt中的可靠性

     Bolt中的可靠性主要靠兩步來實現:接口

    1. 發射衍生Tuple的同時anchor原Tuple
    2. 對各個Tuples作ack或fail處理     

     anchor一個Tuple就意味着在輸入Tuple和其衍生Tuple之間創建了關聯,關聯以後的Tuple便加入了Tuple tree。咱們能夠經過以下方式anchor一個Tuple:

collector.emit( tuple, new Values( word));

    若是咱們發射新tuple的時候不一樣時發射元tuple,那麼新發射的Tuple不會參與到整個可靠性機制中,它們的fail不會引發root tuple的重發,咱們成爲unanchor:

collector.emit( new Values( word));

ack和fail一個tuple的操做方法:

this .collector.ack(tuple);
this .collector.fail(tuple);

    上面講過了,IBasicBolt 實現類不關心ack/fail, spout的ack/fail徹底由後面的bolt的ack/fail來決定. 其execute方法的BasicOutputCollector參數也沒有提供ack/fail方法給你調用. 至關於忽略了該bolt的ack/fail行爲。

     在 IRichBolt實現類中, 若是OutputCollector.emit(oldTuple,newTuple)這樣調用來發射tuple(anchoring), 那麼後面的bolt的ack/fail會影響spout ack/fail, 若是collector.emit(newTuple)這樣來發射tuple(在storm稱之爲anchoring), 則至關於斷開了後面bolt的ack/fail對spout的影響.spout將當即根據當前bolt前面的ack/fail的狀況來決定調用spout的ack/fail. 因此某個bolt後面的bolt的成功失敗對你來講不關心, 你能夠直接經過這種方式來忽略.中間的某個bolt fail了, 不會影響後面的bolt執行, 可是會當即觸發spout的fail. 至關於短路了, 後面bolt雖然也執行了, 可是ack/fail對spout已經無心義了. 也就是說, 只要bolt集合中的任何一個fail了, 會當即觸發spout的fail方法. 而ack方法須要全部的bolt調用爲ack才能觸發. 因此IBasicBolt用來作filter或者簡單的計算比較合適。

3.總結

    storm的可靠性是由spout和bolt共同決定的,storm利用了anchor機制來保證處理的可靠性。若是spout發射的一個tuple被徹底處理,那麼spout的ack方法即會被調用,若是失敗,則其fail方法便會被調用。在bolt中,經過在emit(oldTuple,newTuple)的方式來anchor一個tuple,若是處理成功,則須要調用bolt的ack方法,若是失敗,則調用其fail方法。一個tuple及其子tuple共同構成了一個tupletree,當這個tree中全部tuple在指定時間內都完成時spout的ack纔會被調用,可是當tree中任何一個tuple失敗時,spout的fail方法則會被調用。

    IBasicBolt類會自動調用ack/fail方法,而IRichBolt則須要咱們手動調用ack/fail方法。咱們能夠經過TOPOLOGY_MESSAGE_TIMEOUT_SECS參數來指定一個tuple的處理完成時間,若這個時間未被處理完成,則spout也會調用fail方法。

4.一個可靠的WordCount例子

一個實現可靠性的spout: 

public class ReliableSentenceSpout extends BaseRichSpout {
     private static final long serialVersionUID = 1L;
     private ConcurrentHashMap<UUID, Values> pending;
     private SpoutOutputCollector collector;
     private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" };
     private int index = 0;
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "sentence"));
      }
     public void open( Map config, TopologyContext context, SpoutOutputCollector collector) {
           this. collector = collector;
           this. pending = new ConcurrentHashMap<UUID, Values>();
      }
     public void nextTuple() {
          Values values = new Values( sentences[ index]);
          UUID msgId = UUID. randomUUID();
           this. pending.put(msgId, values);
           this. collector.emit(values, msgId);
           index++;
           if ( index >= sentences. length) {
               index = 0;
          }
           //Utils.waitForMillis(1);
      }
     public void ack(Object msgId) {
           this. pending.remove(msgId);
      }
     public void fail(Object msgId) {
           this. collector.emit( this. pending.get(msgId), msgId);
      }
 }

一個實現可靠性的bolt:

public class ReliableSplitSentenceBolt extends BaseRichBolt {
     private OutputCollector collector;
     public void prepare( Map config, TopologyContext context, OutputCollector collector) {
           this. collector = collector;
      }
     public void execute(Tuple tuple) {
          String sentence = tuple.getStringByField("sentence" );
          String[] words = sentence.split( " ");
           for (String word : words) {
               this. collector.emit(tuple, new Values(word));
          }
           this. collector.ack(tuple);
      }
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "word"));
      }
 }

這個例子中咱們實現了storm的可靠性,tuple失敗了將會從新發送,直處處理成功。這裏pending是一個map,爲了實現tuple的失敗重發。storm裏面topology.max.spout.pending屬性解釋:
1.同時活躍的batch數量,你必須設置同時處理的batch數量。你能夠經過」topology.max.spout.pending」 來指定, 若是你不指定,默認是1。
2.topology.max.spout.pending 的意義在於 ,緩存spout發送出去的tuple,當下流的bolt還有topology.max.spout.pending 個 tuple 沒有消費完時,spout會停下來,等待下游bolt去消費,當tuple 的個數少於topology.max.spout.pending個數時,spout 會繼續從消息源讀取消息。(這個屬性僅對可靠消息處理)。

若是使用事務,則表示同時處理的batch數量,若是非事務,則理解成第二種。

總而言之,若是不須要保證可靠性,spout繼承BaseRichSpout,bolt繼承BaseBasicBolt,它們內部實現了一些方法,自動ack,咱們不須要關心ack和fail;若是要保證可靠性,spout實現IRichSpout接口,發tuple的時候,帶上msgId,自定義fail和ack方法,bolt繼承BaseRichBolt,發送tuple的時候要帶上原tuple,要手動ack。

相關文章
相關標籤/搜索