IRichBolt和IBasicBolt/BaseBasicBolt對比

對於spout,有ISpout,IRichSpout,BaseRichSpout數據庫

對於bolt,有IBolt,IRichBolt,BaseRichBolt,IBasicBolt,BaseBasicBoltapi

IBasicBolt,BaseBasicBolt不用每次execute完成都寫ack/fail,由於已經幫你實現好了。函數


 做爲storm的使用者,有兩件事情要作以更好的利用storm的可靠性特徵。 首先,在你生成一個新的tuple的時候要通知storm; 其次,完成處理一個tuple以後要通知storm。 這樣storm就能夠檢測整個tuple樹有沒有完成處理,而且通知源spout處理結果。storm提供了一些簡潔的api來作這些事情。

由一個tuple產生一個新的tuple稱爲: anchoring。你發射一個新tuple的同時也就完成了一次anchoring。看下面這個例子: 這個bolt把一個包含一個句子的tuple分割成每一個單詞一個tuple。

   ui

Java代碼  收藏代碼spa

  1. public class SplitSentence implements IRichBolt {   orm

  2.             Output Collector _collector;   blog

  3.         

  4.             public void prepare(Map conf,   內存

  5.                                 TopologyContext context,   get

  6.                                 OutputCollector collector) {   it

  7.                 _collector = collector;   

  8.             }   

  9.         

  10.             public void execute(Tuple tuple) {   

  11.                 String sentence = tuple.getString(0);   

  12.                 for(String word: sentence.split(" ")) {   

  13.                     _collector.emit(tuple,newValues(word));   

  14.                 }   

  15.                 _collector.ack(tuple);   

  16.             }   

  17.         

  18.             public void cleanup() {   

  19.             }   

  20.         

  21.             public void declareOutputFields(OutputFieldsDeclarer declarer) {   

  22.                 declarer.declare(newFields("word"));   

  23.             }   

  24.         }    

 

 咱們經過anchoring來構造這個tuple樹,最後一件要作的事情是在你處理完當個tuple的時候告訴storm,  經過OutputCollector類的ack和fail方法來作,若是你回過頭來看看SplitSentence的例子, 你能夠看到「句子tuple」在全部「單詞tuple」被髮出以後調用了ack。

你能夠調用OutputCollector 的fail方法去當即將從消息源頭髮出的那個tuple標記爲fail, 好比你查詢了數據庫,發現一個錯誤,你能夠立刻fail那個輸入tuple, 這樣可讓這個tuple被快速的從新處理, 由於你不須要等那個timeout時間來讓它自動fail。

每一個你處理的tuple, 必須被ack或者fail。由於storm追蹤每一個tuple要佔用內存。因此若是你不ack/fail每個tuple, 那麼最終你會看到OutOfMemory錯誤。

大多數Bolt遵循這樣的規律:讀取一個tuple;發射一些新的tuple;在execute的結束的時候ack這個tuple。這些Bolt每每是一些過濾器或者簡單函數。Storm爲這類規律封裝了一個BasicBolt類。若是用BasicBolt來作, 上面那個SplitSentence能夠改寫成這樣:

   

Java代碼  收藏代碼

  1. public class SplitSentence implements IBasicBolt {   

  2.             public void prepare(Map conf,   

  3.                                 TopologyContext context) {   

  4.             }   

  5.         

  6.             public void execute(Tuple tuple,   

  7.                                 BasicOutputCollector collector) {   

  8.                 String sentence = tuple.getString(0);   

  9.                 for(String word: sentence.split(" ")) {   

  10.                     collector.emit(newValues(word));   

  11.                 }   

  12.             }   

  13.         

  14.             public void cleanup() {   

  15.             }   

  16.         

  17.             public void declareOutputFields(   

  18.                             OutputFieldsDeclarer declarer) {   

  19.                 declarer.declare(newFields("word"));   

  20.             }   

  21.         }    

 
這個實現比以前的實現簡單多了, 可是功能上是同樣的。

發送到BasicOutputCollector的tuple會自動和輸入tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack的。
咱們編寫的時候使用IBasicBolt最方便了。或者 extends BaseBasicBolt類


IRichBolt和IBasicBolt對比

相關文章
相關標籤/搜索