Bolt是Topology中的數據處理的單元,也是Storm針對處理過程的編程單元。Topology中全部的處理都是在這些Bolt中完成的,編程人員能夠實現自定義的處理過程,例如,過濾、函數、彙集、鏈接等計算。若是是複雜的計算過程,每每須要多個步驟和使用多個Bolt。java
Bolt能夠將數據項發送至多個數據流(Stream)。編程人員首先可使用OutputFieldsDeclarer類的declareStream()方法來聲明多個流,指定數據將要發送到的流,而後使用SpoutOutputCollector的emit方法將數據發送。編程
當聲明瞭一個Bolt的輸入流後,能夠從其餘的組件中接收這些指定的流。當接收某個組件的全部流時,須要在程序中逐個聲明接收的過程。InputDeclarer對象默認接收來自某組件默認的流。函數
//從名稱爲"1"的組件中接收默認的流。 declarer.shuffleGrouping("1")
IBolt 和 IComponent接口ui
IBolt接口:this
//在組件的任務初被初始化時,由集羣中的工做進程(worker)調用,prepare()用於實例化Bolt的已給運行時任務,被集羣中的某一個進程調用,提供Bolt運行的環境。
//sormConf對象維護Storm中針對該Bolt的配置信息。(來自Topology);context對象是一個上下文對象,用於獲取該組件運行時任務的信息。(例如Topology中該Bolt全部任務的位置,包括任務的id、組件id和輸入輸出信息等)
//collector對象用於從該Bolt發送數據項。數據項能夠在任意時刻發送,包括調用open()和close()方法。
void prepare(java.util.Map stormConf,TopologyContext context,OutputCollector collector) //接收一個數據項並處理
//該方法用來接收一個數據項(Tuple),並能夠將處理的結果做爲新的數據項發送(emit),是Bolt須要實現的最重要的方法。
//參數imput是一個數據項對象,包含了衆多的元數據(metadata),包括它來自的組件、流、任務等。數據項中的值,能夠經過Tuple類的getValue()方法得到。
void execute(Tuple input) //在IBolt將關閉時調用 void cleanup()
Tuple類的方法,這個類的對象做爲execute()方法的輸入。(方法舉例: int size() ; int fieldIndex(java.lang.String field) ; ......)spa
方法衆多,能夠整理分爲如下五類:code
一、獲取屬性的方法。 (size()、fieldIndex()和contains()三個方法)orm
二、獲取元數據的方法。(getMessageId()、getSourceComponent()、getSourceTask()、getSourceStreamId()和getSourceGlobalStreamid()方法)對象
其中MessageId是在數據項被建立時,經過必定的規則賦值的。blog
三、根據域獲取值的方法。(getValue()和多個get具體數據類型的方法)
四、根據域的名稱獲取值的方法。(這一類包括getFields()、getValues()和select()方法)
五、獲取Tuple的值或域列表的方法。(getFields()、getValues()和select()方法)
分別獲取該數據項的全部域列表、值列表和值列表子集。
簡單的案例:
class SplitSentence implements IRichBolt { private OutputCollector collector; public void prepare(Map conf,TopologyContext context,OutputCollector collector){ this.collector = collector; } public void execute(Tuple tuple){ String sentence = tuple.getString(0); for(String word : sentence.split(" "){ collector.emit(new Values(word)); } } public void cleanup(){ } public void declareOutputFields(OutpuFieldsDeclarer declarer){ declarer.declare(new Fields("word")); } }
這裏說下declareOutputFields()函數參數,聲明瞭輸出流的數據項的結構,也即Tuple的域。
結合上節給的Spout的示例,能夠在Topology類的main函數中加入相關代碼,增長Bolt。
Topology builder builder = new TopologyBuilder(); Builder.SetSpout ("SentenceGenSpout ",new TestWord Spout(),1); builder.setBolt("splitBoult",new SplitSentence(),2).shuffleGrouping("sentenceGenSpout");