Storm tuple發送機制中的重發

#從一個程序異常提及 最近的一個項目走到線下測試階段,同事寫了一堆測試數據進Kafka,個人代碼負責經過KafkaSpout消費消息。結果出現一個很怪異的事情,對方每20秒寫10000條消息進Kafka,個人Spout卻讀到了這樣的消息:java

1. 20秒統計一次進入Spout的消息數量,第一個20秒正常,有10000條整。(由於寫入速度夠快,20秒足夠完成寫入和讀取工做)
2. 第二個20秒也正常,10000條消息整。
3. 第三個20秒就開始異常了,數據量在20000~30000之間。
4. 越日後數據量愈來愈多。

通過一頓排查終於發現問題出在Bolt沒有ack()應答上級Spout,致使Spout重發消息。ide

#BaseRichBolt,BaseBasicBolt,BaseBatchBolt,BaseTransactionalBolt的差異 可是隱隱然記得當初在某個地方還看過介紹說某個Bolt是不須要用戶手動ack的,可是這裏卻由於沒有現實調用ack而出錯,看來是Storm提供的多個Bolt實如今這方面是有不一樣點的。借這個機會,就來看看這些Bolt之間的異同點。 先來看看這幾個Bolt的繼承關係: 這裏寫圖片描述測試

首先可見BaseTransactionalBolt實際上是繼承自BaseBatchBolt的一個拓展,有更本質區別的應該是初前者以外剩下的三個類。他們都繼承自BaseComponent,並分別實現了IRichBoltIBasicBoltIBatchBolt接口。code

##IComponent BaseRichBolt,BaseBasicBolt,BaseBatchBolt三個抽象類直接繼承的BaseComponent其實並無作什麼事情。component

# BaseComponent.java
package backtype.storm.topology.base;

import backtype.storm.topology.IComponent;
import java.util.Map;

public abstract class BaseComponent implements IComponent {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }    
}

而它所實現的IComponent核心則定義了兩個方法,一個是上面被BaseComponent實現了的getComponentConfiguration(),負責返回Component的配置參數。一個是須要實現類本身定義的declareOutputFields()。方法中聲明瞭該bolt/spout輸出的字段個數,供下游使用,在該bolt中的execute方法中,emit發射的字段個數必須和聲明的相同,不然報錯:orm

Tuple created with wrong number of fields. Expected 2 fields but got 1 fields。

當咱們使用fieldsGrouping(id,fields)方法來作tuple分發時,Fields的定義也要根絕這裏的fields聲明來,不然會報錯:對象

「Topology submission exception. (topology name='HelloStorm')
 <InvalidTopologyException InvalidTopologyException(msg:Component: [3] subscribes from
 stream: [default] of component [2] with non-existent fields: #{"ahah1"})>」

BaseComponent的定義很清晰簡單,可是幾個Bolt的差異卻主要體如今他們實現的IxxxBolt接口。繼承

##IBasicBolt vs IRichBolt 三個Bolt恰好實現了三個不一樣的IxxxBolt,而其中的IBasicBolt和IRichBolt都在backtype.storm.topology包內,可見他們兩個仍是比較相像的。 二者都有三個抽象方法:接口

/**
* 在Bolt啓動前執行,提供Bolt啓動環境配置的入口
*/
void prepare();
/**
* 每次調用處理一個輸入的tuple,固然,也能夠把tuple暫存起來批量處理。
* 可是!!!千萬注意,全部的tuple都必須在必定時間內應答,能夠是ack或者fail。不然,spout就會重發tuple。這也正是上文描述的意外事件的根本緣由。
* 兩個bolt不一樣的地方在於,`IBasicBolt`自動幫你ack,而`IRichBolt`須要你本身來作。
*/
void exexute();
/**
* 當組件關閉時被調用,可是當supervisor使用`kill -9`強制關閉worker進程時,不能保證這個方法必定會被執行。
*/
void cleanup();

##Spout重發策略的配置 如今咱們知道了默認狀況下,Spout若是在必定時間內沒有發出去的tuple對應的ack,就會觸發fail。那麼這一套策略是怎麼實現的呢?咱們又能夠作什麼配置呢? 且先看看用來發射tuple的SpoutOutputCollector,他的類圖很簡單: 這裏寫圖片描述隊列

主要提供瞭如下幾個方法:

public List<Integer> emit(String streamId, List<Object> tuple, Object messageId)
public List<Integer> emit(List<Object> tuple, Object messageId)
public List<Integer> emit(List<Object> tuple) 
public List<Integer> emit(String streamId, List<Object> tuple)

public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)
public void emitDirect(int taskId, List<Object> tuple, Object messageId)
public void emitDirect(int taskId, String streamId, List<Object> tuple) 
public void emitDirect(int taskId, List<Object> tuple) 

public void reportError(Throwable error)

咱們發送消息用的emit()在這裏提供了四種不一樣的實現方式,可是要注意的是,只有提供了messageId參數,Storm纔會追蹤這條消息是否發送成功。而當咱們一路追蹤KafkaSpout的tuple發射機制,會發現它的底層使用的是:

collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

即它用了本身自定義的一個對象(KafkaMessageId有兩個字段,一個是partition一個是offset,能夠定位消息以便重發)來做爲MessageId。

來到這裏,很天然的就會產生幾個疑問,每條消息的過時時間怎麼定呢?消息過時是否是所有都會被重發呢?重發後若是還不成功怎麼辦?這是另外一個話題了,這裏且不談,只對涉及到的幾個參數作一下交代:

topology.max.spout.pending		/*在一個spout task消息發送隊列中最多可保存的未ack或者fail的數量,默認爲null。若是超過這個值,storm便選擇再也不發送新數據,知道有消息ack或者fail,騰出隊列空間爲止。*/
topology.message.timeout.secs	/*消息過時時間,經過public void backtype.storm.Config.setMessageTimeoutSecs(int secs)設置。超時以後storm會調用fail()方法,能夠自定義重發或者別的動做。*/
相關文章
相關標籤/搜索