#從一個程序異常提及 最近的一個項目走到線下測試階段,同事寫了一堆測試數據進Kafka,個人代碼負責經過KafkaSpout消費消息。結果出現一個很怪異的事情,對方每20秒寫10000條消息進Kafka,個人Spout卻讀到了這樣的消息:java
1. 20秒統計一次進入Spout的消息數量,第一個20秒正常,有10000條整。(由於寫入速度夠快,20秒足夠完成寫入和讀取工做) 2. 第二個20秒也正常,10000條消息整。 3. 第三個20秒就開始異常了,數據量在20000~30000之間。 4. 越日後數據量愈來愈多。
通過一頓排查終於發現問題出在Bolt沒有ack()應答上級Spout,致使Spout重發消息。ide
#BaseRichBolt,BaseBasicBolt,BaseBatchBolt,BaseTransactionalBolt的差異 可是隱隱然記得當初在某個地方還看過介紹說某個Bolt是不須要用戶手動ack的,可是這裏卻由於沒有現實調用ack而出錯,看來是Storm提供的多個Bolt實如今這方面是有不一樣點的。借這個機會,就來看看這些Bolt之間的異同點。 先來看看這幾個Bolt的繼承關係: 測試
首先可見BaseTransactionalBolt
實際上是繼承自BaseBatchBolt
的一個拓展,有更本質區別的應該是初前者以外剩下的三個類。他們都繼承自BaseComponent
,並分別實現了IRichBolt
、IBasicBolt
和IBatchBolt
接口。code
##IComponent BaseRichBolt
,BaseBasicBolt
,BaseBatchBolt
三個抽象類直接繼承的BaseComponent
其實並無作什麼事情。component
# BaseComponent.java package backtype.storm.topology.base; import backtype.storm.topology.IComponent; import java.util.Map; public abstract class BaseComponent implements IComponent { @Override public Map<String, Object> getComponentConfiguration() { return null; } }
而它所實現的IComponent核心則定義了兩個方法,一個是上面被BaseComponent
實現了的getComponentConfiguration()
,負責返回Component的配置參數。一個是須要實現類本身定義的declareOutputFields()
。方法中聲明瞭該bolt/spout輸出的字段個數,供下游使用,在該bolt中的execute方法中,emit發射的字段個數必須和聲明的相同,不然報錯:orm
Tuple created with wrong number of fields. Expected 2 fields but got 1 fields。
當咱們使用fieldsGrouping(id,fields)方法來作tuple分發時,Fields的定義也要根絕這裏的fields聲明來,不然會報錯:對象
「Topology submission exception. (topology name='HelloStorm') <InvalidTopologyException InvalidTopologyException(msg:Component: [3] subscribes from stream: [default] of component [2] with non-existent fields: #{"ahah1"})>」
BaseComponent的定義很清晰簡單,可是幾個Bolt的差異卻主要體如今他們實現的IxxxBolt接口。繼承
##IBasicBolt vs IRichBolt 三個Bolt恰好實現了三個不一樣的IxxxBolt,而其中的IBasicBolt和IRichBolt都在backtype.storm.topology包內,可見他們兩個仍是比較相像的。 二者都有三個抽象方法:接口
/** * 在Bolt啓動前執行,提供Bolt啓動環境配置的入口 */ void prepare(); /** * 每次調用處理一個輸入的tuple,固然,也能夠把tuple暫存起來批量處理。 * 可是!!!千萬注意,全部的tuple都必須在必定時間內應答,能夠是ack或者fail。不然,spout就會重發tuple。這也正是上文描述的意外事件的根本緣由。 * 兩個bolt不一樣的地方在於,`IBasicBolt`自動幫你ack,而`IRichBolt`須要你本身來作。 */ void exexute(); /** * 當組件關閉時被調用,可是當supervisor使用`kill -9`強制關閉worker進程時,不能保證這個方法必定會被執行。 */ void cleanup();
##Spout重發策略的配置 如今咱們知道了默認狀況下,Spout若是在必定時間內沒有發出去的tuple對應的ack,就會觸發fail。那麼這一套策略是怎麼實現的呢?咱們又能夠作什麼配置呢? 且先看看用來發射tuple的SpoutOutputCollector
,他的類圖很簡單: 隊列
主要提供瞭如下幾個方法:
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) public List<Integer> emit(List<Object> tuple, Object messageId) public List<Integer> emit(List<Object> tuple) public List<Integer> emit(String streamId, List<Object> tuple) public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) public void emitDirect(int taskId, List<Object> tuple, Object messageId) public void emitDirect(int taskId, String streamId, List<Object> tuple) public void emitDirect(int taskId, List<Object> tuple) public void reportError(Throwable error)
咱們發送消息用的emit()在這裏提供了四種不一樣的實現方式,可是要注意的是,只有提供了messageId參數,Storm纔會追蹤這條消息是否發送成功。而當咱們一路追蹤KafkaSpout
的tuple發射機制,會發現它的底層使用的是:
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
即它用了本身自定義的一個對象(KafkaMessageId有兩個字段,一個是partition一個是offset,能夠定位消息以便重發)來做爲MessageId。
來到這裏,很天然的就會產生幾個疑問,每條消息的過時時間怎麼定呢?消息過時是否是所有都會被重發呢?重發後若是還不成功怎麼辦?這是另外一個話題了,這裏且不談,只對涉及到的幾個參數作一下交代:
topology.max.spout.pending /*在一個spout task消息發送隊列中最多可保存的未ack或者fail的數量,默認爲null。若是超過這個值,storm便選擇再也不發送新數據,知道有消息ack或者fail,騰出隊列空間爲止。*/ topology.message.timeout.secs /*消息過時時間,經過public void backtype.storm.Config.setMessageTimeoutSecs(int secs)設置。超時以後storm會調用fail()方法,能夠自定義重發或者別的動做。*/