Storm-編程入門


一 編程接口java

 

 

 



Spout 接口編程

 

Spout組件的實現能夠經過繼承BaseRichSpout類或者其餘*Spout類來完成,也能夠經過實現IRichSpout接口來實現。須要根據狀況實方法有:數組

 

open方法緩存

 

當一個Task被初始化的時候會調用此open方法。通常都會在此方法中對發送Tuple的對象SpoutOutputCollector和配置對象TopologyContext初始化。示例以下:框架

 

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {   dom

          _collector = collector;  分佈式

}  函數

declareOutputFields方法ui

 

此方法用於聲明當前SpoutTuple發送流的域名字。Stream流的定義是經過OutputFieldsDeclare.declareStream方法完成的,其中參數爲域名。示例以下:google

 

public void declareOutputFields(OutputFieldsDeclarer declarer) {  

        declarer.declare(new Fields("word"));  

}

getComponentConfiguration方法

 

此方法定義在BaseComponent類內,用於聲明針對當前組件的特殊的Configuration配置。示例以下:

 

public Map<String, Object> getComponentConfiguration() {  

         if(!_isDistributed) {  

           Map<String, Object> ret = new HashMap<String, Object>();  

           ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3);  

           return ret;  

         } else {  

             return null;  

         }  

 }  

這裏即是設置了Topology中當前Component的線程數量上限。

 

nextTuple方法

 

這是Spout類中最重要的一個方法。發射一個TupleTopology都是經過這個方法來實現的。示例以下:

 

public void nextTuple() {  

     Utils.sleep(100);  

     final String[] words = new String[] {"twitter","facebook","google"};  

     final Random rand = new Random();  

     final String word = words[rand.nextInt(words.length)];  

     _collector.emit(new Values(word));  

}  

這裏即是從一個數組中隨機選取一個單詞做爲Tuple,而後經過_collector發送到Topology

 

另外,除了上述幾個方法以外,還有ackfailclose方法等。Storm在監測到一個Tuple被成功處理以後會調用ack方法,處理失敗會調用fail方法,這兩個方法在BaseRichSpout類中已經被隱式的實現了。

 

Bolts 接口

 

Bolt類接收由Spout或者其餘上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現能夠經過繼承BasicRichBolt類或者IRichBolt接口來完成。Bolt類須要實現的主要方法有:

 

prepare方法

 

此方法和Spout中的open方法相似,爲Bolt提供了OutputCollector,用來從Bolt中發送Tuple。示例以下:

 

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {         

         _collector = collector;  

注:BoltTuple的發送能夠在prepare方法中、execute方法中、cleanup等方法中進行,通常都是些在execute中。

 

declareOutputFields 方法

 

用於聲明當前Bolt發送的Tuple中包含的字段,和Spout中相似。示例以下:

 

public void declareOutputFields(OutputFieldsDeclarer declarer) {  

       declarer.declare(new Fields("obj", "count", "length"));  

}  

此例說明當前Bolt類發送的Tuple包含了三個字段:"obj", "count", "length"

 

getComponentConfiguration方法

 

Spout類同樣,在Bolt中也能夠有getComponentConfiguration方法。示例以下:

 

public Map<String, Object> getComponentConfiguration() {  

         Map<String, Object> conf = new HashMap<String, Object>();  

         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);  

         return conf;  

此例定義了從系統組件「_system」的「_tick」流中發送Tuple到當前Bolt的頻率,當系統須要每隔一段時間執行特定的處理時,就能夠利用這個系統的組件的特性來完成。

 

execute方法

 

這是Bolt中最關鍵的一個方法,對於Tuple的處理均可以放到此方法中進行。具體的發送也是經過emit方法來完成的。此時,有兩種狀況,一種是emit方法中有兩個參數,另外一個種是有一個參數。

 

(1) emit有一個參數:此惟一的參數是發送到下游BoltTuple,此時,由上游發來的舊的Tuple在此隔斷,新的Tuple和舊的Tuple再也不屬於同一棵Tuple樹。新的Tuple另起一個新的Tuple樹。

 

(2) emit有兩個參數:第一個參數是舊的Tuple的輸入流,第二個參數是發往下游Bolt的新的Tuple流。此時,新的Tuple和舊的Tuple是仍然屬於同一棵Tuple樹,即若是下游的Bolt處理Tuple失敗,則會向上傳遞到當前Bolt,當前Bolt根據舊的Tuple流繼續往上游傳遞,申請重發失敗的Tuple。保證Tuple處理的可靠性。

 

這兩種狀況要根據本身的場景來肯定。示例以下:

 

public void execute(Tuple tuple) {  

          _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));  

          _collector.ack(tuple);  

}  

注:輸入Tuple通常在最後一行被ack

 

public void execute(Tuple tuple) {  

        _collector.emit(new Values(tuple.getString(0) + "!!!"));  

 } 

此外還有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法相似,都是在當前Component關閉時調用,可是針對實時計算來講,除非一些特殊的場景要求之外,這兩個方法通常都不多用到。

 

注:cleanup方法在bolt被關閉的時候調用, 它應該清理全部被打開的資源。可是集羣不保證這個方法必定會被執行。好比執行task的機器down掉了,那麼根本就沒有辦法來調用那個方法。cleanup設計的時候是被用來在local mode的時候才被調用(也就是說在一個進程裏面模擬整個storm集羣), 而且你想在關閉一些topology的時候避免資源泄漏。

 

有幾點須要說明的地方:

 

1.每一個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。

 

2.open方法、prepare方法的調用是屢次的。入口函數中設定的setSpout或者setBolt裏的並行度參數指的是executor的數目,即負責運行組件中的task的線程的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每一個executor運行的時候調用一次。至關於一個線程的構造方法。

 

3.nextTuple方法、execute方法是一直被運行的,nextTuple方法不斷的發射TupleBoltexecute不斷的接收Tuple進行處理。只有這樣不斷地運行,纔會產生無界的Tuple流,體現實時性。

 

4.在提交了一個topology以後,Storm就會建立spout/bolt實例並進行序列化。以後,將序列化的component發送給全部的任務所在的機器(Supervisor節點),在每個任務上反序列化component

 

5. SpoutBolt之間、BoltBolt之間的通訊,是經過zeroMQ的消息隊列實現的。

 

二 做業的提交

 

下面的代碼展現了以本地運行方式提交一個Topology做業

 

//Topology definition  

   TopologyBuilder builder = new TopologyBuilder();  

   builder.setSpout("word-reader",new WordReader());  

   builder.setBolt("word-normalizer", new WordNormalizer())  

     .shuffleGrouping("word-reader");  

   builder.setBolt("word-counter", new WordCount(),1)  

     .fieldsGrouping("word-normalizer", new Fields("word"));  

  

  //Configuration  

    Config conf = new Config();  

   conf.put("wordsFile", args[0]);  

   conf.setDebug(true);  

  

  //Topology run  

   conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);  

   LocalCluster cluster = new LocalCluster();  

   cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());  

   Thread.sleep(2000);  

   cluster.shutdown(); 

此例中的builderTopologyBuilder對象,經過它的createTopology方法能夠建立一個Topology對象,同時此builder還要定義當前Topology中用到的SpoutBolt對象,分別經過setSpout方法和setBolt方法來完成。

 

setSpout方法和setBolt方法中的第一個參數是當前的Component組件的StreamID號;第二個參數是具體的Component實現類的構造;第三個參數是當前Component的並行執行的線程數目,Storm會根據這個數字的累加和來肯定TopologyTask數目。

 

經過一個LocalCluster對象來定義一個進程內的集羣。提交topology給這個虛擬的集羣和提交topology給分佈式集羣是同樣的。經過調用submitTopology方法來提交topology, 它接受三個參數:要運行的topology的名字,一個配置對象以及要運行的topology自己。

 

下面對workerexecutor以及task作一下說明:

 

worker:每一個worker都屬於一個特定的Topology,每一個Supervisor節點的worker能夠有多個,每一個worker使用一個單獨的端口,它對Topology中的每一個component運行一個或者多個executor線程來提供task的運行服務。其數目能夠經過設置yaml中的topology.workers屬性以及在代碼中經過ConfigsetNumWorkers方法設定。

 

executor:產生於worker進程內部的線程,會執行同一個component的一個或者多個task。 其數目能夠在Topology的入口類中setBoltsetSpout方法的最後一個參數指定,不指定的話,默認爲1

 

task:實際的數據處理由task完成,在Topology的生命週期中,每一個組件的task數目是不會發生變化的,而executor的數目卻不必定。executor數目小於等於task的數目,默認狀況下,兩者是相等的。在代碼中經過TopologyBuildersetNumTasks方法設定具體某個組件的task數目;

 

有幾點須要說明的地方:

 

1.Storm提交後,會把代碼首先存放到Nimbus節點的inbox目錄下,以後,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化以後的Topology代碼文件;

 

2.在設定Topology所關聯的SpoutsBolts時,能夠同時設置當前SpoutBoltexecutor數目和task數目,默認狀況下,一個Topologytask的總和是和executor的總和一致的。以後,系統根據worker的數目,儘可能平均的分配這些task的執行。worker在哪一個supervisor節點上運行是由storm自己決定的;

 

3. 任務分配好以後,Nimbes節點會將任務的信息提交到zookeeper集羣,同時在zookeeper集羣中會有workerbeats節點,這裏存儲了當前Topology的全部worker進程的心跳信息;

 

4. Supervisor節點會不斷的輪詢zookeeper集羣,在zookeeperassignments節點中保存了全部Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關係等,Supervisor經過輪詢此節點的內容,來領取本身的任務,啓動worker進程運行;

 

5.一個Topology運行以後,就會不斷的經過Spouts來發送Stream流,經過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。最後一步會不間斷的執行,除非手動結束Topology

 

6.經過在Nimbus節點利用以下命令來終止一個Topology的運行:storm kill topologyName kill以後,能夠經過UI界面查看topology狀態,會首先變成KILLED狀態,在清理完本地目錄和zookeeper集羣中的和當前Topology相關的信息以後,此Topology就會完全消失了。

 

三 分組策略

 

1.shuffleGrouping 隨機分組

 

builder.setBolt("word-normalizer", new WordNormalizer())

           .shuffleGrouping("word-reader");

它只有一個參數(數據源組件),而且數據源會向隨機選擇的bolt發送元組,保證每一個消費者收到近似數量的元組。

 

2.fieldsGrouping 域數據流組

 

builder.setBolt("word-counter", new WordCounter(),2)

           .fieldsGrouping("word-normalizer", new Fields("word"));

域數據流組容許你基於元組的一個或多個域控制如何把元組發送給bolts。它保證擁有相同域組合的值集發送給同一個bolt

 

在域數據流組中的全部域集合必須存在於數據源的域聲明中

 

3.allGrouping 所有數據流組

 

builder.setBolt("word-counter", new WordCounter(),2)

           .fieldsGroupint("word-normalizer",new Fields("word"))

           .allGrouping("signals-spout","signals");

所有數據流組,爲每一個接收數據的實例複製一份元組副本。這種分組方式用於向全部bolts發送信號。好比,你要刷新緩存,你能夠向全部的bolts發送一個刷新緩存信號。

 

4.customGrouping 自定義數據流組

 

builder.setBolt("word-normalizer", new WordNormalizer())

           .customGrouping("word-reader", new ModuleGrouping());

5.directGrouping 直接數據流組

 

builder.setBolt("word-counter", new WordCounter(),2)

           .directGrouping("word-normalizer");

這是一個特殊的數據流組,數據源能夠用它決定哪一個組件接收元組。與前面的例子相似,數據源將根據單詞首字母決定由哪一個bolt接收元組。

 

6.全局數據流組

 

全局數據流組把全部數據源建立的元組發送給單一目標實例(即擁有最低ID的任務)。

 

四 配置選項

 

在運行Topology以前,能夠經過一些參數的配置來調節運行時的狀態,參數的配置是經過Storm框架部署目錄下的conf/storm.yaml文件來完成的。在此文件中能夠配置運行時的Storm本地目錄路徑、運行時Worker的數目等。

 

在代碼中,也能夠設置Config的一些參數,可是優先級是不一樣的,不一樣位置配置Config參數的優先級順序爲:

 

default.yaml < storm.yaml <Topology內部的configuration <內部組件的special configuration < 外部組件的special configuration

 

storm.yaml中經常使用的幾個選項爲:

相關文章
相關標籤/搜索