kafka+storm+hbase

A WordCount computation implemented with Kafka + Storm + HBase.

(1) Table name: wc

(2) Column family: result

(3) RowKey: word

(4) Field: count

 

1. Solution:

(1) Step 1: First prepare the Kafka, Storm, and HBase related jar packages. The Maven dependencies are as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>kafkaSpout</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
	<dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.99.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar</systemPath>
        </dependency>
        
    </dependencies>


    <repositories>
        <repository>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>clojars</id>
            <url>https://clojars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>scala-tools</id>
            <url>http://scala-tools.org/repo-releases</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>conjars</id>
            <url>http://conjars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
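
Before the topology can write results, the wc table with the result column family described above has to exist in HBase. A minimal sketch of creating it with the hbase-client API; the helper class name CreateWcTable and the ZooKeeper quorum are assumptions, not part of the original setup:

package com.kafka.spout;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// hypothetical helper, assuming the same ZooKeeper hosts as the topology below
public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zeb,yjd,ylh");

        HBaseAdmin admin = new HBaseAdmin(conf);
        // table "wc" with the single column family "result"
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("wc"));
        desc.addFamily(new HColumnDescriptor("result"));
        if (!admin.tableExists("wc")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}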

 

 

(2) The data sent from Kafka is split by the LevelSplit bolt and then emitted to the next bolt. The code is as follows:

 

package com.kafka.spout;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class LevelSplit extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // the incoming tuple carries one line of text, e.g. "the cow jumped over the moon"
        String words = tuple.getString(0);
        for (String word : words.split(" ")) {
            // emit each word as its own tuple for the counting bolt
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

  

(3) The data emitted by the LevelSplit bolt is counted in the LevelCount bolt and then sent on to the HBase bolt. The code is as follows:

 

package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class LevelCount extends BaseBasicBolt {

    // running totals, kept in memory per bolt task
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);

        // print the current totals for debugging
        for (Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "----------->" + e.getValue());
        }
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

  

(4) Prepare the Kafka and HBase connection settings, define the whole topology, and submit it. The code is as follows:

 

package com.kafka.spout;


import java.util.Map;

import com.google.common.collect.Maps;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
// HBaseBolt and SimpleHBaseMapper come from the storm-hbase module,
// which also needs to be on the classpath (it is not listed in the pom above)
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;


public class StormKafkaTopo {
    public static void main(String[] args) {
        // Kafka spout configuration: read the "yjd" topic via the ZooKeeper hosts
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        Config conf = new Config();
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // map the "word" field to the RowKey and the "count" field to the "result" column family
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");
        
        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");
        
        // hbase-bolt
        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

        conf.setDebug(true);
        conf.put("hbase.conf", map);

        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");
        
        if(args != null && args.length > 0) {
            // submit the topology to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }           
    }
}
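
The topology uses a custom MessageScheme to turn raw Kafka messages into spout tuples. A minimal sketch, assuming each message is simply a UTF-8 line of text emitted as a single field:

package com.kafka.spout;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] ser) {
        try {
            // decode the raw Kafka message as a UTF-8 line of text
            String msg = new String(ser, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Fields getOutputFields() {
        // the field name is arbitrary; the split bolt reads it positionally via tuple.getString(0)
        return new Fields("msg");
    }
}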

  

(5) Produce data with the console producer on the Kafka side, as follows:
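
Test sentences can also be produced from Java with the Kafka 0.8 producer API instead of the console producer; a rough sketch, where the class name WordProducer and the broker address are assumptions:

package com.kafka.spout;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class WordProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumed broker address; adjust to your cluster
        props.put("metadata.broker.list", "zeb:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // send a few test sentences to the "yjd" topic read by the spout
        producer.send(new KeyedMessage<String, String>("yjd", "the cow jumped over the moon"));
        producer.send(new KeyedMessage<String, String>("yjd", "the man went to the store"));
        producer.close();
    }
}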

 

 

2. Screenshots of the results:

 

 

3. Problems encountered:

(1)把全部的工做作好後,提交了拓撲,運行代碼。發生了錯誤1,以下:

 

 

Solution: the dependency versions needed to be consistent; after unifying them, the problem was resolved.

(2) Error 2 occurred, as follows:

 

 

Solution: the HMaster and HRegionServer processes of HBase had not been started. After starting them, the problem was resolved.
