1. Storm basic concepts + Storm programming conventions and writing a demo

 

This post mainly covers:

  . Storm single-node (standalone) installation

  . Storm distributed installation (3 nodes)

      . the "No space left on device" problem

  . Writing a Storm project in Java with Eclipse

 

 

 

 

     http://storm.apache.org/

 

 

  Storm is a distributed computation system, but unlike MapReduce (MR) it is real-time; real-time processing is quite different from MR's offline batch processing.

      Offline MR is mainly used for data mining, data analysis, and statistics.

      Storm mainly serves online business systems. Data arrives continuously, like water, and has to be processed while it flows. For example, log records and messages collected from a business system are parsed into some format, say XML, and finally written into a SQL or NoSQL database.

      Before the data lands there, it must be processed continuously along the way, and Storm is the tool that does this.

      Of course, Hadoop can do this kind of work too, but only as offline batch processing.

 

  

  

  Storm itself does no storage at all: the data comes from somewhere, and the results go somewhere. It is usually combined with a message queue and a database; the message queue is the data source, and the database is the data destination.

 

  Bolts can be understood as the individual treatment stages in a water plant (with the spout as the water source, i.e. the incoming data stream).

 

 

Diagram of Storm's related concepts

 

Reference links: http://www.xuebuyuan.com/1932716.html

http://www.aboutyun.com/thread-15397-1-1.html

"Does running Storm on a single machine mean ZooKeeper / Nimbus / Supervisor don't need to be started?" (Aboutyun forum)

http://www.dataguru.cn/thread-477891-1-1.html

"Storm single node + ZooKeeper cluster installation"

 

 

Storm requires ZooKeeper, but the Storm distribution does not bundle it.

It depends on an externally installed ZooKeeper cluster. In production this is usually a 3-node ZooKeeper ensemble, but since this is just an introduction, we will keep it simple for now.

 

         ZooKeeper single-node installation is not repeated here.

See my earlier post:

1. week110's ZooKeeper installation + ZooKeeper for storing small amounts of data

 

 

 

Storm single-node installation

1. Download apache-storm-0.9.2-incubating.tar.gz

http://storm.apache.org/downloads.html  

 

 

 

 

 

2. Upload apache-storm-0.9.2-incubating.tar.gz

sftp> cd /home/hadoop/app/

sftp> put c:/apache-storm-0.9.2-incubating.tar.gz

Uploading apache-storm-0.9.2-incubating.tar.gz to /home/hadoop/app/apache-storm-0.9.2-incubating.tar.gz

  100% 19606KB   6535KB/s 00:00:03    

c:/apache-storm-0.9.2-incubating.tar.gz: 20077564 bytes transferred in 3 seconds (6535 KB/s)

sftp>

 

 

[hadoop@weekend110 app]$ ls

hadoop-2.4.1  hbase-0.96.2-hadoop2  hive-0.12.0  jdk1.7.0_65  kafka_2.10-0.8.1.1

[hadoop@weekend110 app]$ ls

apache-storm-0.9.2-incubating.tar.gz  hadoop-2.4.1  hbase-0.96.2-hadoop2  hive-0.12.0  jdk1.7.0_65  kafka_2.10-0.8.1.1

 

3. Extract apache-storm-0.9.2-incubating.tar.gz

[hadoop@weekend110 app]$ ll

total 19628

-rw-r--r--.  1 root   root   20077564 May 12 03:45 apache-storm-0.9.2-incubating.tar.gz

drwxr-xr-x. 11 hadoop hadoop     4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop     4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop     4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop     4096 Jun 17  2014 jdk1.7.0_65

drwxr-xr-x.  6 hadoop hadoop     4096 Oct 13 22:09 kafka_2.10-0.8.1.1

[hadoop@weekend110 app]$ su root

Password:

[root@weekend110 app]# tar -zxvf apache-storm-0.9.2-incubating.tar.gz

 

 

4. Change ownership of the extracted directory and delete the tarball

 

[root@weekend110 app]# ll

total 19632

drwxr-xr-x.  9 root   root       4096 Oct 14 17:12 apache-storm-0.9.2-incubating

-rw-r--r--.  1 root   root   20077564 May 12 03:45 apache-storm-0.9.2-incubating.tar.gz

drwxr-xr-x. 11 hadoop hadoop     4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop     4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop     4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop     4096 Jun 17  2014 jdk1.7.0_65

drwxr-xr-x.  6 hadoop hadoop     4096 Oct 13 22:09 kafka_2.10-0.8.1.1

[root@weekend110 app]# chown -R hadoop:hadoop apache-storm-0.9.2-incubating

[root@weekend110 app]# ll

total 19632

drwxr-xr-x.  9 hadoop hadoop     4096 Oct 14 17:12 apache-storm-0.9.2-incubating

-rw-r--r--.  1 root   root   20077564 May 12 03:45 apache-storm-0.9.2-incubating.tar.gz

drwxr-xr-x. 11 hadoop hadoop     4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop     4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop     4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop     4096 Jun 17  2014 jdk1.7.0_65

drwxr-xr-x.  6 hadoop hadoop     4096 Oct 13 22:09 kafka_2.10-0.8.1.1

[root@weekend110 app]# rm apache-storm-0.9.2-incubating.tar.gz

rm: remove regular file `apache-storm-0.9.2-incubating.tar.gz'? y

[root@weekend110 app]# ll

total 24

drwxr-xr-x.  9 hadoop hadoop 4096 Oct 14 17:12 apache-storm-0.9.2-incubating

drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop 4096 Jun 17  2014 jdk1.7.0_65

drwxr-xr-x.  6 hadoop hadoop 4096 Oct 13 22:09 kafka_2.10-0.8.1.1

[root@weekend110 app]#

 

 

5. Configure Storm (conf/storm.yaml)

[hadoop@weekend110 app]$ ll

total 24

drwxr-xr-x.  9 hadoop hadoop 4096 Oct 14 17:12 apache-storm-0.9.2-incubating

drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop 4096 Jun 17  2014 jdk1.7.0_65

drwxr-xr-x.  6 hadoop hadoop 4096 Oct 13 22:09 kafka_2.10-0.8.1.1

[hadoop@weekend110 app]$ cd apache-storm-0.9.2-incubating/

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ ls

bin           conf        examples  lib      logback  public           RELEASE

CHANGELOG.md  DISCLAIMER  external  LICENSE  NOTICE   README.markdown  SECURITY.md

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ cd conf/

[hadoop@weekend110 conf]$ ls

storm_env.ini  storm.yaml

[hadoop@weekend110 conf]$ vim storm.yaml

 

# storm.zookeeper.servers:

#     - "server1"

#     - "server2"

#

# nimbus.host: "nimbus"

 

修改成

 

#ZooKeeper cluster hosts used by Storm

storm.zookeeper.servers:

     - "weekend110"

    

 

#hostname of the nimbus node

nimbus.host: "weekend110"

 

 

 

 

 

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements.  See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership.  The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License.  You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

 

########### These MUST be filled in for a storm configuration

 storm.zookeeper.servers:

     - "weekend110"

    

 

 nimbus.host: "weekend110"

 

#

# ##### These may optionally be filled in:

#

## List of custom serializations

# topology.kryo.register:

#     - org.mycompany.MyType

#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer

#

## List of custom kryo decorators

# topology.kryo.decorators:

#     - org.mycompany.MyDecorator

#

## Locations of the drpc servers

# drpc.servers:

#     - "server1"

#     - "server2"

 

## Metrics Consumers

# topology.metrics.consumer.register:

#   - class: "backtype.storm.metric.LoggingMetricsConsumer"

#     parallelism.hint: 1

#   - class: "org.mycompany.MyMetricsConsumer"

#     parallelism.hint: 1

#     argument:

#       - endpoint: "metrics-collector.mycompany.org"

 

         If the file cannot be modified here, switch to root.

 

 

 

6. Environment variables for apache-storm-0.9.2-incubating

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ pwd

/home/hadoop/app/apache-storm-0.9.2-incubating

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ su root

Password:

[root@weekend110 apache-storm-0.9.2-incubating]# vim /etc/profile

 

export JAVA_HOME=/home/hadoop/app/jdk1.7.0_65

export HADOOP_HOME=/home/hadoop/app/hadoop-2.4.1

export ZOOKEEPER_HOME=/home/hadoop/app/zookeeper-3.4.6

export HIVE_HOME=/home/hadoop/app/hive-0.12.0

export HBASE_HOME=/home/hadoop/app/hbase-0.96.2-hadoop2

export STORM_HOME=/home/hadoop/app/apache-storm-0.9.2-incubating

export KAFKA_HOME=/home/hadoop/app/kafka_2.10-0.8.1.1

export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$HIVE_HOME/bin:$HBASE_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin

 

[root@weekend110 apache-storm-0.9.2-incubating]# source /etc/profile

[root@weekend110 apache-storm-0.9.2-incubating]#

 

 

Startup

 

         First, start the externally installed ZooKeeper.

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ pwd

/home/hadoop/app/apache-storm-0.9.2-incubating

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps

4640 Jps

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ cd /home/hadoop/app/zookeeper-3.4.6/

[hadoop@weekend110 zookeeper-3.4.6]$ pwd

/home/hadoop/app/zookeeper-3.4.6

[hadoop@weekend110 zookeeper-3.4.6]$ cd bin

[hadoop@weekend110 bin]$ ./zkServer.sh start

JMX enabled by default

Using config: /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[hadoop@weekend110 bin]$ jps

4675 Jps

4659 QuorumPeerMain

[hadoop@weekend110 bin]$ cd /home/hadoop/app/apache-storm-0.9.2-incubating/

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ cd bin

[hadoop@weekend110 bin]$ ls

storm  storm.cmd  storm-config.cmd

[hadoop@weekend110 bin]$ ./storm nimbus

 

 

Reference:

http://zhidao.baidu.com/link?url=GXpabgBPsQQERdSalEw5f2KC1YH4vo7xQlZzsz5xR7gongO2CspeezWxq1_Gg94ijSiner42flaJQBsONonxOjQwpDLKr-y4bNmDMyUoQiO

The generally recommended way is to execute, on the nimbus machine:

[hadoop@weekend110 bin]$ nohup ./storm nimbus 1>/dev/null 2>&1 &

//starts the master node (nimbus) in the background, discarding all output

[hadoop@weekend110 bin]$ nohup ./storm ui 1>/dev/null 2>&1 &

                                     //starts the web UI

 

Startup reported an error.

http://blog.csdn.net/asas1314/article/details/44088003

 

 

Refer to that blog post. It suggests a configuration like this:

storm.zookeeper.servers:

        - "192.168.1.117"

 nimbus.host: "192.168.1.117"

 storm.local.dir: "/home/chenny/Storm/tmp/storm"

 java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"

 topology.debug: "true"

   Note that when Storm reads this configuration file, it requires a space at the start of each line and a space after every colon; otherwise parsing fails and startup errors out. We can likewise add environment variables for Storm to make starting and stopping more convenient.

 

 

storm.zookeeper.servers:

      - "weekedn110"

 

  nimbus.host: "weekend110"

  storm.local.dir: "/home/hadoop/data/apache-storm-0.9.2-incubating/tmp/storm"

  topology.debug: "true"

 

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ pwd

/home/hadoop/app/apache-storm-0.9.2-incubating

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ mkdir -p /home/hadoop/data/apache-storm-0.9.2-incubating/tmp/storm

mkdir: cannot create directory `/home/hadoop/data/apache-storm-0.9.2-incubating': No space left on device

[hadoop@weekend110 apache-storm-0.9.2-incubating]$

 

 

Disk cleanup

 

 

       Even after cleanup, this problem still could not be solved.

So I installed Storm under /usr/local/ instead.

Lesson learned: when installing the OS, make the partitions bigger,

       especially the / and /home/ partitions, because that is where software usually gets installed!

 Even then, I still had not solved the problem here.

   I am recording it in this post so I can come back, reread it, and solve it later.

 

 

 Error:

Exception in thread "main" java.lang.IllegalArgumentException: field topology.debug 'true' must be a 'java.lang.Boolean'

The cause is the quoting in storm.yaml above: "true" is parsed as a String, while Storm requires a real Boolean here, so it must be written as topology.debug: true (without quotes).

Also note that, started this way, Storm runs as a foreground program: close the terminal window and it is gone.

 

 

So, following the recommended approach, start nimbus and the UI in the background:

 

[hadoop@weekend110 bin]$ pwd

/home/hadoop/app/apache-storm-0.9.2-incubating/bin

[hadoop@weekend110 bin]$ nohup ./storm nimbus 1>/dev/null 2>&1 &

[1] 2700

[hadoop@weekend110 bin]$ nohup ./storm ui 1>/dev/null 2>&1 &

[2] 2742

 

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps

2116 QuorumPeerMain

2701 config_value      //an intermediate process that appears while starting up; this one belongs to nimbus

2710 Jps

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps

2116 QuorumPeerMain

2700 nimbus

2743 config_value    //an intermediate process that appears while starting up; this one belongs to the UI (core)

2752 Jps

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps

2116 QuorumPeerMain

2797 nimbus

2742 core

2826 Jps

[hadoop@weekend110 apache-storm-0.9.2-incubating]$

 

 

Starting Storm

On the nimbus host:

nohup ./storm nimbus 1>/dev/null 2>&1 &

nohup ./storm ui 1>/dev/null 2>&1 &

On the supervisor hosts:

nohup ./storm supervisor 1>/dev/null 2>&1 &

 

 

 

[hadoop@weekend110 bin]$ nohup ./storm nimbus 1>/dev/null 2>&1 &

[3] 2864

[hadoop@weekend110 bin]$ nohup ./storm supervisor 1>/dev/null 2>&1 &

[4] 2875

 

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps

2116 QuorumPeerMain

2855 Jps

2742 core

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps

2116 QuorumPeerMain

2903 config_value

2885 config_value

2742 core

2894 Jps

[hadoop@weekend110 apache-storm-0.9.2-incubating]$ jps

2116 QuorumPeerMain

2937 Jps

2742 core

2875 supervisor

2947 nimbus

[hadoop@weekend110 apache-storm-0.9.2-incubating]$

 

 

Then open the Storm UI in a browser:

http://weekend110:8080

 

 

 

 

 

Storm UI

Cluster Summary

Version: 0.9.2-incubating
Nimbus uptime: 10m 41s
Supervisors: 1
Used slots: 0
Free slots: 4
Total slots: 4
Executors: 0
Tasks: 0

Topology summary (Name / Id / Status / Uptime / Num workers / Num executors / Num tasks): empty, since no topology has been submitted yet

Supervisor summary

Id: 3a41e7dd-0160-4ad0-bad5-096cdba4647e
Host: weekend110
Uptime: 9m 30s
Slots: 4
Used slots: 0

Nimbus Configuration (Key: Value)

dev.zookeeper.path: /tmp/dev-storm-zookeeper
topology.tick.tuple.freq.secs: (not set)
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.max.error.report.per.interval: 5
zmq.linger.millis: 5000
topology.skip.missing.kryo.registrations: false
storm.messaging.netty.client_worker_threads: 1
ui.childopts: -Xmx768m
storm.zookeeper.session.timeout: 20000
nimbus.reassign: true
topology.trident.batch.emit.interval.millis: 500
storm.messaging.netty.flush.check.interval.ms: 10
nimbus.monitor.freq.secs: 10
logviewer.childopts: -Xmx128m
java.library.path: /usr/local/lib:/opt/local/lib:/usr/lib
topology.executor.send.buffer.size: 1024
storm.local.dir: /home/hadoop/data/apache-storm-0.9.2-incubating/tmp/storm
storm.messaging.netty.buffer_size: 5242880
supervisor.worker.start.timeout.secs: 120
topology.enable.message.timeouts: true
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
drpc.worker.threads: 64
topology.worker.shared.thread.pool.size: 4
nimbus.host: weekend110
storm.messaging.netty.min_wait_ms: 100
storm.zookeeper.port: 2181
transactional.zookeeper.port: (not set)
topology.executor.receive.buffer.size: 1024
transactional.zookeeper.servers: (not set)
storm.zookeeper.root: /storm
storm.zookeeper.retry.intervalceiling.millis: 30000
supervisor.enable: true
storm.messaging.netty.server_worker_threads: 1
storm.zookeeper.servers: weekend110
transactional.zookeeper.root: /transactional
topology.acker.executors: (not set)
topology.transfer.buffer.size: 1024
topology.worker.childopts: (not set)
drpc.queue.size: 128
worker.childopts: -Xmx768m
supervisor.heartbeat.frequency.secs: 5
topology.error.throttle.interval.secs: 10
zmq.hwm: 0
drpc.port: 3772
supervisor.monitor.frequency.secs: 3
drpc.childopts: -Xmx768m
topology.receiver.buffer.size: 8
task.heartbeat.frequency.secs: 3
topology.tasks: (not set)
storm.messaging.netty.max_retries: 30
topology.spout.wait.strategy: backtype.storm.spout.SleepSpoutWaitStrategy
nimbus.thrift.max_buffer_size: 1048576
topology.max.spout.pending: (not set)
storm.zookeeper.retry.interval: 1000
topology.sleep.spout.wait.strategy.time.ms: 1
nimbus.topology.validator: backtype.storm.nimbus.DefaultTopologyValidator
supervisor.slots.ports: 6700,6701,6702,6703
topology.debug: false
nimbus.task.launch.secs: 120
nimbus.supervisor.timeout.secs: 60
topology.message.timeout.secs: 30
task.refresh.poll.secs: 10
topology.workers: 1
supervisor.childopts: -Xmx256m
nimbus.thrift.port: 6627
topology.stats.sample.rate: 0.05
worker.heartbeat.frequency.secs: 1
topology.tuple.serializer: backtype.storm.serialization.types.ListDelegateSerializer
topology.disruptor.wait.strategy: com.lmax.disruptor.BlockingWaitStrategy
topology.multilang.serializer: backtype.storm.multilang.JsonSerializer
nimbus.task.timeout.secs: 30
storm.zookeeper.connection.timeout: 15000
topology.kryo.factory: backtype.storm.serialization.DefaultKryoFactory
drpc.invocations.port: 3773
logviewer.port: 8000
zmq.threads: 1
storm.zookeeper.retry.times: 5
topology.worker.receiver.thread.count: 1
storm.thrift.transport: backtype.storm.security.auth.SimpleTransportPlugin
topology.state.synchronization.timeout.secs: 60
supervisor.worker.timeout.secs: 30
nimbus.file.copy.expiration.secs: 600
storm.messaging.transport: backtype.storm.messaging.netty.Context
logviewer.appender.name: A1
storm.messaging.netty.max_wait_ms: 1000
drpc.request.timeout.secs: 600
storm.local.mode.zmq: false
ui.port: 8080
nimbus.childopts: -Xmx1024m
storm.cluster.mode: distributed
topology.max.task.parallelism: (not set)
storm.messaging.netty.transfer.batch.size: 262144

 

 

 

 

       Here, since the goal is to get started and to understand the concepts, I am running in single-node mode.

              

 

 Storm distributed mode

1. Install a ZooKeeper cluster

 

2. Upload the Storm package and extract it

 

3. Edit the configuration file storm.yaml

 

#ZooKeeper cluster hosts

storm.zookeeper.servers:

     - "weekend05"

     - "weekend06"

     - "weekend07"

 

#hostname of the nimbus node

nimbus.host: "weekend05"

 

#worker slots on each supervisor, one port per worker
supervisor.slots.ports:
    - 6701
    - 6702
    - 6703
    - 6704
    - 6705

 

 

Starting Storm

On the nimbus host:

nohup ./storm nimbus 1>/dev/null 2>&1 &

nohup ./storm ui 1>/dev/null 2>&1 &

On the supervisor hosts:

nohup ./storm supervisor 1>/dev/null 2>&1 &

 

 

 

Deeper Storm topics:

                     implementing distributed shared locks

                     the implementation mechanism and development model of transactional topologies

                     integration with other frameworks in concrete scenarios (Flume / ActiveMQ / Kafka (a distributed message queue system) / Redis / HBase / MySQL Cluster)

 

 

 

 

 

Example application: real-time mobile phone location queries.

 

 

Creating the Storm project

 

A new Maven project is recommended here;

for the sake of beginners, though, we will add the dependency jars by hand.

That said, to everyone reading this blog: in real production work, it really should be Maven!

 

 

 

In Eclipse, right-click the project: weekend110-storm    ->     Build Path  ->   Configure Build Path, then add the jars from:

 

D:\SoftWare\apache-storm-0.9.2-incubating\lib 

 

D:\SoftWare\apache-storm-0.9.2-incubating\external\storm-kafka

This one is important: integrating Storm with Kafka generally requires this jar.
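
For reference, here is a minimal sketch of how that jar is typically used: declaring a KafkaSpout so the topology consumes from Kafka instead of from a hand-written spout. This assumes the storm-kafka 0.9.x API, and the ZooKeeper address, topic name, offset root, and consumer id below are hypothetical placeholders:

import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaSpoutSketch {

       public static void main(String[] args) {
              //the ZooKeeper ensemble that the Kafka brokers register with (hypothetical address)
              ZkHosts hosts = new ZkHosts("weekend110:2181");
              //topic to consume, ZK root for storing consumer offsets, and a consumer id (all hypothetical)
              SpoutConfig spoutConf = new SpoutConfig(hosts, "demo-topic", "/kafkastorm", "demo-id");
              //decode every Kafka message as a single string field named "str"
              spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

              TopologyBuilder builder = new TopologyBuilder();
              //the KafkaSpout takes the place of a hand-written spout such as the RandomWordSpout below
              builder.setSpout("kafkaspout", new KafkaSpout(spoutConf), 4);
              //downstream bolts are then wired exactly as in the demo that follows
       }
}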

 

 

 

 

Create the package cn.itcast.stormdemo

 

 

 

Create the class RandomWordSpout.java

 

 

 

Create the class UpperBolt.java

 

 

 

 

Create the class SuffixBolt.java

 

 

 

Create the class TopoMain.java

 

 

Writing the code

RandomWordSpout.java

package cn.itcast.stormdemo;

 

import java.util.Map;

import java.util.Random;

 

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

 

public class RandomWordSpout extends BaseRichSpout{

 

       private SpoutOutputCollector collector;

      

       //some simulated data

       String[] words = {"iphone","xiaomi","mate","sony","samsung","moto","meizu"};

      

       //continuously emit tuple messages to the next component
       //this is the core logic of the spout component

       @Override

       public void nextTuple() {

 

              //the data could come from a Kafka message queue; for simplicity, we pick a random product name from the words array and send it on

              Random random = new Random();

              int index = random.nextInt(words.length);

             

              //use the random index to pick a product name

              String godName = words[index];

             

             

              //wrap the product name in a tuple and emit it to the next component

              collector.emit(new Values(godName));

             

              //sleep 500 ms after each message

              Utils.sleep(500);

             

             

       }

 

       //initialization method, called once when the spout component is instantiated

       @Override

       public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

 

              this.collector = collector;

             

             

       }

 

       //declare the field names of the data in the tuples this spout emits

       @Override

       public void declareOutputFields(OutputFieldsDeclarer declarer) {

 

              declarer.declare(new Fields("orignname"));

             

       }

 

}

 

 

 

UpperBolt.java

package cn.itcast.stormdemo;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class UpperBolt extends BaseBasicBolt{

 

      

       //business processing logic

       @Override

       public void execute(Tuple tuple, BasicOutputCollector collector) {

             

              //first get the data passed in from the upstream component; it is inside the tuple

              String godName = tuple.getString(0);

             

              //convert the product name to upper case

              String godName_upper = godName.toUpperCase();

             

              //emit the converted product name

              collector.emit(new Values(godName_upper));

             

       }

 

      

      

       //declare the fields of the tuples this bolt emits

       @Override

       public void declareOutputFields(OutputFieldsDeclarer declarer) {

             

              declarer.declare(new Fields("uppername"));

       }

 

}

 

 

 

 

SuffixBolt.java

package cn.itcast.stormdemo;

 

import java.io.FileWriter;

import java.io.IOException;

import java.util.Map;

import java.util.UUID;

 

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Tuple;

 

public class SuffixBolt extends BaseBasicBolt{

      

       FileWriter fileWriter = null;

      

      

       //called only once during the bolt's lifetime

       @Override

       public void prepare(Map stormConf, TopologyContext context) {

 

              try {

                     fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());

              } catch (IOException e) {

                     throw new RuntimeException(e);

              }

             

       }

      

      

      

       //the core processing logic of this bolt
       //called once for every tuple message received

       @Override

       public void execute(Tuple tuple, BasicOutputCollector collector) {

 

              //first get the product name sent by the upstream component

              String upper_name = tuple.getString(0);

              String suffix_name = upper_name + "_itisok";

             

             

              //write the suffixed product name to the output file

             

              try {

                     fileWriter.write(suffix_name);

                     fileWriter.write("\n");

                     fileWriter.flush();

                    

              } catch (IOException e) {

                     throw new RuntimeException(e);

              }

             

             

             

       }

 

      

      

      

       //this bolt emits no tuples to any downstream component, so there are no fields to declare

       @Override

       public void declareOutputFields(OutputFieldsDeclarer arg0) {

 

             

       }

 

}
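
One caveat about SuffixBolt as written: the FileWriter opened in prepare() is never closed. BaseBasicBolt also offers a cleanup() hook that could be overridden for this. The following is an optional sketch, not part of the original demo (and note that Storm only reliably calls cleanup() in local mode, not when a worker is killed on a cluster):

       //optional addition inside SuffixBolt: close the writer when the topology shuts down cleanly
       @Override
       public void cleanup() {
              try {
                     fileWriter.close();
              } catch (IOException e) {
                     throw new RuntimeException(e);
              }
       }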

 

 

 

TopoMain.java

package cn.itcast.stormdemo;

 

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.generated.StormTopology;

import backtype.storm.topology.TopologyBuilder;

 

/**

 * Organizes the individual processing components into a complete processing flow, the so-called topology (analogous to a job in a MapReduce program),
 * and submits that topology to a Storm cluster to run. Once submitted, a topology runs forever, unless it is killed manually or exits abnormally.

 *

 *

 */

public class TopoMain {

 

      

       public static void main(String[] args) throws Exception {

             

              TopologyBuilder builder = new TopologyBuilder();

             

              //register our spout component with the topology
              //parallelism_hint: 4 means 4 executors run this component
              //setNumTasks(8) sets the number of concurrent tasks for this component, which means each executor runs 2 tasks

              builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);

             

              //register the upper-case bolt with the topology and subscribe it to messages from the randomspout component
              //.shuffleGrouping("randomspout") carries two meanings:
              //1. the tuples received by upperbolt must come from the randomspout component
              //2. messages between the many concurrent task instances of randomspout and upperbolt are distributed by random shuffle grouping

              builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");

             

              //register the suffix bolt with the topology and subscribe it to messages from upperbolt

              builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");

             

              //use the builder to create the topology

              StormTopology demotop = builder.createTopology();

             

             

              //configure some parameters for running the topology on the cluster

              Config conf = new Config();

              //this sets the number of slots the whole demotop occupies, i.e. the number of workers

              conf.setNumWorkers(4);

              conf.setDebug(true);

              conf.setNumAckers(0);

             

             

              //submit the topology to the Storm cluster to run

              StormSubmitter.submitTopology("demotopo", conf, demotop);

             

       }

}
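
Since this post runs everything on a single node anyway, it is worth noting that the topology can also be test-run with no cluster processes at all. Here is a minimal sketch, not part of the original demo: replace the StormSubmitter call in main() with backtype.storm.LocalCluster, which simulates a cluster inside the current JVM:

import backtype.storm.LocalCluster;
import backtype.storm.utils.Utils;

              //inside main(), in place of StormSubmitter.submitTopology("demotopo", conf, demotop):
              //run the same topology in-process; no nimbus, supervisor, or UI processes are needed
              LocalCluster cluster = new LocalCluster();
              cluster.submitTopology("demotopo", conf, demotop);
              //let it run for half a minute, then shut everything down (local mode only)
              Utils.sleep(30000);
              cluster.killTopology("demotopo");
              cluster.shutdown();

In local mode the SuffixBolt output files still appear under /home/hadoop/stormoutput/, which makes this a convenient way to verify the pipeline before submitting to a real cluster.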

 

 

 

 

Supplement:

 

http://www.cnblogs.com/vincent-vg/p/5850852.html 

 
