5、Storm入門之Bolt

正如你已經看到的,bolts是一個Storm集羣中的關鍵組件。你將在這一章學到bolt生命週期,一些bolt設計策略,以及幾個有關這些內容的例子。java

Bolt生命週期

Bolt是這樣一種組件,它把元組做爲輸入,而後產生新的元組做爲輸出。實現一個bolt時,一般須要實現IRichBolt接口。Bolts對象由客戶端機器建立,序列化爲拓撲,並提交給集羣中的主機。而後集羣啓動工人進程反序列化bolt,調用prepare,最後開始處理元組。

NOTE:要建立一個bolt對象,它經過構造器參數初始化成員屬性,bolt被提交到集羣時,這些屬性值會隨着一塊兒序列化。web

Bolt結構

Bolts擁有以下方法:ui

declareOutputFields(OutputFieldsDeclarer declarer)
{
    爲bolt聲明輸出模式
}
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
{

   僅在bolt開始處理元組以前調用
}
execute(Tuple input){
處理輸入的單個元組
}
cleanup(){
   在bolt即將關閉時調用
}

下面看一個例子,在這個例子中bolt把一句話分割成單詞列表:this

class SplitSentence implements IRichBolt {

   private OutputCollector collector;

   publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {

       this.collector = collector;

   }

   public void execute(Tuple tuple) {

       String sentence = tuple.getString(0);

       for(String word : sentence.split(" ")) {

           collector.emit(new Values(word));

       }

   }

   public void cleanup(){}

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

       declarer.declare(new Fields("word"));

   }}

正如你所看到的,這是一個很簡單的bolt。值得一提的是在這個例子裏,沒有消息擔保。這就意味着,若是bolt由於某些緣由丟棄了一些消息——不管是由於bolt掛了,仍是由於程序故意丟棄的——生成這條消息的spout不會收到任何通知,任何其它的spoutsbolts也不會收到。spa

然而在許多狀況下,你想確保消息在整個拓撲範圍內都被處理過了。設計

可靠的bolts和不可靠的bolts

正如前面所說的,Storm保證經過spout發送的每條消息會獲得全部bolt的全面處理。基於設計上的考慮,這意味着你要本身決定你的bolts是否保證這一點。code

拓撲是一個樹型結構,消息(元組)穿過其中一條或多條分支。樹上的每一個節點都會調用ack(tuple)fail(tuple),Storm所以知道一條消息是否失敗了,並通知那個/那些製造了這些消息的spout(s)。既然一個Storm拓撲運行在高度並行化的環境裏,跟蹤始發spout實例的最好方法就是在消息元組內包含一個始發spout引用。這一技巧稱作錨定(譯者注:原文爲Anchoring)。修改一下剛剛講過的SplitSentence,使它可以確保消息都被處理了。orm

class SplitSentence implenents IRichBolt {

   private OutputCollector collector;

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

       this.collector = collector;

   }

   public void execute(Tuple tuple) {

       String sentence = tuple.getString(0);

       for(String word : sentence.split(" ")) {

           collector.emit(tuple, new Values(word));

       }

       collector.ack(tuple);

   }

   public void cleanup(){}

   public void declareOutputFields(OutputFieldsDeclarer declarer){

       declar.declare(new Fields("word"));

   }}

錨定發生在調用collector.emit()時。正如前面提到的,Storm能夠沿着元組追蹤到始發spoutcollector.ack(tuple)collector.fail(tuple)會告知spout每條消息都發生了什麼。當樹上的每條消息都已被處理了,Storm就認爲來自spout的元組被全面的處理了。若是一個元組沒有在設置的超時時間內完成對消息樹的處理,就認爲這個元組處理失敗。默認超時時間爲30秒。對象

NOTE:你能夠經過修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓撲的超時時間。接口

固然了spout須要考慮消息的失敗狀況,並相應的重試或丟棄消息。

NOTE:你處理的每條消息要麼是確認的(譯者注:collector.ack())要麼是失敗的(譯者注:collector.fail())。Storm使用內存跟蹤每一個元組,因此若是你不調用這兩個方法,該任務最終將耗盡內存。

多數據流

一個bolt可使用emit(streamId, tuple)把元組分發到多個流,其中參數streamId是一個用來標識流的字符串。而後,你能夠在TopologyBuilder決定由哪一個流訂閱它。

多錨定

爲了用bolt鏈接或聚合數據流,你須要藉助內存緩衝元組。爲了在這一場景下確保消息完成,你不得不把流錨定到多個元組上。能夠向emit方法傳入一個元組列表來達成目的。

...
List anchors = new ArrayList();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, values);
...

經過這種方式,bolt在任意時刻調用ackfail方法,都會通知消息樹,並且因爲流錨定了多個元組,全部相關的spout都會收到通知。

使用IBasicBolt自動確認

你可能已經注意到了,在許多狀況下都須要消息確認。簡單起見,Storm提供了另外一個用來實現bolt的接口,IBasicBolt。對於該接口的實現類的對象,會在執行execute方法以後自動調用ack方法。

class SplitSentence extends BaseBasicBolt {

   public void execute(Tuple tuple, BasicOutputCollector collector) {

       String sentence = tuple.getString(0);

       for(String word : sentence.split(" ")) {

           collector.emit(new Values(word));

       }}

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

       declarer.declare(new Fields("word"));

   }
 }

NOTE:分發消息的BasicOutputCollector自動錨定到做爲參數傳入的元組。

相關文章
相關標籤/搜索