Storm的數據處理編程單元:Bolt 學習整理

  Bolt是Topology中的數據處理的單元,也是Storm針對處理過程的編程單元。Topology中全部的處理都是在這些Bolt中完成的,編程人員能夠實現自定義的處理過程,例如,過濾、函數、彙集、鏈接等計算。若是是複雜的計算過程,每每須要多個步驟和使用多個Bolt。java

  Bolt能夠將數據項發送至多個數據流(Stream)。編程人員首先可使用OutputFieldsDeclarer類的declareStream()方法來聲明多個流,指定數據將要發送到的流,而後使用SpoutOutputCollector的emit方法將數據發送。編程

  當聲明瞭一個Bolt的輸入流後,能夠從其餘的組件中接收這些指定的流。當接收某個組件的全部流時,須要在程序中逐個聲明接收的過程。InputDeclarer對象默認接收來自某組件默認的流。函數

//從名稱爲"1"的組件中接收默認的流。
declarer.shuffleGrouping("1")

 

IBolt 和 IComponent接口ui

IBolt接口:this

//在組件的任務初被初始化時,由集羣中的工做進程(worker)調用,prepare()用於實例化Bolt的已給運行時任務,被集羣中的某一個進程調用,提供Bolt運行的環境。
//sormConf對象維護Storm中針對該Bolt的配置信息。(來自Topology);context對象是一個上下文對象,用於獲取該組件運行時任務的信息。(例如Topology中該Bolt全部任務的位置,包括任務的id、組件id和輸入輸出信息等)
//collector對象用於從該Bolt發送數據項。數據項能夠在任意時刻發送,包括調用open()和close()方法。
void prepare(java.util.Map stormConf,TopologyContext context,OutputCollector collector) //接收一個數據項並處理
//該方法用來接收一個數據項(Tuple),並能夠將處理的結果做爲新的數據項發送(emit),是Bolt須要實現的最重要的方法。
//參數imput是一個數據項對象,包含了衆多的元數據(metadata),包括它來自的組件、流、任務等。數據項中的值,能夠經過Tuple類的getValue()方法得到。
void execute(Tuple input) //在IBolt將關閉時調用 void cleanup()

Tuple類的方法,這個類的對象做爲execute()方法的輸入。(方法舉例: int size() ; int fieldIndex(java.lang.String field) ; ......)spa

方法衆多,能夠整理分爲如下五類:code

一、獲取屬性的方法。 (size()、fieldIndex()和contains()三個方法)orm

二、獲取元數據的方法。(getMessageId()、getSourceComponent()、getSourceTask()、getSourceStreamId()和getSourceGlobalStreamid()方法)對象

其中MessageId是在數據項被建立時,經過必定的規則賦值的。blog

三、根據域獲取值的方法。(getValue()和多個get具體數據類型的方法)

四、根據域的名稱獲取值的方法。(這一類包括getFields()、getValues()和select()方法)

五、獲取Tuple的值或域列表的方法。(getFields()、getValues()和select()方法)

分別獲取該數據項的全部域列表、值列表和值列表子集。

 

簡單的案例:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf,TopologyContext context,OutputCollector collector){
    this.collector = collector;
    }
    
    public void execute(Tuple tuple){
    String sentence = tuple.getString(0);
    for(String word : sentence.split(" "){
        collector.emit(new Values(word));
    }
}

public void cleanup(){
}

public void declareOutputFields(OutpuFieldsDeclarer declarer){
    declarer.declare(new Fields("word"));
    }
}

這裏說下declareOutputFields()函數參數,聲明瞭輸出流的數據項的結構,也即Tuple的域。

結合上節給的Spout的示例,能夠在Topology類的main函數中加入相關代碼,增長Bolt。

Topology builder builder = new TopologyBuilder();
Builder.SetSpout ("SentenceGenSpout ",new TestWord Spout(),1);
builder.setBolt("splitBoult",new SplitSentence(),2).shuffleGrouping("sentenceGenSpout");
相關文章
相關標籤/搜索