對於spout,有ISpout,IRichSpout,BaseRichSpout數據庫
對於bolt,有IBolt,IRichBolt,BaseRichBolt,IBasicBolt,BaseBasicBoltapi
IBasicBolt,BaseBasicBolt不用每次execute完成都寫ack/fail,由於已經幫你實現好了。函數
做爲storm的使用者,有兩件事情要作以更好的利用storm的可靠性特徵。 首先,在你生成一個新的tuple的時候要通知storm; 其次,完成處理一個tuple以後要通知storm。 這樣storm就能夠檢測整個tuple樹有沒有完成處理,而且通知源spout處理結果。storm提供了一些簡潔的api來作這些事情。
由一個tuple產生一個新的tuple稱爲: anchoring。你發射一個新tuple的同時也就完成了一次anchoring。看下面這個例子: 這個bolt把一個包含一個句子的tuple分割成每一個單詞一個tuple。
ui
public class SplitSentence implements IRichBolt { orm
Output Collector _collector; blog
public void prepare(Map conf, 內存
TopologyContext context, get
OutputCollector collector) { it
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple,newValues(word));
}
_collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}
咱們經過anchoring來構造這個tuple樹,最後一件要作的事情是在你處理完當個tuple的時候告訴storm, 經過OutputCollector類的ack和fail方法來作,若是你回過頭來看看SplitSentence的例子, 你能夠看到「句子tuple」在全部「單詞tuple」被髮出以後調用了ack。
你能夠調用OutputCollector 的fail方法去當即將從消息源頭髮出的那個tuple標記爲fail, 好比你查詢了數據庫,發現一個錯誤,你能夠立刻fail那個輸入tuple, 這樣可讓這個tuple被快速的從新處理, 由於你不須要等那個timeout時間來讓它自動fail。
每一個你處理的tuple, 必須被ack或者fail。由於storm追蹤每一個tuple要佔用內存。因此若是你不ack/fail每個tuple, 那麼最終你會看到OutOfMemory錯誤。
大多數Bolt遵循這樣的規律:讀取一個tuple;發射一些新的tuple;在execute的結束的時候ack這個tuple。這些Bolt每每是一些過濾器或者簡單函數。Storm爲這類規律封裝了一個BasicBolt類。若是用BasicBolt來作, 上面那個SplitSentence能夠改寫成這樣:
public class SplitSentence implements IBasicBolt {
public void prepare(Map conf,
TopologyContext context) {
}
public void execute(Tuple tuple,
BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(newValues(word));
}
}
public void cleanup() {
}
public void declareOutputFields(
OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}
這個實現比以前的實現簡單多了, 可是功能上是同樣的。
發送到BasicOutputCollector的tuple會自動和輸入tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack的。
咱們編寫的時候使用IBasicBolt最方便了。或者 extends BaseBasicBolt類