Kafka + Storm + HDFS Integration in Practice

In many application scenarios on the Hadoop platform we need to analyze data both offline and in real time. Offline analysis is easily handled with Hive, but Hive is not a good fit for real-time requirements. For real-time scenarios we can use Storm, a real-time processing system that provides a computing model for real-time applications and is easy to program against. To unify offline and real-time computation, we usually want both to share the same collection of data sources as input, and then route the data through the real-time system and the offline analysis system separately. To do this, we can connect the data source (for example, logs collected with Flume) directly to a message broker such as Kafka: Flume and Kafka are integrated, with Flume acting as the message Producer that publishes the data it produces (log data, business request data, and so on) to Kafka, and a Storm Topology subscribing to those messages as the Consumer. The Storm cluster then covers the following two requirement scenarios:

  • Use a Storm Topology directly to analyze and process the data in real time
  • Integrate Storm + HDFS and write the processed messages into HDFS for offline analysis

For real-time processing you only need to develop a Topology that meets your business requirements, so it is not discussed further here. Instead, this article focuses on installing and configuring Kafka and Storm, and on integrating Kafka + Storm, Storm + HDFS, and finally Kafka + Storm + HDFS, to meet the requirements listed above. The software packages used in this walkthrough are:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

The operating system used to configure and run the programs is CentOS 5.11.

Kafka Installation and Configuration

We use 3 machines to build the Kafka cluster:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3

Before installing the Kafka cluster, note that we do not use the ZooKeeper bundled with Kafka; instead, a separate ZooKeeper cluster is installed on the same 3 machines. Make sure that ZooKeeper cluster is running properly (a minimal configuration sketch is shown below for reference).
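
For reference only, a minimal zoo.cfg sketch for such a 3-node ensemble could look like the following; the dataDir path is an assumption, and 2181/2888/3888 are the ZooKeeper defaults, so adjust everything to your actual installation:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=h1:2888:3888
server.2=h2:2888:3888
server.3=h3:2888:3888

Each node additionally needs a myid file under dataDir containing 1, 2, or 3 to match its server.N entry.
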
First, prepare the Kafka installation files on h1 by running the following commands:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

Modify the configuration file /usr/local/kafka/config/server.properties, changing the following entries:

broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

Note that by default Kafka uses ZooKeeper's root path /, which scatters Kafka's ZooKeeper data directly under the root. If other applications share the same ZooKeeper cluster, browsing the ZooKeeper data becomes confusing, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

In addition, the /kafka path must be created manually in ZooKeeper. Connect to any ZooKeeper server with the following commands:

cd /usr/local/zookeeper
bin/zkCli.sh

Then, in the ZooKeeper shell, create the chroot path with the following command:

create /kafka ''

From then on, every connection to the Kafka cluster (via the --zookeeper option) must use a connection string that includes the chroot path, as shown later.
Next, sync the configured installation files to the other nodes h2 and h3:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/

Finally, configure h2 and h3 by running the following commands:

cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

and modify the configuration file /usr/local/kafka/config/server.properties on each of them as follows:

broker.id=1  # set on h2

broker.id=2  # set on h3

Kafka requires each broker id to be unique across the entire cluster, so this setting must be adjusted on every node. (On a single machine you can simulate a distributed Kafka cluster by running multiple broker processes; each broker still needs a unique id, and the port and data directory settings must be changed as well, as sketched below.)
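
As an illustration only (the file name config/server-1.properties, the port, and the log directory below are assumptions, not part of this setup), a second broker on the same host could be started from a copy of server.properties containing:

broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

and launched with bin/kafka-server-start.sh config/server-1.properties &.
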
Start Kafka on each of the three cluster nodes h1, h2, and h3 by running:

bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

Check the logs or the process status to confirm that the Kafka cluster has started successfully.
Now create a Topic named my-replicated-topic5, with 5 partitions and a replication factor of 3:

bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5

To view the created Topic, run:

bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5

The output looks like this:

Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
     Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
     Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1

The meanings of Leader, Replicas, and Isr above are:

Partition: the partition number
Leader   : the node responsible for reads and writes of the given partition
Replicas : the list of nodes that replicate the log for this partition
Isr      : the "in-sync" replicas, the subset of replicas that are currently alive and eligible to become Leader

We can use the bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts that ship with Kafka to demonstrate publishing and consuming messages.
In one terminal, start a Producer and produce messages to the Topic my-replicated-topic5 created above:

bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5

In another terminal, start a Consumer and subscribe to the messages produced to my-replicated-topic5:

bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5

Type a line of text in the Producer terminal and press Enter; the message then appears in the Consumer terminal as it is consumed.
You can also use Kafka's Producer and Consumer Java APIs to implement the message production and consumption logic in code, as sketched below.
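
For reference, here is a minimal producer sketch written against the old 0.8.x Java API that ships with kafka_2.9.2-0.8.1.1; the package name org.shirdrn.kafka.examples and the property values are illustrative assumptions, while the broker list and topic reuse the cluster configured above:

package org.shirdrn.kafka.examples; // hypothetical package name

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleKafkaProducer {

     public static void main(String[] args) {
          Properties props = new Properties();
          // Broker list of the cluster configured above
          props.put("metadata.broker.list", "h1:9092,h2:9092,h3:9092");
          // Encode message payloads as strings
          props.put("serializer.class", "kafka.serializer.StringEncoder");
          // Wait for the partition leader to acknowledge each write
          props.put("request.required.acks", "1");

          Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
          for (int i = 0; i < 10; i++) {
               producer.send(new KeyedMessage<String, String>("my-replicated-topic5", "test message " + i));
          }
          producer.close();
     }
}

A consumer can be written in a similar way with the high-level consumer API (kafka.consumer.Consumer.createJavaConsumerConnector), pointing zookeeper.connect at the same connection string with the /kafka chroot.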

Storm Installation and Configuration

A Storm cluster also depends on a ZooKeeper cluster, so make sure ZooKeeper is running properly. Installing and configuring Storm is fairly simple; we again use the same 3 machines:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3

First, install on node h1 by running the following commands:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

Then modify the configuration file conf/storm.yaml as follows:

storm.zookeeper.servers:
     - "h1"
     - "h2"
     - "h3"
storm.zookeeper.port: 2181
#
nimbus.host: "h1"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

storm.local.dir: "/tmp/storm"

Distribute the configured installation files to the other nodes:

scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

Finally, configure h2 and h3 by running:

cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

In a Storm cluster the master node runs Nimbus and the worker nodes run Supervisors. Start the Nimbus service on h1, and the Supervisor service on the worker nodes h2 and h3:

bin/storm nimbus &
bin/storm supervisor &

For easier monitoring, you can start the Storm UI to watch the state of Storm Topologies from a web page, for example on h2:

bin/storm ui &

You can then visit http://h2:8080/ to check how your Topologies are running.

Integrating Kafka + Storm

Messages enter the Kafka broker in various ways, for example log data collected with Flume that is routed and buffered in Kafka and then analyzed in real time by Storm. In that case we need to read the Kafka messages in a Storm Spout and hand them to Bolt components for the actual analysis and processing. In fact, the apache-storm-0.9.2-incubating release already ships with an external Kafka integration module, storm-kafka, that can be used directly. For example, my Maven dependency configuration is as follows:

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>0.9.2-incubating</version>
     <scope>provided</scope>
</dependency>
<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-kafka</artifactId>
     <version>0.9.2-incubating</version>
</dependency>
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka_2.9.2</artifactId>
     <version>0.8.1.1</version>
     <exclusions>
          <exclusion>
               <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper</artifactId>
          </exclusion>
          <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
          </exclusion>
     </exclusions>
</dependency>

Below we develop a simple WordCount example that reads subscribed message lines from Kafka, splits them into words on whitespace, and then computes word counts. The Topology implementation is shown here:

package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

     public static class KafkaWordSplitter extends BaseRichBolt {

          private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
          private static final long serialVersionUID = 886149197481637894L;
          private OutputCollector collector;

          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;
          }

          @Override
          public void execute(Tuple input) {
               String line = input.getString(0);
               LOG.info("RECV[kafka -> splitter] " + line);
               String[] words = line.split("\\s+");
               for(String word : words) {
                    LOG.info("EMIT[splitter -> counter] " + word);
                    collector.emit(input, new Values(word, 1));
               }
               collector.ack(input);
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("word", "count"));
          }

     }

     public static class WordCounter extends BaseRichBolt {

          private static final Log LOG = LogFactory.getLog(WordCounter.class);
          private static final long serialVersionUID = 886149197481637894L;
          private OutputCollector collector;
          private Map<String, AtomicInteger> counterMap;

          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;
               this.counterMap = new HashMap<String, AtomicInteger>();
          }

          @Override
          public void execute(Tuple input) {
               String word = input.getString(0);
               int count = input.getInteger(1);
               LOG.info("RECV[splitter -> counter] " + word + " : " + count);
               AtomicInteger ai = this.counterMap.get(word);
               if(ai == null) {
                    ai = new AtomicInteger();
                    this.counterMap.put(word, ai);
               }
               ai.addAndGet(count);
               collector.ack(input);
               LOG.info("CHECK statistics map: " + this.counterMap);
          }

          @Override
          public void cleanup() {
               LOG.info("The final result:");
               Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
               while(iter.hasNext()) {
                    Entry<String, AtomicInteger> entry = iter.next();
                    LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
               }
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("word", "count"));
          }
     }

     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
          String zks = "h1:2181,h2:2181,h3:2181";
          String topic = "my-replicated-topic5";
          String zkRoot = "/storm"; // default zookeeper root configuration for storm
          String id = "word";

          BrokerHosts brokerHosts = new ZkHosts(zks);
          SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
          spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
          spoutConf.forceFromStart = false;
          spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
          spoutConf.zkPort = 2181;

          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Kafka Topic was created with 5 partitions, so the spout parallelism is set to 5
          builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
          builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

          Config conf = new Config();

          String name = MyKafkaTopology.class.getSimpleName();
          if (args != null && args.length > 0) {
               // Nimbus host name passed from command line
               conf.put(Config.NIMBUS_HOST, args[0]);
               conf.setNumWorkers(3);
               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
          } else {
               conf.setMaxTaskParallelism(3);
               LocalCluster cluster = new LocalCluster();
               cluster.submitTopology(name, conf, builder.createTopology());
               Thread.sleep(60000);
               cluster.shutdown();
          }
     }
}

The program above needs no arguments for local debugging (with LocalCluster); when submitting it to a real cluster, one argument must be passed: the host name of Nimbus.
Build it with Maven into a single jar that includes the dependencies (but not the Storm dependencies themselves), for example storm-examples-0.0.1-SNAPSHOT.jar. Because the Topology uses Kafka, the following dependency jars must be copied into the lib directory of the Storm installation before submitting it:

cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

Then submit the Topology we developed:

bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

You can monitor the Topology through the log files (under the logs/ directory) or through the Storm UI. If the program runs without errors, produce some messages with the Kafka console Producer used earlier, and you will see our Storm Topology receive and process them in real time.
In the Topology code above there is one important configuration object, SpoutConfig, with the following property:

spoutConf.forceFromStart = false;

This setting determines where the spout resumes reading the subscribed Kafka Topic when the Topology is restarted after stopping due to a failure. If forceFromStart=true, it reads from the beginning of the Topic, so Tuples that were already processed are processed again; otherwise it continues from the last recorded position, so data in the Kafka Topic is not reprocessed — the consumption position is tracked as state against the data source.

Integrating Storm + HDFS

The Storm real-time computing cluster consumes messages from the Kafka broker. Requirements with a real-time component can go through a real-time processing program, while requirements that call for offline analysis need the data written into HDFS first. The Topology below implements writing to HDFS; the code is as follows:

package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

     public static class EventSpout extends BaseRichSpout {

          private static final Log LOG = LogFactory.getLog(EventSpout.class);
          private static final long serialVersionUID = 886149197481637894L;
          private SpoutOutputCollector collector;
          private Random rand;
          private String[] records;

          @Override
          public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
               this.collector = collector;
               rand = new Random();
               records = new String[] {
                         "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",
                         "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",
                         "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
               };
          }

          @Override
          public void nextTuple() {
               Utils.sleep(1000);
               DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
               Date d = new Date(System.currentTimeMillis());
               String minute = df.format(d);
               String record = records[rand.nextInt(records.length)];
               LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
               collector.emit(new Values(minute, record));
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("minute", "record"));
          }

     }

     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
          // use " : " instead of the default "," as the field delimiter
          RecordFormat format = new DelimitedRecordFormat()
                  .withFieldDelimiter(" : ");

          // sync the filesystem after every 1k tuples
          SyncPolicy syncPolicy = new CountSyncPolicy(1000);

          // rotate files every minute
          FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

          FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                  .withPath("/storm/").withPrefix("app_").withExtension(".log");

          HdfsBolt hdfsBolt = new HdfsBolt()
                  .withFsUrl("hdfs://h1:8020")
                  .withFileNameFormat(fileNameFormat)
                  .withRecordFormat(format)
                  .withRotationPolicy(rotationPolicy)
                  .withSyncPolicy(syncPolicy);

          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("event-spout", new EventSpout(), 3);
          builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

          Config conf = new Config();

          String name = StormToHDFSTopology.class.getSimpleName();
          if (args != null && args.length > 0) {
               conf.put(Config.NIMBUS_HOST, args[0]);
               conf.setNumWorkers(3);
               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
          } else {
               conf.setMaxTaskParallelism(3);
               LocalCluster cluster = new LocalCluster();
               cluster.submitTopology(name, conf, builder.createTopology());
               Thread.sleep(60000);
               cluster.shutdown();
          }
     }

}

In the processing logic above, the HdfsBolt can be configured in much more detail through FileNameFormat, SyncPolicy, and FileRotationPolicy (which lets you choose the condition for rolling over to a new log file, for example after a fixed amount of time or once a file reaches a given size); see the storm-hdfs project for more options, and the size-based rotation sketch below for an example.
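
For instance, a size-based rotation policy can replace the timed one used above. This is only a sketch, assuming the FileSizeRotationPolicy class shipped with this storm-hdfs version; the 128 MB threshold and the factory class name are illustrative, while the path, prefix, delimiter, and sync settings reuse those from the Topology:

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

public class SizeRotatedHdfsBoltFactory {

     public static HdfsBolt create() {
          // Roll over to a new file once the current one reaches 128 MB,
          // instead of rotating on a fixed 1-minute timer as above.
          FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128.0f, Units.MB);
          return new HdfsBolt()
                  .withFsUrl("hdfs://h1:8020")
                  .withFileNameFormat(new DefaultFileNameFormat()
                          .withPath("/storm/").withPrefix("app_").withExtension(".log"))
                  .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(" : "))
                  .withRotationPolicy(rotationPolicy)
                  .withSyncPolicy(new CountSyncPolicy(1000));
     }
}
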
When packaging the code above, note that the Maven packaging configuration that ships with storm-starter may cause errors when the Topology is deployed and run; use the maven-shade-plugin instead, configured as follows:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

Integrating Kafka + Storm + HDFS

We have now exercised Kafka + Storm and Storm + HDFS separately. By replacing the Spout of the latter with the Kafka Spout of the former, we can consume messages from Kafka, do some simple processing in Storm, write the data to HDFS, and finally analyze it offline on the Hadoop platform. Below is a simple example that consumes messages from Kafka, processes them in Storm, and writes them to HDFS:

package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

     public static class KafkaWordToUpperCase extends BaseRichBolt {

          private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
          private static final long serialVersionUID = -5207232012035109026L;
          private OutputCollector collector;

          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;
          }

          @Override
          public void execute(Tuple input) {
               String line = input.getString(0).trim();
               LOG.info("RECV[kafka -> splitter] " + line);
               if(!line.isEmpty()) {
                    String upperLine = line.toUpperCase();
                    LOG.info("EMIT[splitter -> counter] " + upperLine);
                    collector.emit(input, new Values(upperLine, upperLine.length()));
               }
               collector.ack(input);
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("line", "len"));
          }

     }

     public static class RealtimeBolt extends BaseRichBolt {

          private static final Log LOG = LogFactory.getLog(RealtimeBolt.class);
          private static final long serialVersionUID = -4115132557403913367L;
          private OutputCollector collector;

          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;
          }

          @Override
          public void execute(Tuple input) {
               String line = input.getString(0).trim();
               LOG.info("REALTIME: " + line);
               collector.ack(input);
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {

          }

     }

     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

          // Configure Kafka
          String zks = "h1:2181,h2:2181,h3:2181";
          String topic = "my-replicated-topic5";
          String zkRoot = "/storm"; // default zookeeper root configuration for storm
          String id = "word";
          BrokerHosts brokerHosts = new ZkHosts(zks);
          SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
          spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
          spoutConf.forceFromStart = false;
          spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
          spoutConf.zkPort = 2181;

          // Configure HDFS bolt
          RecordFormat format = new DelimitedRecordFormat()
                  .withFieldDelimiter("\t"); // use "\t" instead of "," for the field delimiter
          SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
          FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files every minute
          FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                  .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
          HdfsBolt hdfsBolt = new HdfsBolt()
                  .withFsUrl("hdfs://h1:8020")
                  .withFileNameFormat(fileNameFormat)
                  .withRecordFormat(format)
                  .withRotationPolicy(rotationPolicy)
                  .withSyncPolicy(syncPolicy);

          // configure & build topology
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
          builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
          builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
          builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

          // submit topology
          Config conf = new Config();
          String name = DistributeWordTopology.class.getSimpleName();
          if (args != null && args.length > 0) {
               String nimbus = args[0];
               conf.put(Config.NIMBUS_HOST, nimbus);
               conf.setNumWorkers(3);
               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
          } else {
               conf.setMaxTaskParallelism(3);
               LocalCluster cluster = new LocalCluster();
               cluster.submitTopology(name, conf, builder.createTopology());
               Thread.sleep(60000);
               cluster.shutdown();
          }
     }

}

In the code above, the Bolt named to-upper converts each received line to upper case and then sends a copy of the processed data to each of the downstream Bolts hdfs-bolt and realtime, which handle it independently according to the actual need (real-time or offline).
After packaging, deploy and run the Topology on the Storm cluster:

bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

You can check how the Topology is running through the Storm UI, and inspect the data generated on HDFS, for example with the commands below.
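
For example, assuming the default naming behavior of the DefaultFileNameFormat configured above (the actual file names also include component, task, and timestamp information after the app_ prefix):

hdfs dfs -ls /storm/
hdfs dfs -cat /storm/app_*.log | head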
