Kafka+Storm+HBase實現WordCount計算。
(1)表名:wc
(2)列族:result
(3)RowKey:word
(4)Field:count
一、解決方案:
(1)第一步:首先準備Kafka、Storm和HBase相關的jar包,依賴如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>kafkaSpout</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <!-- Storm, the Storm Kafka spout, the Kafka client and the HBase client for the WordCount topology.
       Versions must be kept consistent with each other and with the cluster. -->
  <dependencies>
    <!-- NOTE(review): when submitting with `storm jar`, storm-core is usually given
         <scope>provided</scope> so it is not bundled into the jar-with-dependencies artifact.
         It is left at the default scope here so local mode (LocalCluster) still runs from the IDE. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <!-- ZooKeeper and log4j are already provided via Storm. -->
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.99.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.5.0</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <!-- Resolve tools.jar relative to the JDK running Maven instead of a hard-coded Windows path.
           On JDK 7/8 ${java.home} is the bundled JRE directory, so ../lib is the JDK's lib. -->
      <systemPath>${java.home}/../lib/tools.jar</systemPath>
    </dependency>
  </dependencies>

  <repositories>
    <!-- Maven Central only serves HTTPS; Maven 3.8.1+ also blocks plain-HTTP repositories. -->
    <repository>
      <id>central</id>
      <url>https://repo1.maven.org/maven2/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>clojars</id>
      <url>https://clojars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <!-- The former scala-tools.org repository has been offline for years and was removed. -->
    <!-- NOTE(review): conjars.org is plain HTTP and may no longer be reachable; confirm whether any
         artifact still resolves only from it, and drop or replace it otherwise. -->
    <repository>
      <id>conjars</id>
      <url>http://conjars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
        </configuration>
      </plugin>
      <!-- Builds a single fat jar (jar-with-dependencies) during `package` for `storm jar`. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.kafka.spout.StormKafkaTopo</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
(2)將Kafka發來的數據經過LevelSplit這個Bolt進行分割處理,然後再發送到下一個Bolt中。代碼如下:
package com.kafka.spout; import java.util.regex.Matcher; import java.util.regex.Pattern; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class LevelSplit extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String words = tuple.getString(0).toString();//the cow jumped over the moon String []va=words.split(" "); for(String word : va) { collector.emit(new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
(3)將LevelSplit發來的數據送到LevelCount這個Bolt中進行計數處理,然後發送到HBase的Bolt中。代碼如下:
package com.kafka.spout; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class LevelCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { // TODO Auto-generated method stub String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); for (Entry<String, Integer> e : counts.entrySet()) { //sum += e.getValue(); System.out.println(e.getKey() + "----------->" +e.getValue()); } collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("word", "count")); } }
(4)準備連接Kafka和HBase的配置,設置整個拓撲結構並提交拓撲。代碼如下:
package com.kafka.spout; import java.util.HashMap; import java.util.Map; import com.google.common.collect.Maps; //import org.apache.storm.guava.collect.Maps; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; public class StormKafkaTopo { public static void main(String[] args) { BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh"); SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout"); Config conf = new Config(); spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); SimpleHBaseMapper mapper = new SimpleHBaseMapper(); mapper.withColumnFamily("result"); mapper.withColumnFields(new Fields("count")); mapper.withRowKeyField("word"); Map<String, Object> map = Maps.newTreeMap(); map.put("hbase.rootdir", "hdfs://zeb:9000/hbase"); map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181"); // hbase-bolt HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf"); conf.setDebug(true); conf.put("hbase.conf", map); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig)); builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout"); builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word")); builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count"); if(args != null && args.length > 0) { //提交到集羣運行 try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { 
e.printStackTrace(); } } else { //本地模式運行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Topotest1121", conf, builder.createTopology()); Utils.sleep(1000000); cluster.killTopology("Topotest1121"); cluster.shutdown(); } } }
(5)在Kafka端用控制台生產者(console producer)向主題yjd生產數據,如下:
二、運行結果截圖:
三、遇到的問題:
(1)把所有的工作做好後,提交拓撲並運行代碼,發生了錯誤1,如下:
解決:原來是依賴版本不統一導致的,將各依賴的版本修改一致後,問題成功解決。
(2)發生了錯誤2,如下:
解決:原來是忘記啟動HBase的HMaster和HRegionServer,啟動後問題成功解決。