This post covers:
. Storm standalone installation
. Storm distributed installation (3 nodes)
. No space left on device
. Writing a Storm project in Java with Eclipse
Storm is a distributed computing system, but unlike MR it is real-time; real-time processing is quite different from MR's offline batch processing.
Offline MR is mainly used for data mining, data analysis, statistics, and BI analysis.
Storm serves online business systems. Data arrives like water, continuously, and has to be processed while it flows. For example, log records and messages collected from a business system get parsed into some format, say XML, and finally land in a SQL or NoSQL database.
Before the data lands, it must be processed continuously; Storm is the tool for that.
Of course Hadoop can do this kind of work too, but only as offline batch jobs.
Storm itself stores nothing: data comes from somewhere, and results go somewhere. It is usually combined with a message queue and a database; the message queue is the data source, and the database is the destination.
Bolts can be thought of as the individual processing stages in a water plant.
Diagram of Storm concepts
References:
http://www.xuebuyuan.com/1932716.html
http://www.aboutyun.com/thread-15397-1-1.html — Does Storm in standalone mode need ZooKeeper, Nimbus, and Supervisor started? (About云)
http://www.dataguru.cn/thread-477891-1-1.html — Storm standalone + ZooKeeper cluster installation
Storm requires ZooKeeper, but the Storm distribution does not ship with it, so it depends on an externally installed ZooKeeper. In production this is usually a 3-node ZooKeeper cluster; since this is just an introduction, we keep things simple here.
Standalone installation of ZooKeeper is not repeated here; see my earlier blog post.
Storm standalone installation
1. Download apache-storm-0.9.2-incubating.tar.gz
http://storm.apache.org/downloads.html
2. Upload apache-storm-0.9.2-incubating.tar.gz
sftp> cd /home/hadoop/app/
sftp> put c:/apache-storm-0.9.2-incubating.tar.gz
Uploading apache-storm-0.9.2-incubating.tar.gz to /home/hadoop/app/apache-storm-0.9.2-incubating.tar.gz
100% 19606KB 6535KB/s 00:00:03
c:/apache-storm-0.9.2-incubating.tar.gz: 20077564 bytes transferred in 3 seconds (6535 KB/s)
sftp>
[hadoop@weekend110 app]$ ls
hadoop-2.4.1 hbase-0.96.2-hadoop2 hive-0.12.0 jdk1.7.0_65 kafka_2.10-0.8.1.1
[hadoop@weekend110 app]$ ls
apache-storm-0.9.2-incubating.tar.gz hadoop-2.4.1 hbase-0.96.2-hadoop2 hive-0.12.0 jdk1.7.0_65 kafka_2.10-0.8.1.1
3. Extract apache-storm-0.9.2-incubating.tar.gz
[hadoop@weekend110 app]$ ll
total 19628
-rw-r--r--. 1 root root 20077564 May 12 03:45 apache-storm-0.9.2-incubating.tar.gz
drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1
drwxrwxr-x. 8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2
drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0
drwxr-xr-x. 8 hadoop hadoop 4096 Jun 17 2014 jdk1.7.0_65
drwxr-xr-x. 6 hadoop hadoop 4096 Oct 13 22:09 kafka_2.10-0.8.1.1
[hadoop@weekend110 app]$ su root
Password:
[root@weekend110 app]# tar -zxvf apache-storm-0.9.2-incubating.tar.gz
4. Fix ownership of the extracted directory and delete the tarball
[root@weekend110 app]# ll
total 19632
drwxr-xr-x. 9 root root 4096 Oct 14 17:12 apache-storm-0.9.2-incubating
-rw-r--r--. 1 root root 20077564 May 12 03:45 apache-storm-0.9.2-incubating.tar.gz
drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1
drwxrwxr-x. 8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2
drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0
drwxr-xr-x. 8 hadoop hadoop 4096 Jun 17 2014 jdk1.7.0_65
drwxr-xr-x. 6 hadoop hadoop 4096 Oct 13 22:09 kafka_2.10-0.8.1.1
[root@weekend110 app]# chown -R hadoop:hadoop apache-storm-0.9.2-incubating
[root@weekend110 app]# ll
total 19632
drwxr-xr-x. 9 hadoop hadoop 4096 Oct 14 17:12 apache-storm-0.9.2-incubating
-rw-r--r--. 1 root root 20077564 May 12 03:45 apache-storm-0.9.2-incubating.tar.gz
drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1
drwxrwxr-x. 8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2
drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0
drwxr-xr-x. 8 hadoop hadoop 4096 Jun 17 2014 jdk1.7.0_65
drwxr-xr-x. 6 hadoop hadoop 4096 Oct 13 22:09 kafka_2.10-0.8.1.1
[root@weekend110 app]# rm apache-storm-0.9.2-incubating.tar.gz
rm: remove regular file `apache-storm-0.9.2-incubating.tar.gz'? y
[root@weekend110 app]# ll
total 24
drwxr-xr-x. 9 hadoop hadoop 4096 Oct 14 17:12 apache-storm-0.9.2-incubating
drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1
drwxrwxr-x. 8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2
drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0
drwxr-xr-x. 8 hadoop hadoop 4096 Jun 17 2014 jdk1.7.0_65
drwxr-xr-x. 6 hadoop hadoop 4096 Oct 13 22:09 kafka_2.10-0.8.1.1
[root@weekend110 app]#
5. Configure apache-storm-0.9.2-incubating
[hadoop@weekend110 app]$ ll
total 24
drwxr-xr-x. 9 hadoop hadoop 4096 Oct 14 17:12 apache-storm-0.9.2-incubating
drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1
drwxrwxr-x. 8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2
drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0
drwxr-xr-x. 8 hadoop hadoop 4096 Jun 17 2014 jdk1.7.0_65
drwxr-xr-x. 6 hadoop hadoop 4096 Oct 13 22:09 kafka_2.10-0.8.1.1
[hadoop@weekend110 app]$ cd apache-storm-0.9.2-incubating/
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ ls
bin conf examples lib logback public RELEASE
CHANGELOG.md DISCLAIMER external LICENSE NOTICE README.markdown SECURITY.md
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ cd conf/
[hadoop@weekend110 conf]$ ls
storm_env.ini storm.yaml
[hadoop@weekend110 conf]$ vim storm.yaml
# storm.zookeeper.servers:
# - "server1"
# - "server2"
#
# nimbus.host: "nimbus"
修改成
#storm所使用的zookeeper集羣主機
storm.zookeeper.servers:
- "weekend110"
#nimbus所在的主機名
nimbus.host: " weekend110"
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "weekend110"
nimbus.host: "weekend110"
#
# ##### These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
# - org.mycompany.MyType
# - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
# - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
# - "server1"
# - "server2"
## Metrics Consumers
# topology.metrics.consumer.register:
# - class: "backtype.storm.metric.LoggingMetricsConsumer"
# parallelism.hint: 1
# - class: "org.mycompany.MyMetricsConsumer"
# parallelism.hint: 1
# argument:
# - endpoint: "metrics-collector.mycompany.org"
If you cannot save the edit here, switch to the root user.
6. Set the environment variables
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ pwd
/home/hadoop/app/apache-storm-0.9.2-incubating
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ su root
Password:
[root@weekend110 apache-storm-0.9.2-incubating]# vim /etc/profile
export JAVA_HOME=/home/hadoop/app/jdk1.7.0_65
export HADOOP_HOME=/home/hadoop/app/hadoop-2.4.1
export ZOOKEEPER_HOME=/home/hadoop/app/zookeeper-3.4.6
export HIVE_HOME=/home/hadoop/app/hive-0.12.0
export HBASE_HOME=/home/hadoop/app/hbase-0.96.2-hadoop2
export STORM_HOME=/home/hadoop/app/apache-storm-0.9.2-incubating
export KAFKA_HOME=/home/hadoop/app/kafka_2.10-0.8.1.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$HIVE_HOME/bin:$HBASE_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin
[root@weekend110 apache-storm-0.9.2-incubating]# source /etc/profile
[root@weekend110 apache-storm-0.9.2-incubating]#
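A quick sanity check that the variables took effect (the output simply reflects the paths exported above):
[root@weekend110 apache-storm-0.9.2-incubating]# echo $STORM_HOME
/home/hadoop/app/apache-storm-0.9.2-incubating
[root@weekend110 apache-storm-0.9.2-incubating]# which storm
/home/hadoop/app/apache-storm-0.9.2-incubating/bin/storm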
Starting up
Start the externally installed ZooKeeper first:
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ pwd
/home/hadoop/app/apache-storm-0.9.2-incubating
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps
4640 Jps
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ cd /home/hadoop/app/zookeeper-3.4.6/
[hadoop@weekend110 zookeeper-3.4.6]$ pwd
/home/hadoop/app/zookeeper-3.4.6
[hadoop@weekend110 zookeeper-3.4.6]$ cd bin
[hadoop@weekend110 bin]$ ./zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@weekend110 bin]$ jps
4675 Jps
4659 QuorumPeerMain
[hadoop@weekend110 bin]$ cd /home/hadoop/app/apache-storm-0.9.2-incubating/
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ cd bin
[hadoop@weekend110 bin]$ ls
storm storm.cmd storm-config.cmd
[hadoop@weekend110 bin]$ ./storm nimbus
Started this way, Storm reported an error. This post was a useful reference:
http://blog.csdn.net/asas1314/article/details/44088003
The configuration it suggests:
storm.zookeeper.servers:
- "192.168.1.117"
nimbus.host: "192.168.1.117"
storm.local.dir: "/home/chenny/Storm/tmp/storm"
java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
topology.debug: "true"
Note that when Storm reads this configuration file, every line must start with a space and every colon must be followed by a space; otherwise parsing errors cause startup to fail. We can likewise add environment variables for Storm to make starting and stopping easier.
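Following that advice, a minimal sketch of correctly spaced entries, using this post's hostname (a space at the start of each line, a space after every colon, and a space after each dash):
 storm.zookeeper.servers:
     - "weekend110"
 nimbus.host: "weekend110"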
Applied to this installation:
storm.zookeeper.servers:
  - "weekend110"
nimbus.host: "weekend110"
storm.local.dir: "/home/hadoop/data/apache-storm-0.9.2-incubating/tmp/storm"
topology.debug: "true"
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ pwd
/home/hadoop/app/apache-storm-0.9.2-incubating
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ mkdir -p /home/hadoop/data/apache-storm-0.9.2-incubating/tmp/storm
mkdir: cannot create directory `/home/hadoop/data/apache-storm-0.9.2-incubating': No space left on device
[hadoop@weekend110 apache-storm-0.9.2-incubating]$
Even after all this, the problem persisted.
So I installed Storm under /usr/local/ instead.
The lesson: when installing the OS, size the partitions generously, especially / and /home, because that is where software usually gets installed!
At this point the problem was still unsolved; I am recording it here so it is easy to revisit and resolve later.
Error:
Exception in thread "main" java.lang.IllegalArgumentException: field topology.debug 'true' must be a 'java.lang.Boolean'
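The cause is the quoted value in storm.yaml: YAML parses "true" in quotes as a string, while Storm insists on a java.lang.Boolean. Writing the value without quotes fixes this particular error:
topology.debug: true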
Also, started this way, storm nimbus is a foreground process: close the terminal window and it dies. The usual recommendation is to start the daemons detached; nohup plus & keeps them alive after the shell exits, and 1>/dev/null 2>&1 sends both stdout and stderr to /dev/null:
[hadoop@weekend110 bin]$ nohup ./storm nimbus 1>/dev/null 2>&1 &
// starts the master daemon (Nimbus)
[hadoop@weekend110 bin]$ nohup ./storm ui 1>/dev/null 2>&1 &
// starts the web UI
[hadoop@weekend110 bin]$ pwd
/home/hadoop/app/apache-storm-0.9.2-incubating/bin
[hadoop@weekend110 bin]$ nohup ./storm nimbus 1>/dev/null 2>&1 &
[1] 2700
[hadoop@weekend110 bin]$ nohup ./storm ui 1>/dev/null 2>&1 &
[2] 2742
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps
2116 QuorumPeerMain
2701 config_value // a transient process that appears while a daemon starts; here it belongs to nimbus
2710 Jps
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps
2116 QuorumPeerMain
2700 nimbus
2743 config_value // the transient startup process again; this one belongs to core (the UI)
2752 Jps
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps
2116 QuorumPeerMain
2797 nimbus
2742 core
2826 Jps
[hadoop@weekend110 apache-storm-0.9.2-incubating]$
Starting Storm
On the Nimbus host:
nohup ./storm nimbus 1>/dev/null 2>&1 &
nohup ./storm ui 1>/dev/null 2>&1 &
On the supervisor host:
nohup ./storm supervisor 1>/dev/null 2>&1 &
[hadoop@weekend110 bin]$ nohup ./storm nimbus 1>/dev/null 2>&1 &
[3] 2864
[hadoop@weekend110 bin]$ nohup ./storm supervisor 1>/dev/null 2>&1 &
[4] 2875
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps
2116 QuorumPeerMain
2855 Jps
2742 core
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps
2116 QuorumPeerMain
2903 config_value
2885 config_value
2742 core
2894 Jps
[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps
2116 QuorumPeerMain
2937 Jps
2742 core
2875 supervisor
2947 nimbus
[hadoop@weekend110 apache-storm-0.9.2-incubating]$
Open the Storm UI in a browser (ui.port defaults to 8080, so http://weekend110:8080):
Cluster Summary
Version | Nimbus uptime | Supervisors | Used slots | Free slots | Total slots | Executors | Tasks
0.9.2-incubating | 10m 41s | 1 | 0 | 4 | 4 | 0 | 0
Topology summary (empty — no topology submitted yet)
Name | Id | Status | Uptime | Num workers | Num executors | Num tasks
Supervisor summary
Id | Host | Uptime | Slots | Used slots
3a41e7dd-0160-4ad0-bad5-096cdba4647e | weekend110 | 9m 30s | 4 | 0
Nimbus Configuration
Key | Value
dev.zookeeper.path | /tmp/dev-storm-zookeeper
topology.tick.tuple.freq.secs |
topology.builtin.metrics.bucket.size.secs | 60
topology.fall.back.on.java.serialization | true
topology.max.error.report.per.interval | 5
zmq.linger.millis | 5000
topology.skip.missing.kryo.registrations | false
storm.messaging.netty.client_worker_threads | 1
ui.childopts | -Xmx768m
storm.zookeeper.session.timeout | 20000
nimbus.reassign | true
topology.trident.batch.emit.interval.millis | 500
storm.messaging.netty.flush.check.interval.ms | 10
nimbus.monitor.freq.secs | 10
logviewer.childopts | -Xmx128m
java.library.path | /usr/local/lib:/opt/local/lib:/usr/lib
topology.executor.send.buffer.size | 1024
storm.local.dir | /home/hadoop/data/apache-storm-0.9.2-incubating/tmp/storm
storm.messaging.netty.buffer_size | 5242880
supervisor.worker.start.timeout.secs | 120
topology.enable.message.timeouts | true
nimbus.cleanup.inbox.freq.secs | 600
nimbus.inbox.jar.expiration.secs | 3600
drpc.worker.threads | 64
topology.worker.shared.thread.pool.size | 4
nimbus.host | weekend110
storm.messaging.netty.min_wait_ms | 100
storm.zookeeper.port | 2181
transactional.zookeeper.port |
topology.executor.receive.buffer.size | 1024
transactional.zookeeper.servers |
storm.zookeeper.root | /storm
storm.zookeeper.retry.intervalceiling.millis | 30000
supervisor.enable | true
storm.messaging.netty.server_worker_threads | 1
storm.zookeeper.servers | weekend110
transactional.zookeeper.root | /transactional
topology.acker.executors |
topology.transfer.buffer.size | 1024
topology.worker.childopts |
drpc.queue.size | 128
worker.childopts | -Xmx768m
supervisor.heartbeat.frequency.secs | 5
topology.error.throttle.interval.secs | 10
zmq.hwm | 0
drpc.port | 3772
supervisor.monitor.frequency.secs | 3
drpc.childopts | -Xmx768m
topology.receiver.buffer.size | 8
task.heartbeat.frequency.secs | 3
topology.tasks |
storm.messaging.netty.max_retries | 30
topology.spout.wait.strategy | backtype.storm.spout.SleepSpoutWaitStrategy
nimbus.thrift.max_buffer_size | 1048576
topology.max.spout.pending |
storm.zookeeper.retry.interval | 1000
topology.sleep.spout.wait.strategy.time.ms | 1
nimbus.topology.validator | backtype.storm.nimbus.DefaultTopologyValidator
supervisor.slots.ports | 6700,6701,6702,6703
topology.debug | false
nimbus.task.launch.secs | 120
nimbus.supervisor.timeout.secs | 60
topology.message.timeout.secs | 30
task.refresh.poll.secs | 10
topology.workers | 1
supervisor.childopts | -Xmx256m
nimbus.thrift.port | 6627
topology.stats.sample.rate | 0.05
worker.heartbeat.frequency.secs | 1
topology.tuple.serializer | backtype.storm.serialization.types.ListDelegateSerializer
topology.disruptor.wait.strategy | com.lmax.disruptor.BlockingWaitStrategy
topology.multilang.serializer | backtype.storm.multilang.JsonSerializer
nimbus.task.timeout.secs | 30
storm.zookeeper.connection.timeout | 15000
topology.kryo.factory | backtype.storm.serialization.DefaultKryoFactory
drpc.invocations.port | 3773
logviewer.port | 8000
zmq.threads | 1
storm.zookeeper.retry.times | 5
topology.worker.receiver.thread.count | 1
storm.thrift.transport | backtype.storm.security.auth.SimpleTransportPlugin
topology.state.synchronization.timeout.secs | 60
supervisor.worker.timeout.secs | 30
nimbus.file.copy.expiration.secs | 600
storm.messaging.transport | backtype.storm.messaging.netty.Context
logviewer.appender.name | A1
storm.messaging.netty.max_wait_ms | 1000
drpc.request.timeout.secs | 600
storm.local.mode.zmq | false
ui.port | 8080
nimbus.childopts | -Xmx1024m
storm.cluster.mode | distributed
topology.max.task.parallelism |
storm.messaging.netty.transfer.batch.size | 262144
Here I am running standalone mode, because my goal is just to get started and internalize the concepts.
Storm distributed mode
1. Install a ZooKeeper cluster
2. Upload the Storm package and extract it
3. Edit the configuration file storm.yaml
# ZooKeeper hosts used by Storm
storm.zookeeper.servers:
  - "weekend05"
  - "weekend06"
  - "weekend07"
# hostname of the Nimbus node
nimbus.host: "weekend05"
# one worker slot per port; five ports allow up to five workers on this supervisor
supervisor.slots.ports:
  - 6701
  - 6702
  - 6703
  - 6704
  - 6705
Start Storm
On the Nimbus host:
nohup ./storm nimbus 1>/dev/null 2>&1 &
nohup ./storm ui 1>/dev/null 2>&1 &
On the supervisor hosts:
nohup ./storm supervisor 1>/dev/null 2>&1 &
Going deeper with Storm:
. implementing a distributed shared lock
. the mechanics and development pattern of transactional topologies
. integration with other frameworks in concrete scenarios (Flume / ActiveMQ / Kafka (a distributed message queue) / Redis / HBase / MySQL Cluster)
. real-time mobile-phone location lookup
Creating the Storm project
I recommend creating a Maven project here — much cleaner!
Still, for the sake of beginners, the dependency jars are added by hand below. But fellow readers: in production, you really must use Maven.
weekend110-storm -> Build Path -> Configure Build Path
D:\SoftWare\apache-storm-0.9.2-incubating\lib
D:\SoftWare\apache-storm-0.9.2-incubating\external\storm-kafka
The storm-kafka jar matters: integrating Storm with Kafka generally requires it.
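If you follow the Maven recommendation instead, a minimal dependency section might look like this (the coordinates match the 0.9.2-incubating release used in this post; storm-core is marked provided because the cluster supplies it at runtime, and storm-kafka is only needed for the Kafka integration mentioned above):
<dependencies>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
  </dependency>
</dependencies>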
Create package cn.itcast.stormdemo
Create class RandomWordSpout.java
Create class UpperBolt.java
Create class SuffixBolt.java
Create class TopoMain.java
Writing the code
RandomWordSpout.java
package cn.itcast.stormdemo;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    // Simulated source data
    String[] words = {"iphone", "xiaomi", "mate", "sony", "sumsung", "moto", "meizu"};

    // Continuously sends tuple messages to the next component.
    // This is the core logic of the spout.
    @Override
    public void nextTuple() {
        // We could pull data from a Kafka queue here; to keep it simple,
        // we pick a random product name from the words array instead
        Random random = new Random();
        int index = random.nextInt(words.length);

        // Pick a product name via the random index
        String godName = words[index];

        // Wrap the product name in a tuple and send it to the next component
        collector.emit(new Values(godName));

        // Sleep 500 ms after each message
        Utils.sleep(500);
    }

    // Initialization; called once when the spout instance is created
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Declares the field names of the tuples this spout emits
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("orignname"));
    }
}
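One thing to keep straight: the Values emitted in nextTuple must line up positionally with the Fields declared in declareOutputFields; downstream bolts can then read the value either by position (tuple.getString(0)) or by the declared field name ("orignname").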
UpperBolt.java
package cn.itcast.stormdemo;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UpperBolt extends BaseBasicBolt {

    // Business logic
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Get the data passed in from the previous component; it is inside the tuple
        String godName = tuple.getString(0);

        // Convert the product name to upper case
        String godName_upper = godName.toUpperCase();

        // Send the converted product name onwards
        collector.emit(new Values(godName_upper));
    }

    // Declares the fields of the tuples this bolt emits
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uppername"));
    }
}
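A design note: extending BaseBasicBolt means acking is handled for you — each input tuple is acked automatically once execute returns, and anything emitted through BasicOutputCollector is anchored to that input tuple.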
SuffixBolt.java
package cn.itcast.stormdemo;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class SuffixBolt extends BaseBasicBolt {

    FileWriter fileWriter = null;

    // Called only once during the bolt's lifetime
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            fileWriter = new FileWriter("/home/hadoop/stormoutput/" + UUID.randomUUID());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // The core processing logic of this bolt;
    // called once for every tuple message received
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Get the product name sent by the previous component
        String upper_name = tuple.getString(0);

        // Append a suffix to the product name
        String suffix_name = upper_name + "_itisok";

        try {
            fileWriter.write(suffix_name);
            fileWriter.write("\n");
            fileWriter.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // This bolt sends no tuples to any further component,
    // so there are no output fields to declare
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}
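Note that prepare runs inside the worker process, so the directory /home/hadoop/stormoutput must already exist on every supervisor machine the topology may be scheduled on; otherwise the FileWriter constructor throws, and the RuntimeException from prepare kills the executor.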
TopoMain.java
package cn.itcast.stormdemo;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/**
 * Wires the processing components into a complete flow — the so-called topology
 * (analogous to a job in a MapReduce program) — and submits it to the Storm
 * cluster. Once submitted, a topology runs forever unless it is killed
 * manually or dies from an error.
 */
public class TopoMain {

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        // Add our spout to the topology.
        // parallelism_hint = 4 means 4 executors run this component;
        // setNumTasks(8) sets the number of concurrent tasks, so each executor runs 2 tasks
        builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);

        // Add the upper-casing bolt and subscribe it to the randomspout component.
        // .shuffleGrouping("randomspout") carries two meanings:
        // 1. the tuples upperbolt receives come only from the randomspout component;
        // 2. messages between the many task instances of randomspout and upperbolt
        //    are distributed by random (shuffle) grouping
        builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");

        // Add the suffix-appending bolt and subscribe it to upperbolt
        builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");

        // Create the topology from the builder
        StormTopology demotop = builder.createTopology();

        // Parameters for running the topology on the cluster
        Config conf = new Config();

        // Number of slots (workers) the whole topology occupies
        conf.setNumWorkers(4);
        conf.setDebug(true);
        conf.setNumAckers(0);

        // Submit the topology to the Storm cluster
        StormSubmitter.submitTopology("demotopo", conf, demotop);
    }
}
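Since this post works in standalone mode for learning, it is also handy to run the same topology in-process from Eclipse, without submitting to the cluster at all. A minimal sketch using Storm's LocalCluster (the class name LocalTopoMain and the 30-second run time are my own choices):

package cn.itcast.stormdemo;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class LocalTopoMain {

    public static void main(String[] args) {
        // Same wiring as TopoMain, with minimal parallelism for a local run
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("randomspout", new RandomWordSpout(), 1);
        builder.setBolt("upperbolt", new UpperBolt(), 1).shuffleGrouping("randomspout");
        builder.setBolt("suffixbolt", new SuffixBolt(), 1).shuffleGrouping("upperbolt");
        StormTopology demotop = builder.createTopology();

        Config conf = new Config();
        conf.setDebug(true);

        // Runs Nimbus, a supervisor, and ZooKeeper in-process; no external cluster needed
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("demotopo-local", conf, demotop);

        // Let it run for 30 seconds, then shut down cleanly
        Utils.sleep(30000);
        cluster.killTopology("demotopo-local");
        cluster.shutdown();
    }
}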
Further reading:
http://www.cnblogs.com/vincent-vg/p/5850852.html