一 編程接口java
Spout 接口編程
Spout組件的實現能夠經過繼承BaseRichSpout類或者其餘*Spout類來完成,也能夠經過實現IRichSpout接口來實現。須要根據狀況實方法有:數組
open方法緩存
當一個Task被初始化的時候會調用此open方法。通常都會在此方法中對發送Tuple的對象SpoutOutputCollector和配置對象TopologyContext初始化。示例以下:框架
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { dom
_collector = collector; 分佈式
} 函數
declareOutputFields方法ui
此方法用於聲明當前Spout的Tuple發送流的域名字。Stream流的定義是經過OutputFieldsDeclare.declareStream方法完成的,其中參數爲域名。示例以下:google
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
getComponentConfiguration方法
此方法定義在BaseComponent類內,用於聲明針對當前組件的特殊的Configuration配置。示例以下:
public Map<String, Object> getComponentConfiguration() {
if(!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3);
return ret;
} else {
return null;
}
}
這裏即是設置了Topology中當前Component的線程數量上限。
nextTuple方法
這是Spout類中最重要的一個方法。發射一個Tuple到Topology都是經過這個方法來實現的。示例以下:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"twitter","facebook","google"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
這裏即是從一個數組中隨機選取一個單詞做爲Tuple,而後經過_collector發送到Topology。
另外,除了上述幾個方法以外,還有ack、fail和close方法等。Storm在監測到一個Tuple被成功處理以後會調用ack方法,處理失敗會調用fail方法,這兩個方法在BaseRichSpout類中已經被隱式的實現了。
Bolts 接口
Bolt類接收由Spout或者其餘上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現能夠經過繼承BasicRichBolt類或者IRichBolt接口來完成。Bolt類須要實現的主要方法有:
prepare方法
此方法和Spout中的open方法相似,爲Bolt提供了OutputCollector,用來從Bolt中發送Tuple。示例以下:
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
注:Bolt中Tuple的發送能夠在prepare方法中、execute方法中、cleanup等方法中進行,通常都是些在execute中。
declareOutputFields 方法
用於聲明當前Bolt發送的Tuple中包含的字段,和Spout中相似。示例以下:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("obj", "count", "length"));
}
此例說明當前Bolt類發送的Tuple包含了三個字段:"obj", "count", "length"。
getComponentConfiguration方法
和Spout類同樣,在Bolt中也能夠有getComponentConfiguration方法。示例以下:
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;
此例定義了從系統組件「_system」的「_tick」流中發送Tuple到當前Bolt的頻率,當系統須要每隔一段時間執行特定的處理時,就能夠利用這個系統的組件的特性來完成。
execute方法
這是Bolt中最關鍵的一個方法,對於Tuple的處理均可以放到此方法中進行。具體的發送也是經過emit方法來完成的。此時,有兩種狀況,一種是emit方法中有兩個參數,另外一個種是有一個參數。
(1) emit有一個參數:此惟一的參數是發送到下游Bolt的Tuple,此時,由上游發來的舊的Tuple在此隔斷,新的Tuple和舊的Tuple再也不屬於同一棵Tuple樹。新的Tuple另起一個新的Tuple樹。
(2) emit有兩個參數:第一個參數是舊的Tuple的輸入流,第二個參數是發往下游Bolt的新的Tuple流。此時,新的Tuple和舊的Tuple是仍然屬於同一棵Tuple樹,即若是下游的Bolt處理Tuple失敗,則會向上傳遞到當前Bolt,當前Bolt根據舊的Tuple流繼續往上游傳遞,申請重發失敗的Tuple。保證Tuple處理的可靠性。
這兩種狀況要根據本身的場景來肯定。示例以下:
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
注:輸入Tuple通常在最後一行被ack
public void execute(Tuple tuple) {
_collector.emit(new Values(tuple.getString(0) + "!!!"));
}
此外還有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法相似,都是在當前Component關閉時調用,可是針對實時計算來講,除非一些特殊的場景要求之外,這兩個方法通常都不多用到。
注:cleanup方法在bolt被關閉的時候調用, 它應該清理全部被打開的資源。可是集羣不保證這個方法必定會被執行。好比執行task的機器down掉了,那麼根本就沒有辦法來調用那個方法。cleanup設計的時候是被用來在local mode的時候才被調用(也就是說在一個進程裏面模擬整個storm集羣), 而且你想在關閉一些topology的時候避免資源泄漏。
有幾點須要說明的地方:
1.每一個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。
2.open方法、prepare方法的調用是屢次的。入口函數中設定的setSpout或者setBolt裏的並行度參數指的是executor的數目,即負責運行組件中的task的線程的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每一個executor運行的時候調用一次。至關於一個線程的構造方法。
3.nextTuple方法、execute方法是一直被運行的,nextTuple方法不斷的發射Tuple,Bolt的execute不斷的接收Tuple進行處理。只有這樣不斷地運行,纔會產生無界的Tuple流,體現實時性。
4.在提交了一個topology以後,Storm就會建立spout/bolt實例並進行序列化。以後,將序列化的component發送給全部的任務所在的機器(即Supervisor節點),在每個任務上反序列化component。
5. Spout和Bolt之間、Bolt和Bolt之間的通訊,是經過zeroMQ的消息隊列實現的。
二 做業的提交
下面的代碼展現了以本地運行方式提交一個Topology做業
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCount(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
此例中的builder是TopologyBuilder對象,經過它的createTopology方法能夠建立一個Topology對象,同時此builder還要定義當前Topology中用到的Spout和Bolt對象,分別經過setSpout方法和setBolt方法來完成。
setSpout方法和setBolt方法中的第一個參數是當前的Component組件的Stream流ID號;第二個參數是具體的Component實現類的構造;第三個參數是當前Component的並行執行的線程數目,Storm會根據這個數字的累加和來肯定Topology的Task數目。
經過一個LocalCluster對象來定義一個進程內的集羣。提交topology給這個虛擬的集羣和提交topology給分佈式集羣是同樣的。經過調用submitTopology方法來提交topology, 它接受三個參數:要運行的topology的名字,一個配置對象以及要運行的topology自己。
下面對worker、executor以及task作一下說明:
worker:每一個worker都屬於一個特定的Topology,每一個Supervisor節點的worker能夠有多個,每一個worker使用一個單獨的端口,它對Topology中的每一個component運行一個或者多個executor線程來提供task的運行服務。其數目能夠經過設置yaml中的topology.workers屬性以及在代碼中經過Config的setNumWorkers方法設定。
executor:產生於worker進程內部的線程,會執行同一個component的一個或者多個task。 其數目能夠在Topology的入口類中setBolt、setSpout方法的最後一個參數指定,不指定的話,默認爲1;
task:實際的數據處理由task完成,在Topology的生命週期中,每一個組件的task數目是不會發生變化的,而executor的數目卻不必定。executor數目小於等於task的數目,默認狀況下,兩者是相等的。在代碼中經過TopologyBuilder的setNumTasks方法設定具體某個組件的task數目;
有幾點須要說明的地方:
1.Storm提交後,會把代碼首先存放到Nimbus節點的inbox目錄下,以後,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化以後的Topology代碼文件;
2.在設定Topology所關聯的Spouts和Bolts時,能夠同時設置當前Spout和Bolt的executor數目和task數目,默認狀況下,一個Topology的task的總和是和executor的總和一致的。以後,系統根據worker的數目,儘可能平均的分配這些task的執行。worker在哪一個supervisor節點上運行是由storm自己決定的;
3. 任務分配好以後,Nimbes節點會將任務的信息提交到zookeeper集羣,同時在zookeeper集羣中會有workerbeats節點,這裏存儲了當前Topology的全部worker進程的心跳信息;
4. Supervisor節點會不斷的輪詢zookeeper集羣,在zookeeper的assignments節點中保存了全部Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關係等,Supervisor經過輪詢此節點的內容,來領取本身的任務,啓動worker進程運行;
5.一個Topology運行以後,就會不斷的經過Spouts來發送Stream流,經過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。最後一步會不間斷的執行,除非手動結束Topology。
6.經過在Nimbus節點利用以下命令來終止一個Topology的運行:storm kill topologyName kill以後,能夠經過UI界面查看topology狀態,會首先變成KILLED狀態,在清理完本地目錄和zookeeper集羣中的和當前Topology相關的信息以後,此Topology就會完全消失了。
三 分組策略
1.shuffleGrouping 隨機分組
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
它只有一個參數(數據源組件),而且數據源會向隨機選擇的bolt發送元組,保證每一個消費者收到近似數量的元組。
2.fieldsGrouping 域數據流組
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"));
域數據流組容許你基於元組的一個或多個域控制如何把元組發送給bolts。它保證擁有相同域組合的值集發送給同一個bolt。
注: 在域數據流組中的全部域集合必須存在於數據源的域聲明中
3.allGrouping 所有數據流組
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGroupint("word-normalizer",new Fields("word"))
.allGrouping("signals-spout","signals");
所有數據流組,爲每一個接收數據的實例複製一份元組副本。這種分組方式用於向全部bolts發送信號。好比,你要刷新緩存,你能夠向全部的bolts發送一個刷新緩存信號。
4.customGrouping 自定義數據流組
builder.setBolt("word-normalizer", new WordNormalizer())
.customGrouping("word-reader", new ModuleGrouping());
5.directGrouping 直接數據流組
builder.setBolt("word-counter", new WordCounter(),2)
.directGrouping("word-normalizer");
這是一個特殊的數據流組,數據源能夠用它決定哪一個組件接收元組。與前面的例子相似,數據源將根據單詞首字母決定由哪一個bolt接收元組。
6.全局數據流組
全局數據流組把全部數據源建立的元組發送給單一目標實例(即擁有最低ID的任務)。
四 配置選項
在運行Topology以前,能夠經過一些參數的配置來調節運行時的狀態,參數的配置是經過Storm框架部署目錄下的conf/storm.yaml文件來完成的。在此文件中能夠配置運行時的Storm本地目錄路徑、運行時Worker的數目等。
在代碼中,也能夠設置Config的一些參數,可是優先級是不一樣的,不一樣位置配置Config參數的優先級順序爲:
default.yaml < storm.yaml <Topology內部的configuration <內部組件的special configuration < 外部組件的special configuration
在storm.yaml中經常使用的幾個選項爲: