In many Hadoop-based application scenarios we need to analyze data both offline and in real time. Offline analysis is easy to implement with Hive, but Hive is not suitable for real-time requirements. For the real-time side we can use Storm, a real-time processing system that provides a computation model for real-time applications and is straightforward to program against. To unify offline and real-time computation, we usually want both to share the same set of input data sources and then route the data through the real-time system and the offline analysis system separately. To do this we can connect the data sources (for example, logs collected with Flume) to a message broker such as Kafka: integrate Flume + Kafka, with Flume acting as the message Producer that publishes the collected data (log data, business request data, and so on) to Kafka, and then, via subscription, use a Storm Topology as the message Consumer and handle the following two scenarios in the Storm cluster:
- Use a Storm Topology directly to analyze and process the data in real time
- Integrate Storm + HDFS, writing the processed messages into HDFS for offline analysis
For the real-time case we only need to develop a Topology that meets the business requirements, so it is not discussed further. Here we focus on installing and configuring Kafka and Storm, and on integrating Kafka + Storm, Storm + HDFS, and finally Kafka + Storm + HDFS, to satisfy the requirements above. The software packages used in this walkthrough are:
- zookeeper-3.4.5.tar.gz
- kafka_2.9.2-0.8.1.1.tgz
- apache-storm-0.9.2-incubating.tar.gz
- hadoop-2.2.0.tar.gz
The operating system used for all configuration and runs below is CentOS 5.11.
Kafka installation and configuration
We use 3 machines, h1, h2 and h3, to build the Kafka cluster.
Rather than the ZooKeeper bundled with Kafka, we use a separately installed ZooKeeper cluster running on the same 3 machines; make sure that ZooKeeper cluster is running properly before installing Kafka.
First, prepare the Kafka installation files on h1 by running the following commands:
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
Modify the configuration file /usr/local/kafka/config/server.properties, changing the following setting:
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
Note that by default Kafka uses ZooKeeper's root path /, so Kafka's ZooKeeper data ends up scattered directly under the root. If other applications share the same ZooKeeper cluster, browsing the data in ZooKeeper becomes messy, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
In addition, the /kafka path must be created manually in ZooKeeper. Connect to any ZooKeeper server with the following commands:
cd /usr/local/zookeeper
Inside the ZooKeeper client shell, run a command to create the chroot path, as sketched below.
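The exact commands were lost from the source; with the stock ZooKeeper 3.4.5 scripts you would typically connect with zkCli.sh and then issue create inside the client shell, for example:

bin/zkCli.sh -server h1:2181
create /kafka ''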
From then on, every time you connect to the Kafka cluster (with the --zookeeper option) you must use a connection string that includes this chroot path, as we will see later.
Next, sync the prepared installation files to the other nodes, h2 and h3:
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
Finally, configure h2 and h3 by running the following commands on each of them:
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
and adjust the configuration file /usr/local/kafka/config/server.properties on each node.
Every Broker in a Kafka cluster must have a unique id, so the broker.id setting has to be given a different value on each node (if you simulate a distributed Kafka cluster on a single machine by starting several Broker processes, their broker.id values must likewise be unique, and the log/data directory settings must be changed as well). A sketch of the per-node values is shown below.
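The concrete values were lost from the source; assuming h1 keeps the default broker.id=0, a plausible per-node layout is:

# h1: /usr/local/kafka/config/server.properties
broker.id=0
# h2:
broker.id=1
# h3:
broker.id=2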
Start Kafka on each of the three nodes h1, h2 and h3 by running:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
You can confirm that the Kafka cluster started successfully by checking the logs or the process status, for example as sketched below.
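A quick sanity check on each node (assuming the JDK's jps is on the PATH and Kafka writes its server log to logs/server.log under the installation directory) might be:

jps | grep Kafka
tail -n 50 /usr/local/kafka/logs/server.log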
Now create a Topic named my-replicated-topic5 with 5 partitions and a replication factor of 3:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
To inspect the newly created Topic, run:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
The output looks like this:
Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
     Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
     Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1
The meanings of Leader, Replicas and Isr above are:
Leader: the node responsible for all reads and writes of the given partition
Replicas: the list of nodes that replicate the log of this partition
Isr: the "in-sync" replicas, i.e. the subset of replicas that are currently alive and caught up, and therefore eligible to become the Leader
We can use the bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts that ship with Kafka to demonstrate how messages are published and consumed.
In one terminal, start a Producer and send messages to the my-replicated-topic5 Topic created above:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
In another terminal, start a Consumer and subscribe to the messages produced into the my-replicated-topic5 Topic:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
Type a line of text in the Producer terminal and press Enter; the line then appears in the Consumer terminal as a consumed message.
You can also implement the production and consumption logic in code with Kafka's Producer and Consumer Java APIs, along the lines of the sketch below.
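As an illustration only (not part of the original post), a minimal producer written against the 0.8.1.1 client API might look like this; the class name SimpleKafkaProducer is made up here, while the broker list and topic reuse the ones configured above:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // brokers of the cluster built above
        props.put("metadata.broker.list", "h1:9092,h2:9092,h3:9092");
        // send message payloads as plain strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // publish a few test lines to the topic created earlier
        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>(
                    "my-replicated-topic5", "test message " + i));
        }
        producer.close();
    }
}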
Storm installation and configuration
A Storm cluster also depends on a ZooKeeper cluster, so make sure ZooKeeper is running properly. Installing and configuring Storm is straightforward; we again use the same 3 machines, h1, h2 and h3.
First, install Storm on node h1 by running:
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
Then edit the configuration file conf/storm.yaml so that it contains:
storm.zookeeper.servers:
     - "h1"
     - "h2"
     - "h3"
storm.zookeeper.port: 2181

nimbus.host: "h1"

supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

storm.local.dir: "/tmp/storm"
Distribute the configured installation files to the other nodes:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
Finally, configure h2 and h3 by running the following commands on each of them:
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
In a Storm cluster the master node runs Nimbus and the worker nodes run Supervisors, so we start the Nimbus service on h1 and the Supervisor service on h2 and h3, as sketched below.
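The start commands were dropped from the source; with the standard launcher scripts they are typically run from /usr/local/storm as:

# on h1
bin/storm nimbus >/dev/null 2>&1 &
# on h2 and h3
bin/storm supervisor >/dev/null 2>&1 &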
To make monitoring easier you can also start the Storm UI, which shows the state of running Topologies on a web page; for example, start it on h2 with the command sketched below.
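Again the original command was lost; the usual way to launch the UI daemon is:

bin/storm ui >/dev/null 2>&1 &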
You can then browse to http://h2:8080/ to check on your Topologies.
Integrating Kafka + Storm
Messages reach the Kafka broker in various ways, for example log data collected by Flume, routed and buffered in Kafka, and then analyzed in real time by Storm. In that setup a Storm Spout has to read the messages from Kafka and hand them to the appropriate Bolt components for processing. In fact, the apache-storm-0.9.2-incubating release already ships with an external module, storm-kafka, that integrates with Kafka and can be used directly. The Maven dependencies I used are:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Next we develop a simple WordCount example that reads subscribed message lines from Kafka, splits them into words on whitespace, and then computes word frequencies. The Topology is implemented as follows:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    // Splits each message line read from Kafka into single words
    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for (String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    // Accumulates word counts in memory and dumps them on shutdown
    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word"; // consumer id under zkRoot where the spout stores its offsets

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Topic created above has 5 partitions, so the spout parallelism is set to 5
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();

        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
The program above needs no arguments when debugged locally (with LocalCluster); when submitting it to a real cluster, pass one argument: the host name of Nimbus.
Build it with Maven into a single jar that bundles the dependencies (but not the Storm dependencies themselves), e.g. storm-examples-0.0.1-SNAPSHOT.jar. Because the Topology uses Kafka, before submitting it to the Storm cluster you also need to copy the following dependency jars into the lib directory on the Storm nodes:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
Then submit the Topology we developed:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
You can monitor the Topology through the log files (under the logs/ directory) or the Storm UI. If everything is working, generate messages with the Kafka console Producer used earlier and you will see the Storm Topology receive and process them in real time.
In the Topology code above there is one key configuration object, SpoutConfig, with the following property:
spoutConf.forceFromStart = false;
This property controls where the Spout resumes reading in the subscribed Kafka Topic after the Topology is restarted (for example, following a failure). With forceFromStart=true it starts again from the beginning of the Topic, so previously processed Tuples are processed a second time; with false it continues from the last recorded position, so the Topic data is not reprocessed. The consumed position is recorded as state on the data-source side.
Integrating Storm + HDFS
The Storm cluster consumes messages from the Kafka broker; data that needs real-time handling goes through the real-time path, while data that needs offline analysis can be written to HDFS. The following Topology implements the HDFS part:
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

    // Generates sample event records, keyed by the minute they were emitted in
    public static class EventSpout extends BaseRichSpout {

        private static final Log LOG = LogFactory.getLog(EventSpout.class);
        private static final long serialVersionUID = 886149197481637894L;
        private SpoutOutputCollector collector;
        private Random rand;
        private String[] records;

        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.collector = collector;
            this.rand = new Random();
            records = new String[] {
                    "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
                    "10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
                    "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            Date d = new Date(System.currentTimeMillis());
            String minute = df.format(d);
            String record = records[rand.nextInt(records.length)];
            LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
            collector.emit(new Values(minute, record));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("minute", "record"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // use " : " instead of the default "," as the field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(" : ");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files once per minute
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log");

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event-spout", new EventSpout(), 3);
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

        Config conf = new Config();

        String name = StormToHDFSTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
In the logic above, the HdfsBolt can be configured in more detail through FileNameFormat, SyncPolicy and FileRotationPolicy (which decides when a new log file is rolled: you can roll after a fixed time interval, or once a file reaches a configured size); see the storm-hdfs project for the full set of options. A size-based rotation is sketched below.
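For example, assuming the storm-hdfs jar used here includes FileSizeRotationPolicy (it exists in the upstream storm-hdfs project), rotating whenever a file reaches 100 MB would look roughly like this:

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;

// roll over to a new HDFS file once the current one reaches 100 MB
FileRotationPolicy sizeRotationPolicy = new FileSizeRotationPolicy(100.0f, Units.MB);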
Note on packaging: the Maven packaging configuration that comes with storm-starter may cause errors when the Topology is deployed and run; you can use the maven-shade-plugin instead, configured as follows:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
Integrating Kafka + Storm + HDFS
We have now tried Kafka + Storm and Storm + HDFS separately. By swapping the Spout of the latter for the Kafka Spout of the former, we can consume messages from Kafka, do some simple processing in Storm, write the data to HDFS, and finally analyze it offline on the Hadoop platform. Below is a simple example that consumes messages from Kafka, processes them with Storm, and writes them to HDFS:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

    // Converts each non-empty line read from Kafka to upper case
    public static class KafkaWordToUpperCase extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -5207232012035109026L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("RECV[kafka -> splitter] " + line);
            if (!line.isEmpty()) {
                String upperLine = line.toUpperCase();
                LOG.info("EMIT[splitter -> counter] " + upperLine);
                collector.emit(input, new Values(upperLine, upperLine.length()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line", "len"));
        }
    }

    // Stands in for the real-time processing path; here it only logs the line
    public static class RealtimeBolt extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(RealtimeBolt.class);
        private static final long serialVersionUID = -4115132557403913367L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("REALTIME: " + line);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: emits nothing downstream
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

        // Configure Kafka
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word"; // consumer id under zkRoot where the spout stores its offsets
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        // Configure HDFS bolt
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
        SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files every minute
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // configure & build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
        builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

        // submit topology
        Config conf = new Config();
        String name = DistributeWordTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
In the code above, the Bolt named to-upper converts each received line to upper case and then sends a copy of the processed data to each of the two downstream Bolts, hdfs-bolt and realtime, which handle it independently according to their needs (offline vs. real-time).
After packaging, deploy and run this Topology on the Storm cluster:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
You can watch how the Topology is running through the Storm UI and inspect the data it generates on HDFS, for example as sketched below.
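A quick way to look at the generated files (assuming the Hadoop client is on the PATH and the bolt writes under /storm/ as configured above):

hdfs dfs -ls /storm/
hdfs dfs -cat /storm/app_*.log | head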