2. Installation and Deployment
1. Storm pseudo-distributed installation
(1) Environment preparation
1. OS: Debian 7
2. JDK 7.0
(2) Install ZooKeeper
1. Download and extract ZooKeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar -zxvf zookeeper-3.4.6.tar.gz
2. Prepare the configuration file
cd conf
cp zoo_sample.cfg zoo.cfg
3. Start ZooKeeper
bin/zkServer.sh start
4. Check ZooKeeper's status
bin/zkServer.sh status
The output looks like this:
JMX enabled by default
Using config: /home/jediael/setupfile/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
(3) Install Storm
1. Download and extract Storm
wget http://mirror.bit.edu.cn/apache/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz
tar -zxvf apache-storm-0.9.4.tar.gz
2. Start Storm
nohup bin/storm nimbus &
nohup bin/storm supervisor &
nohup bin/storm ui &
3. Check the processes
jediael@jediael:~/setupfile/zookeeper-3.4.6$ jps | grep -v Jps
3235 supervisor
3356 core
3140 QuorumPeerMain
3214 nimbus
4. Check the web UI
http://ip:8080
(4) Run a program
1. Build the code from chapter 1 of Storm Blueprints (storm分佈式實時計算模式) with the changes on p. 41, package it, and upload the jar to the server.
2. Run the job
storm jar word-count-1.0-SNAPSHOT.jar storm.blueprints.chapter1.v1.WordCountTopology wordcount-topology
3. You should see a topology running in the web UI.
2. Storm cluster installation
Note: install ZooKeeper first: http://blog.csdn.net/jinhong_lu/article/details/46519899
1. Download and extract Storm
wget http://mirror.bit.edu.cn/apache/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz
tar -zxvf apache-storm-0.9.4.tar.gz
Then add a symlink in the home directory:
ln -s src/apache-storm-0.9.4 storm
2. Configure Storm by adding the following to storm.yaml:
storm.zookeeper.servers:
- "gdc-nn01-test"
- "gdc-dn01-test"
- "gdc-dn02-test"
nimbus.host: "gdc-nn01-test"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.local.dir: "/home/hadoop/storm/data"
#jvm setting
nimbus.childopts: "-Xmx4096m"
supervisor.childopts: "-Xmx4096m"
worker.childopts: "-Xmx3072m"
Notes:
1. About logs
When you first run a Storm program you can hit all kinds of errors, and most of them show up in the logs. In this setup the logs worth watching are:
(1) The worker logs on the supervisors, under $STORM_HOME/logs. If the cluster itself is healthy but a particular topology fails, the cause can usually be found in these worker logs. The most common errors are ClassNotFoundException and NoClassDefFoundError, both caused by missing jars; put the missing jars into $STORM_HOME/lib.
(2) The nimbus logs, under $STORM_HOME/logs, mainly useful for watching the state of the whole cluster. There are four files:
access.log metrics.log nimbus.log ui.log
(3) The Kafka logs, under $KAFKA_HOME/logs, for checking whether Kafka is running normally.
2. About emitted vs. transferred (adapted from http://www.reader8.cn/jiaocheng/20120801/2057699.html)
The difference between emitted and transferred in the Storm UI:
At first the emitted and transferred numbers shown in the Storm UI were not obvious to me, so I searched storm-user and found that others had the same confusion; Nathan gave a detailed answer:
The emitted column shows how many times OutputCollector's emit method was called.
The transferred column shows how many tuples were actually sent to the next task.
If bolt A emits tuples to bolt B with an all grouping (every task receives a copy) and bolt B runs 5 tasks, transferred will be 5 times emitted.
If bolt A emits tuples internally but no downstream task subscribes to them, transferred will be 0.
There was also a discussion about the relationship between spout and bolt emitted counts, which cleared up some of my doubts:
Some bolts emit no tuples in their execute() method, yet the UI still shows a non-zero emitted count. That is because they call ack(), which emits an ack tuple to the system's default acker bolt; so when a tuple is emitted with an anchor, emitted generally includes the tuples sent to the acker bolt.
In addition, collector.emit(new Values(xxx)) and collector.emit(tuple, new Values(xxx)) affect the emitted and transferred counts of downstream bolts differently: with the former, both values are 0 for the downstream bolts, because the unanchored emit is not reliable and is no longer tracked by the acker.
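To make the distinction concrete, here is a minimal sketch of the two emit styles in a Storm 0.9.x bolt; the class and field names are made up for illustration and are not taken from the original project.

// Sketch only: anchored vs. unanchored emits in a Storm 0.9.x bolt.
// The class and field names are illustrative.
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class EmitStyleDemoBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");

        // Anchored emit: the new tuple is tied to the input tuple, so it is
        // tracked by the acker, and the ack tuples show up in the emitted count.
        collector.emit(input, new Values(word));

        // Unanchored emit (commented out): no anchor, so the tuple is not
        // tracked for reliability.
        // collector.emit(new Values(word));

        // Acking the input also emits a tuple to the system acker bolt.
        collector.ack(input);
    }
}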
3. Note (important): Storm tends to hit a pile of jar dependency problems when running topologies. It is worth keeping the working set of jars so they can be imported directly into a new cluster, and copying them to every machine in the cluster.
3. scp the whole Storm directory to dn01, dn02 and dn03.
4. Start Storm
(1) On nn01, start nimbus and the UI:
nohup bin/storm nimbus &
nohup bin/storm ui &
(2) On dn0[123], start the supervisors:
nohup bin/storm supervisor &
5. Verify
(1) Open the web UI and check the status:
http://192.168.169.91:8080/index.html
(2) Run an example topology from the examples directory:
$ /home/hadoop/storm/bin/storm jar storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology word-count
Then check the UI to see whether the topology was submitted successfully.
4. Configuration
The complete default configuration file, defaults.yaml, is attached at the end of this section. To change a value, override it in storm.yaml. The important parameters are:
1. storm.zookeeper.servers: which ZooKeeper cluster to use
storm.zookeeper.servers:
- "gdc-nn01-test"
- "gdc-dn01-test"
- "gdc-dn02-test」
2. nimbus.host: which machine runs nimbus
nimbus.host: "gdc-nn01-test"
3. supervisor.slots.ports: the ports on which a supervisor runs workers. Each port runs one worker, so the number of configured ports is the number of slots (i.e. workers) each supervisor has.
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.local.dir: "/home/hadoop/storm/data"
4. JVM settings
#jvm setting
nimbus.childopts: "-Xmx4096m"
supervisor.childopts: "-Xmx4096m"
worker.childopts: "-Xmx3072m"
Besides these there are also ui.childopts and logviewer.childopts.
Appendix: the complete defaults.yaml
########### These all have default values as shown
########### Additional configuration goes into storm.yaml

java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"

### storm.* configs are general configurations
# the local dir is where jars are kept
storm.local.dir: "storm-local"
storm.zookeeper.servers:
    - "localhost"
storm.zookeeper.port: 2181
storm.zookeeper.root: "/storm"
storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"

### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"

### ui.* configs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"

logviewer.port: 8000
logviewer.childopts: "-Xmx128m"
logviewer.appender.name: "A1"

drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: "-Xmx768m"

transactional.zookeeper.root: "/transactional"
transactional.zookeeper.servers: null
transactional.zookeeper.port: null

### supervisor.* configs are for node supervisors
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
supervisor.childopts: "-Xmx256m"
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs: 30
#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
supervisor.monitor.frequency.secs: 3
#how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1

# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1

task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10

zmq.threads: 1
zmq.linger.millis: 5000
zmq.hwm: 0

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead.
storm.messaging.netty.max_retries: 300
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
storm.messaging.netty.transfer.batch.size: 262144

# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10

### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
topology.workers: 1
topology.acker.executors: null
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 #batched
topology.executor.send.buffer.size: 1024 #individual messages
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
topology.transfer.buffer.size: 1024 # batched
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
topology.classpath: null
topology.environment: null

dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
6. API
(1) An example
This example uses Storm to run the classic word-count program. The topology is:
sentence-spout—>split-bolt—>count-bolt—>report-bolt
These components generate the sentences, split them into words, count the words, and output the results, respectively.
The complete code is at https://github.com/jinhong-lu/stormdemo
The key parts of the code are analysed below.
1. Create the spout
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
            "when i was young i'd listen to the radio",
            "waiting for my favorite songs",
            "when they played i'd sing along",
            "it make me smile",
            "those were such happy times and not so long ago",
            "how i wondered where they'd gone",
            "but they're back again just like a long lost friend",
            "all the songs i love so well",
            "every shalala every wo'wo",
            "still shines.",
            "every shing-a-ling-a-ling",
            "that they're starting",
            "to sing so fine"};

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}
This class emits the contents of the string array line by line. Its main methods are:
(1) open() performs the spout's initialisation, similar to a bolt's prepare() method.
(2) declareOutputFields() defines the field names and the number of fields being emitted; the method has the same name in bolts.
(3) nextTuple() is invoked for every piece of data to be processed, similar to a bolt's execute() method. It is the core of the processing logic; emit() sends the data to the next node in the topology.
2. Create the split bolt
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input) {
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
            //System.out.println(word);
        }
    }
}
The three methods have the same meaning as in the spout. This class splits every received sentence on spaces and emits the resulting words one by one.
input.getStringByField("sentence") retrieves the value by the field name declared by the upstream node.
3. Create the word-count bolt
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Long> counts = null;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
        //System.out.println(count);
    }
}
This class counts the words it receives and emits the results.
This bolt emits two fields:
declarer.declare(new Fields("word","count"));
this.collector.emit(new Values(word,count));
4. Create the report bolt
public class ReportBolt extends BaseRichBolt {
    private Map<String, Long> counts;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        counts.put(word, count);
    }

    public void cleanup() {
        System.out.println("Final output");
        Iterator<Entry<String, Long>> iter = counts.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, Long> entry = iter.next();
            String word = (String) entry.getKey();
            Long count = (Long) entry.getValue();
            System.out.println(word + " : " + count);
        }
        super.cleanup();
    }
}
This class outputs the data received from the word-count bolt.
The results are first collected in a map; when the topology is killed, cleanup() is called and the contents of the map are printed.
5. Create the topology
public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config conf = new Config();
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}
The key steps are:
(1) Create a TopologyBuilder and register the spout and bolts with it:
builder.setSpout(SENTENCE_SPOUT_ID, spout);
builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(
SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID,
new Fields("word"));
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(
COUNT_BOLT_ID);
(2) Create the conf object:
Config conf = new Config();
This object specifies topology-level properties such as parallelism and the nimbus address.
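As a rough illustration (the values here are arbitrary and not taken from the original project), a few of the settings that can be placed on this object:

// Illustrative only: common topology-level settings on backtype.storm.Config.
Config conf = new Config();
conf.setNumWorkers(2);          // number of worker (JVM) processes for the topology
conf.setDebug(true);            // log every emitted tuple, useful while developing
conf.setMaxSpoutPending(1000);  // cap on pending (un-acked) tuples per spout task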
(3) Create and run the topology. Two modes are used here.
First, when there are no arguments, build a LocalCluster and run the topology locally; after 10 seconds, kill it and shut the cluster down:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf,builder.createTopology());
Thread.sleep(10000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
Second, when arguments are given, submit the topology to the cluster:
StormSubmitter.submitTopology(args[0], conf,builder.createTopology());
The first argument is the topology name.
6. Run locally
Run it directly in Eclipse; the output appears in the console.
7. Run on the cluster
(1) Compile and package:
mvn clean package
(2) Upload the packaged jar to the nimbus machine, then run
storm jar com.ljh.storm.5_stormdemo com.ljh.storm.wordcount.WordCountTopology topology_name
to submit the topology to the cluster.
7. Commands
The full list of Storm commands:
Commands:
activate: storm activate test-topo, activate a topology
classpath: storm classpath, print the classpath Storm uses when running a topology
deactivate: storm deactivate test-topo, deactivate a topology
dev-zookeeper: start a local ZooKeeper for development use
drpc: storm drpc, start the DRPC daemon
help:
jar: storm jar **.jar ** (main class) args..., launch a topology
kill: storm kill test-topo, kill a topology
list: storm list, list the running topologies and their status
localconfvalue: storm localconfvalue conf-name, print the value of conf-name in the local configuration
logviewer: storm logviewer, start the logviewer so logs can be browsed from the UI
monitor:
nimbus: storm nimbus, start the nimbus daemon
rebalance: adjust parallelism; see the parallelism section (8) below
remoteconfvalue: storm remoteconfvalue conf-name, print the value of conf-name in the remote cluster configuration
repl:
shell: run a shell script
supervisor: storm supervisor, start the supervisor daemon
ui: storm ui, start the web UI
version: storm version, print the version
Help:
help
help <command>
For details see: http://storm.incubator.apache.org/documentation/Command-line-client.html
Configs can be overridden using one or more -c flags, e.g. "storm list -c nimbus.host=nimbus.mycompany.com"
8. Parallelism
(1) The parallelism of a Storm topology can be set along four dimensions:
1. node (server): the number of supervisor machines in the Storm cluster.
2. worker (JVM process): the total number of worker processes for the topology; they are distributed evenly across the nodes.
3. executor (thread): the total number of threads for a given spout or bolt; the threads are distributed evenly across the workers.
4. task (spout/bolt instance): tasks are the instances of spouts and bolts whose nextTuple() and execute() methods are called by executor threads. Unless set explicitly, Storm assigns one task per executor; if more tasks are configured, a single thread hosts several spout/bolt instances.
Note: all of the above are totals that get spread evenly across their hosts; they do not set how many processes/threads each host runs. See the example below.
(2) How to set the parallelism (a sketch follows this list)
1. node: buy more machines and add them to the cluster...
2. worker: Config#setNumWorkers(), or the TOPOLOGY_WORKERS config entry
3. executor: the parallelism hint passed to TopologyBuilder#setSpout()/setBolt()
4. task: ComponentConfigurationDeclarer#setNumTasks()
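A minimal sketch, assuming the word-count classes from section 6 are on the classpath and using illustrative component names and counts, of how the three code-level knobs fit together:

// Sketch only (Storm 0.9.x API): setting worker, executor and task counts.
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class ParallelismSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // executor count: the third argument of setSpout()/setBolt()
        // is the parallelism hint, i.e. the number of executor threads.
        builder.setSpout("sentence-spout", new SentenceSpout(), 5);

        builder.setBolt("split-bolt", new SplitSentenceBolt(), 3)
               .shuffleGrouping("sentence-spout")
               // task count: more tasks than executors means each thread
               // hosts several bolt instances.
               .setNumTasks(6);

        builder.setBolt("count-bolt", new WordCountBolt(), 2)
               .fieldsGrouping("split-bolt", new Fields("word"));

        // worker count: total number of worker JVMs for the whole topology.
        Config conf = new Config();
        conf.setNumWorkers(3);
    }
}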
(3) Example:
1. config.setNumWorkers(3) sets the number of worker processes to 3; assuming the cluster has 3 nodes, each node runs one worker.
2. The executor counts are:
spout:5
filter-bolt:3
log-splitter:3
hdfs-bolt:2
That is 13 executors in total, distributed randomly across the workers.
Note: this code reads its input from Kafka, and the topic has 5 partitions, which is why the spout gets 5 threads.
3. The example does not set task counts explicitly, so the default of one task per executor applies. To set it, use for example:
builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
.shuffleGrouping("filter-bolt").setNumTasks(5);
These 5 tasks are then assigned to the 3 executors.
(4) Adjusting parallelism dynamically
There are two ways to adjust a topology's parallelism:
1. kill the topology -> change the code -> rebuild -> resubmit
2. adjust it dynamically
The first method is inconvenient: sometimes a topology cannot simply be killed, and if a few machines are added, should every topology really be killed and its code changed?
Storm therefore provides dynamic adjustment, in two forms:
1. Via the UI: open the topology's page and click rebalance; the topology's status changes to rebalancing. This only redistributes the existing processes and threads across the machines, so it suits adding or removing machines; it cannot change the number of workers or executors.
2. Via the CLI: storm rebalance
For example:
storm rebalance toponame -n 7 -e filter-bolt=6 -e hdfs-bolt=8
sets the topology's worker count to 7 and the executor counts of filter-bolt and hdfs-bolt to 6 and 8 respectively.
While this runs the topology's status is rebalancing; once the adjustment finishes, the three machines end up with 3, 2 and 2 workers respectively.
9. Grouping
Storm uses groupings to direct the flow of data: they specify which stream each bolt consumes and how it consumes it.
Storm has 7 built-in groupings and also provides CustomStreamGrouping for building custom ones; a sketch showing several of them on a TopologyBuilder follows this list.
1. Shuffle grouping: shuffleGrouping
Tuples are distributed randomly across the bolt's tasks, so each task receives an equal number of tuples.
2. Fields grouping: fieldsGrouping
Tuples are grouped by the specified field: tuples with the same value of that field go to the same task, and tuples with different values may go to different tasks.
3. All grouping: allGrouping (broadcast grouping)
Every tuple is sent to every task; use with care.
4. Global grouping: globalGrouping
All tuples are sent to a single task, the one with the lowest task ID. Setting a parallelism on such a component is meaningless, and this grouping can easily become a bottleneck.
5. None grouping: noneGrouping
Reserved for future use; currently it behaves like shuffle grouping.
6. Direct grouping: directGrouping
The data source calls emitDirect() to decide which task receives each tuple; it can only be used on streams declared as direct streams.
7. Local-or-shuffle grouping: localOrShuffleGrouping
If the receiving bolt has one or more tasks in the same worker process, tuples are sent to those tasks first; otherwise it behaves like shuffle grouping. Compared with shuffle grouping it reduces network transfer and therefore improves performance.
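A short sketch, reusing the word-count components from section 6 with illustrative parallelism values, of how several of these groupings look on a TopologyBuilder (the commented-out SignalBolt is hypothetical):

// Sketch only: grouping methods on a TopologyBuilder (Storm 0.9.x).
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class GroupingSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new SentenceSpout());

        // shuffle grouping: tuples are distributed evenly at random
        builder.setBolt("split-bolt", new SplitSentenceBolt(), 3)
               .shuffleGrouping("sentence-spout");

        // fields grouping: tuples with the same "word" value go to the same task
        builder.setBolt("count-bolt", new WordCountBolt(), 2)
               .fieldsGrouping("split-bolt", new Fields("word"));

        // global grouping: every tuple goes to the single lowest-id task
        builder.setBolt("report-bolt", new ReportBolt())
               .globalGrouping("count-bolt");

        // all grouping (hypothetical bolt): every task would receive a copy of every tuple
        // builder.setBolt("signal-bolt", new SignalBolt()).allGrouping("sentence-spout");
    }
}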
10. Reliability
Storm Blueprints: p. 20
從零開始學storm: p. 40
Reliability: a message emitted by a spout must be acked by every node in its tuple tree, otherwise it is resent.
A resend is triggered by one of two causes:
(1) fail() is called
(2) a timeout with no response.
For a complete reliability example see the chapter 1 v4 code of Storm Blueprints (or p. 22), or the example on p. 102 of 從零開始學storm. The key steps are:
(1) Spout
1. Create a map recording the id and contents of every tuple that has been sent; this is the list of tuples awaiting acknowledgement.
private ConcurrentHashMap<UUID,Values> pending;
2. When emitting a tuple, pass an extra argument giving the tuple's id, and put the tuple into the map to await confirmation.
UUID msgId = UUID.randomUUID();
this.pending.put(msgId,values);
this.collector.emit(values,msgId);
3. Define the ack and fail methods. ack removes the tuple from the map:
this.pending.remove(msgId);
fail re-emits the tuple:
this.collector.emit(this.pending.get(msgId),msgId);
Tuples that receive no response are resent periodically.
(2) Bolt
Every bolt that processes the tuple needs the following additions:
1. When emitting, pass the input tuple as an anchor:
collector.emit(tuple,new Values(word));
2. Confirm that the received tuple has been processed:
this.collector.ack(tuple);
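A minimal consolidated sketch of this pattern against the Storm 0.9.x API; the sentences and field names are illustrative rather than taken from the books referenced above.

// Sketch only: a reliable spout/bolt pair following the steps above.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ReliableSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // pending: message id -> values, the list of tuples awaiting an ack
    private ConcurrentHashMap<UUID, Values> pending;
    private int index = 0;
    private String[] sentences = {"the quick brown fox", "jumped over the lazy dog"};

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        // emit with a message id so the tuple tree is tracked by the acker
        this.collector.emit(values, msgId);
        index = (index + 1) % sentences.length;
    }

    public void ack(Object msgId) {
        // fully processed: forget it
        this.pending.remove(msgId);
    }

    public void fail(Object msgId) {
        // failed or timed out: resend the original values
        this.collector.emit(this.pending.get((UUID) msgId), msgId);
    }
}

class ReliableSplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input) {
        for (String word : input.getStringByField("sentence").split(" ")) {
            // anchored emit: the new tuple joins the input tuple's tree
            this.collector.emit(input, new Values(word));
        }
        // acknowledge the input once its derived tuples have been emitted
        this.collector.ack(input);
    }
}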