storm spout和bolt java api

Componentjava

Storm中,Spout和Bolt都是其Component。因此,Storm定義了一個名叫IComponent的總接口web

圖譜以下:dom

綠色部分是咱們最經常使用、比較簡單的部分。紅色部分是與事務相關的,在之後的文章會具體講解。ide

BaseComponent 是Storm提供的「偷懶」的類。爲何這麼說呢,它及其子類,都或多或少實現了其接口定義的部分方法。這樣咱們在用的時候,能夠直接繼承該類,而不是本身每次都寫全部的方法。但值得一提的是,BaseXXX這種定義的類,它所實現的方法,都是空的,直接返回null。ui

Spoutthis

在前面基本例子中,咱們實現了一個RandomSpout,來看看其類圖spa

  • Spout的最頂層抽象是ISpout接口。code

open方法是初始化動做。容許你在該spout初始化時作一些動做,傳入了上下文,方便取上下文的一些數據。orm

close方法在該spout關閉前執行,可是並不能獲得保證其必定被執行。spout是做爲task運行在worker內,在cluster模式下,supervisor會直接kill -9 woker的進程,這樣它就沒法執行了。而在本地模式下,只要不是kill -9, 若是是發送中止命令,是能夠保證close的執行的。繼承

activate和deactivate :一個spout能夠被暫時激活和關閉,這兩個方法分別在對應的時刻被調用。

nextTuple 用來發射數據。

ack(Object)

傳入的Object實際上是一個id,惟一表示一個tuple。該方法是這個id所對應的tuple被成功處理後執行。

fail(Object)

同ack,只不過是tuple處理失敗時執行。

咱們的RandomSpout 因爲繼承了BaseRichSpout,因此不用實現close、activate、deactivate、ack、fail和getComponentConfiguration方法,只關心最基本核心的部分。

結論:
一般狀況下(Shell和事務型的除外),實現一個Spout,能夠直接實現接口IRichSpout,若是不想寫多餘的代碼,能夠直接繼承BaseRichSpout。

Bolt

 ExclaimBasicBolt的類圖: 

這裏能夠看到一個奇怪的問題:
爲何IBasicBolt並無繼承IBolt?
咱們帶着問題往下看。

IBolt定義了三個方法:

IBolt繼承了java.io.Serializable,咱們在nimbus上提交了topology之後,建立出來的bolt會序列化後發送到具體執行的worker上去。worker在執行該Bolt時,會先調用prepare方法傳入當前執行的上下文
execute接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果
cleanup 同ISpout的close方法,在關閉前調用。一樣不保證其必定執行。
紅色部分是Bolt實現時必定要注意的地方。而Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功。
若是你確實要反饋失敗,能夠拋出FailedException。

咱們來再寫一個Bolt繼承BaseRichBolt替代ExclaimBasicBolt。代碼以下:

public class ExclaimRichBolt extends BaseRichBolt {
 
     private OutputCollector collector;
     
     @Override
     public void prepare(Map stormConf, TopologyContext context,
             OutputCollector collector) {
         this .collector = collector;
     }
 
     @Override
     public void execute(Tuple tuple) {
         this .collector.emit(tuple, new Values(tuple.getString( 0 )+ "!" ));
         this .collector.ack(tuple);
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare( new Fields( "after_excl" ));
     }
 
}

修改topology

//builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
builder.setBolt( "exclaim" , new ExclaimRichBolt(), 2 ).shuffleGrouping( "spout" );

運行下,結果一致。

結論:

一般狀況下,實現一個Bolt,能夠實現IRichBolt接口或繼承BaseRichBolt,若是不想本身處理結果反饋,能夠實現IBasicBolt接口或繼承BaseBasicBolt,它實際上至關於自動作掉了prepare方法和collector.emit.ack(inputTuple);

相關文章
相關標籤/搜索