Storm的ack機制在項目應用中的坑

正在學習storm的大兄弟們,我又來傳道授業解惑了,是否是以爲本身會用ack了。好吧,那就讓我開始啪啪打大家臉吧。mysql

先說一下ACK機制:redis

  爲了保證數據能正確的被處理, 對於spout產生的每個tuple, storm都會進行跟蹤。算法

  這裏面涉及到ack/fail的處理,若是一個tuple處理成功是指這個Tuple以及這個Tuple產生的全部Tuple都被成功處理, 會調用spout的ack方法;sql

  若是失敗是指這個Tuple或這個Tuple產生的全部Tuple中的某一個tuple處理失敗, 則會調用spout的fail方法;數據庫

  在處理tuple的每個bolt都會經過OutputCollector來告知storm, 當前bolt處理是否成功。api

  另外須要注意的,當spout觸發fail動做時,不會自動重發失敗的tuple,須要咱們在spout中從新獲取發送失敗數據,手動從新再發送一次。緩存

Ack原理
  Storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每個Tuple的Tuple樹(由於一個tuple經過spout發出了,通過每個bolt處理後,會生成一個新的tuple發送出去)。當acker(框架自啓動的task)發現一個Tuple樹已經處理完成了,它會發送一個消息給產生這個Tuple的那個task。
Acker的跟蹤算法是Storm的主要突破之一,對任意大的一個Tuple樹,它只須要恆定的20字節就能夠進行跟蹤。
Acker跟蹤算法的原理:acker對於每一個spout-tuple保存一個ack-val的校驗值,它的初始值是0,而後每發射一個Tuple或Ack一個Tuple時,這個Tuple的id就要跟這個校驗值異或一下,而且把獲得的值更新爲ack-val的新值。那麼假設每一個發射出去的Tuple都被ack了,那麼最後ack-val的值就必定是0。Acker就根據ack-val是否爲0來判斷是否徹底處理,若是爲0則認爲已徹底處理。
框架

要實現ack機制:
1,spout發射tuple的時候指定messageId
2,spout要重寫BaseRichSpout的fail和ack方法
3,spout對發射的tuple進行緩存(不然spout的fail方法收到acker發來的messsageId,spout也沒法獲取到發送失敗的數據進行重發),看看系統提供的接口,只有msgId這個參數,這裏的設計不合理,其實在系統裏是有cache整個msg的,只給用戶一個messageid,用戶如何取得原來的msg貌似須要本身cache,而後用這個msgId去查詢,太坑爹了
3,spout根據messageId對於ack的tuple則從緩存隊列中刪除,對於fail的tuple能夠選擇重發。
4,設置acker數至少大於0;Config.setNumAckers(conf, ackerParal);
dom

Storm的Bolt有BsicBolt和RichBolt:
  在BasicBolt中,BasicOutputCollector在emit數據的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。
  使用RichBolt須要在emit數據的時候,顯示指定該數據的源tuple要加上第二個參數anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);而且須要在execute執行成功後調用OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple);
分佈式

由一個tuple產生一個新的tuple稱爲:anchoring,你發射一個tuple的同時也就完成了一次anchoring。

  ack機制即,spout發送的每一條消息,在規定的時間內,spout收到Acker的ack響應,即認爲該tuple 被後續bolt成功處理;在規定的時間內(默認是30秒),沒有收到Acker的ack響應tuple,就觸發fail動做,即認爲該tuple處理失敗,timeout時間能夠經過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設定。或者收到Acker發送的fail響應tuple,也認爲失敗,觸發fail動做

  注意,我開始覺得若是繼承BaseBasicBolt那麼程序拋出異常,也會讓spout進行重發,可是我錯了,程序直接異常中止了

  這裏我以分佈式程序入門案例worldcount爲例子吧。請看下面大屏幕:沒有錯我就是那個大家走在路上常常聽見的名字劉洋。

  這裏spout1-1task發送句子"i am liu yang"給bolt2-2task進行處理,該task把句子切分爲單詞,根據字段分發到下一個bolt中,bolt2-2,bolt4-4,bolt5-5對每個單詞添加一個後綴1後再發送給下一個bolt進行存儲到數據庫的操做,這個時候bolt7-7task在存儲數據到數據庫時失敗,向spout發送fail響應,這個時候spout收到消息就會再次發送的該數據。

  好,那麼我思考一個問題:spout如何保證再次發送的數據就是以前失敗的數據,因此在spout實例中,絕對要定義一個map緩存,緩存發出去的每一條數據,key固然就是messageId,當spout實例收到全部bolt的響應後若是是ack,就會調用咱們重寫的ack方法,在這個方法裏面咱們就要根據messageId刪除這個key-value,若是spout實例收到全部bolt響應後,發現是faile,則會調用咱們重寫的fail方法,根據messageId查詢到對應的數據再次發送該數據出去。

spout代碼以下

public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L; // key:messageId,Data private HashMap<String, String> waitAck = new HashMap<String, String>(); private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { String sentence = "i am liu yang"; String messageId = UUID.randomUUID().toString().replaceAll("-", ""); waitAck.put(messageId, sentence); //指定messageId,開啓ackfail機制 collector.emit(new Values(sentence), messageId); } @Override public void ack(Object msgId) { System.out.println("消息處理成功:" + msgId); System.out.println("刪除緩存中的數據..."); waitAck.remove(msgId); } @Override public void fail(Object msgId) { System.out.println("消息處理失敗:" + msgId); System.out.println("從新發送失敗的信息..."); //重發若是不開啓ackfail機制,那麼spout的map對象中的該數據不會被刪除的。 collector.emit(new Values(waitAck.get(msgId)),msgId); } }

 雖然在storm項目中咱們的spout源一般來源kafka,並且咱們使用storm提供的工具類KafkaSpout類,其實這個類裏面就維護者<messageId,Tuple>對的集合。

Storm怎麼處理重複的tuple?
  由於Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail並從新發送該tuple,那麼就會有tuple重複計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。由於實時計算一般並不要求很高的精確度,後續的批處理計算會更正實時計算的偏差。
(2)使用第三方集中存儲來過濾,好比利用mysql,memcached或者redis根據邏輯主鍵來去重。
(3)使用bloom filter作過濾,簡單高效。

問題一:大家有沒有想過若是某一個task節點處理的tuple一直失敗,消息一直重發會怎麼樣?

  咱們都知道,spout做爲消息的發送源,在沒有收到該tuple來至左右bolt的返回信息前,是不會刪除的,那麼若是消息一直失敗,就會致使spout節點存儲的tuple數據愈來愈多,致使內存溢出。

問題二:有沒有想過,若是該tuple的衆多子tuple中,某一個子tuple處理failed了,可是另外的子tuple仍然會繼續執行,若是子tuple都是執行數據存儲操做,那麼就算整個消息失敗,那些生成的子tuple仍是會成功執行而不會回滾的。

  這個時候storm的原生api是沒法支持這種事務性操做,咱們可使用storm提供的高級api-trident來作到(具體如何我不清楚,目前沒有研究它,可是我能夠它內部必定是根據分佈式協議好比兩階段提交協議等)。向這種業務中要保證事務性功能,咱們徹底能夠根據咱們自身的業務來作到,好比這裏的入庫操做,咱們先記錄該消息是否已經入庫的狀態,再入庫時查詢狀態來決定是否給予執行。

問題三:tuple的追蹤並不必定要是從spout結點到最後一個bolt,只要是spout開始,能夠在任意層次bolt中止追蹤作出應答。

Acker task 組件來設置一個topology裏面的acker的數量,默認值是一,若是你的topoogy裏面的tuple比較多的話,那麼請把acker的數量設置多一點,效率會更高一點。

調整可靠性 
acker task是很是輕量級的, 因此一個topology裏面不須要不少acker。你能夠經過Strom UI(id: -1)來跟蹤它的性能。 若是它的吞吐量看起來不正常,那麼你就須要多加點acker了。

若是可靠性對你來講不是那麼重要 — 你不太在乎在一些失敗的狀況下損失一些數據, 那麼你能夠經過不跟蹤這些tuple樹來獲取更好的性能。不去跟蹤消息的話會使得系統裏面的消息數量減小一半, 由於對於每個tuple都要發送一個ack消息。而且它須要更少的id來保存下游的tuple 減小帶寬佔用。
有三種方法能夠去掉可靠性。
第一是把Config.TOPOLOGY_ACKERS 設置成 0. 在這種狀況下, storm會在spout發射一個tuple以後立刻調用spoutack方法。也就是說這個tuple樹不會被跟蹤。
第二個方法是在tuple層面去掉可靠性。 你能夠在發射tuple的時候不指定messageid來達到不跟糉某個特定的spout tuple的目的。
最後一個方法是若是你對於一個tuple樹裏面的某一部分到底成不成功不是很關心,那麼能夠在發射這些tuple的時候unanchor它們。 這樣這些tuple就不在tuple樹裏面, 也就不會被跟蹤了。

可靠性配置

有三種方法能夠去掉消息的可靠性:

將參數Config.TOPOLOGY_ACKERS設置爲0,經過此方法,當Spout發送一個消息的時候,它的ack方法將馬上被調用;

Spout發送一個消息時,不指定此消息的messageID。當須要關閉特定消息可靠性的時候,可使用此方法;

最後,若是你不在乎某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要作錨定,即在emit方法中不指定輸入消息。由於這些子孫消息沒有被錨定在任何tuple tree中,所以他們的失敗不會引發任何spout從新發送消息。

如何關閉Ack機制

2種途徑

spout發送數據是不帶上msgid

設置acker數等於0

值得注意的一點是Storm調用Ack或者fail的task始終是產生這個tuple的那個task,因此若是一個Spout,被分爲不少個task來執行,消息執行的成功失敗與否始終會通知最開始發出tuple的那個task。

做爲Storm的使用者,有兩件事情要作以更好的利用Storm的可靠性特徵,首先你在生成一個tuple的時候要通知Storm,其次,徹底處理一個tuple以後要通知Storm,這樣Storm就能夠檢測到整個tuple樹有沒有完成處理,而且通知源Spout處理結果。

1 因爲對應的task掛掉了,一個tuple沒有被Ack:

Storm的超時機制在超時以後會把這個tuple標記爲失敗,從而能夠從新處理。

2 Acker掛掉了: 在這種狀況下,由這個Acker所跟蹤的全部spout tuple都會出現超時,也會被從新的處理。

3 Spout 掛掉了:在這種狀況下給Spout發送消息的消息源負責從新發送這些消息。

三個基本的機制,保證了Storm的徹底分佈式,可伸縮的而且高度容錯的。

另外Ack機制還經常使用於限流做用 爲了不spout發送數據太快,而bolt處理太慢,經常設置pending數,當spout有等於或超過pending數的tuple沒有收到ackfail響應時,跳過執行nextTuple, 從而限制spout發送數據。

經過conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);設置spout pend數。

相關文章
相關標籤/搜索