Integrating Kafka + Storm into Spring Boot

Please credit the author and source when reposting.

Preface

​ Our business required integrating Storm and Kafka into a Spring Boot project: other services write their logs to a Kafka topic, and Storm processes that topic in real time for data monitoring and other statistics. Tutorials on this are scarce, so this post covers how to integrate Storm + Kafka into Spring Boot, along with the pitfalls I ran into.

Tools and environment

​ 1. Java: JDK 1.8

​ 2. IDE: IntelliJ IDEA 2017

​ 3. Build tool: Maven

​ 4. Spring Boot 1.5.8.RELEASE

Requirements

1. Why integrate into Spring Boot

To manage the various microservices uniformly through Spring Boot and avoid scattering configuration across multiple places.

2. Approach and rationale

​ Use Spring Boot to manage the beans needed by Kafka, Storm, Redis, and so on. Logs from other services are collected into Kafka; Kafka feeds them to Storm in real time, and the Storm bolts perform the corresponding processing.

Problems encountered

​ 1. Spring Boot has no official Storm integration.

​ 2. When starting via Spring Boot, it was unclear how to trigger topology submission.

​ 3. Submitting the topology failed with a "nimbus not found on localhost" error (NimbusLeaderNotFoundException).

​ 4. Inside a Storm bolt, annotation-based injection could not obtain Spring bean instances.

Solutions

Before integrating, you need to understand how Spring Boot starts up and is configured (this article assumes you already know and have used Storm, Kafka, and Spring Boot).

  1. Examples of integrating Storm with Spring Boot are rare online, but since we had a real need, we integrate anyway.

First, import the required jar dependencies:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>spring-boot-actuator</artifactId>
            <groupId>org.springframework.boot</groupId>
        </exclusion>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>commons-logging</artifactId>
            <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
            <artifactId>netty</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-core-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-client</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jettison</artifactId>
            <groupId>org.codehaus.jettison</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-mapper-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-jaxrs</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>snappy-java</artifactId>
            <groupId>org.xerial.snappy</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-xc</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.4</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>netty</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-common</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-annotations</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-yarn-common</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-logging</artifactId>
            <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-client</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-mapper-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-core-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>snappy-java</artifactId>
            <groupId>org.xerial.snappy</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-auth</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>commons-lang</artifactId>
            <groupId>commons-lang</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-examples</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-logging</artifactId>
            <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
            <artifactId>netty</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>

<!--storm-->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>${provided.scope}</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.1</version>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
```
The exclusions are there because these jars have multiple transitive-dependency conflicts with the project build. The Storm version is 1.1.0. The Spring Boot dependencies are:




```xml
<!-- spring boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
```

PS: these Maven dependencies reflect this project's actual needs and are not minimal; treat them as a reference only.

Project structure:

config — configuration files for the different environments

java-config — the classes that wire up the Spring Boot beans; the other packages are as their names suggest

On starting Spring Boot, we discover:

  1. Before this integration I had barely touched Storm, and it turned out that after Spring Boot starts there is nothing that triggers the topology-submission code. I assumed starting the app was enough, waited half an hour, and nothing happened at all… only then did I realize no trigger for the submit call had been implemented.

My first idea was: start Spring Boot → have a Kafka listener on the topic start the topology. The problem is that the Kafka listener would re-trigger the topology on every message for that topic, which is clearly not what we want. After some reading I found that Spring can run a method once startup has completed, which was a lifesaver. So the flow for triggering the topology became:

start Spring Boot → run the trigger method → satisfy the trigger conditions

The implementation:

```java
// Imports below assume the storm-core / storm-kafka 1.1.x package layout.
import java.util.Collections;

import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * @author Leezer
 * @date 2017/12/28
 * Submits the topology automatically once Spring has finished loading.
 **/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {

    private static String BROKERZKSTR;
    private static String TOPIC;
    private static String HOST;
    private static String PORT;

    public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
                    @Value("${zookeeper.host}") String host,
                    @Value("${zookeeper.port}") String port,
                    @Value("${kafka.default-topic}") String topic) {
        BROKERZKSTR = brokerZkstr;
        HOST = host;
        TOPIC = topic;
        PORT = port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            // Build the topology.
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Configure the spout; its parallelism controls the number of threads in the cluster.
            BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
            // The Kafka topic to subscribe to, plus the ZooKeeper root path and spout id.
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Collections.singletonList(HOST);
            spoutConfig.zkPort = Integer.parseInt(PORT);
            // Start reading from the latest Kafka offset.
            spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
            topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2)
                    .shuffleGrouping("kafka-spout");
            Config config = new Config();
            config.setDebug(false);
            /*
             * Number of worker slots this topology claims from the Storm cluster. Each slot
             * corresponds to one worker process on a supervisor node. If you ask for more
             * slots than the cluster has free (say the cluster has 2 workers left and you
             * request 4), the topology may submit successfully but never actually run; it
             * recovers once other topologies are killed and slots are released.
             */
            config.setNumWorkers(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

​ Notes:

  1. Because the project starts with embedded Tomcat, you may see the following error:
```
[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
	at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
```

This is because one of the imported jars pulls in a servlet-api older than the embedded Tomcat's version. Open the Maven dependencies and exclude it:

```xml
<exclusion>
   <artifactId>servlet-api</artifactId>
   <groupId>javax.servlet</groupId>
</exclusion>
```

Then restart and it works.
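To find which jars are fighting over a class in the first place, you can ask the JVM directly. Here is a minimal, hedged sketch: it lists every classpath location that provides a given class file, so if servlet-api classes show up more than once, the extra jar is the one to exclude. For this article's conflict the resource name would be `javax/servlet/Servlet.class`; `java/lang/Object.class` is used as the default only so the example runs anywhere.

```java
import java.net.URL;
import java.util.Enumeration;

public class Main {
    public static void main(String[] args) throws Exception {
        // Resource to look up; pass e.g. "javax/servlet/Servlet.class" on a real project.
        String resource = args.length > 0 ? args[0] : "java/lang/Object.class";
        // Enumerate every location on the classpath that provides this resource.
        Enumeration<URL> urls = ClassLoader.getSystemResources(resource);
        int count = 0;
        while (urls.hasMoreElements()) {
            System.out.println(urls.nextElement());
            count++;
        }
        System.out.println("copies found: " + count);
    }
}
```

Anything above one copy means a duplicate on the classpath, and `mvn dependency:tree` will tell you which dependency dragged it in.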

  2. During startup you may also see:

```
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
    at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
```

I puzzled over this for a long time. The explanations online all blame an incorrect Storm configuration, but my Storm is deployed on a server with no local configuration to be wrong, and by rights it should read the server's configuration, yet it didn't. After several failed attempts I finally noticed that Storm provides a local cluster for building and testing:

```java
LocalCluster cluster = new LocalCluster();
```

Use that when testing locally. When deploying to a server, change:

```java
cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
// becomes:
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
```

to submit the job.

That resolves problems 1–3 above.

Problem 4 is about using bean instances inside a bolt: even after adding the class to Spring with @Component, I could not obtain an instance. My guess was that when we build and submit the topology, at:

```java
topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
```

Storm executes the bolt's lifecycle code, such as:

```java
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    StormLauncher stormLauncher = StormLauncher.getStormLauncher();
    dataRepositorys = (AlarmDataRepositorys) stormLauncher.getBean("alarmdataRepositorys");
}
```

but the bolt itself is never instantiated by Spring, so Spring cannot see it. (The likely reason: Storm serializes the bolt when the topology is submitted and deserializes a fresh copy inside the worker thread, so the object Storm actually runs is not the Spring-managed instance.)
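The serialization round trip above can be demonstrated without Storm at all. This is a hedged, self-contained sketch (DemoBolt and ServiceRegistry are hypothetical names standing in for AlarmBolt and the Spring context): an "injected" transient field does not survive serialization, which is exactly why the real bolt re-acquires its dependency inside prepare().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class Main {
    // Stand-in for the Spring context: a static holder the worker can reach.
    static class ServiceRegistry {
        static String service = "alarmdataRepositorys-instance";
    }

    // Stand-in for a bolt: Storm requires bolts to be Serializable, and a
    // non-serializable injected field must be transient, so it is null after
    // the round trip.
    static class DemoBolt implements Serializable {
        transient String service; // "injected" field: lost on serialization

        void prepare() {
            // Re-acquire the dependency, as the article does in prepare().
            service = ServiceRegistry.service;
        }
    }

    public static void main(String[] args) throws Exception {
        DemoBolt bolt = new DemoBolt();
        bolt.service = "injected-by-spring";

        // Round-trip through Java serialization, like a topology submission.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(bolt);
        DemoBolt copy = (DemoBolt) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println("after deserialization: " + copy.service);
        copy.prepare();
        System.out.println("after prepare: " + copy.service);
    }
}
```

The first print shows the injected value is gone after deserialization; the second shows the prepare()-time lookup restoring it.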

The whole point of using Spring Boot is to have it manage these objects, so this blocked me for quite a while. Eventually it occurred to me that we might fetch the instance from the application context with getBean, so I set things up as follows.

For example, a service I need inside a bolt:

```java
/**
 * @author Leezer
 * @date 2017/12/27
 * Stores failure counts.
 **/
@Service("alarmdataRepositorys")
public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {
    private static final String ERRO = "erro";

    /**
     * @param type error type
     * @param key  key
     * @return error count
     **/
    @Override
    public String getErrNumFromRedis(String type, String key) {
        if (type == null || key == null) {
            return null;
        } else {
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            return valueOper.get(String.format("%s:%s:%s", ERRO, type, key));
        }
    }

    /**
     * @param type  error type
     * @param key   key
     * @param value value to store
     **/
    @Override
    public void setErrNumToRedis(String type, String key, String value) {
        try {
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            valueOper.set(String.format("%s:%s:%s", ERRO, type, key), value,
                    Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info(Dictionaries.REDIS_ERROR_PREFIX
                    + String.format("failed to store key %s in redis", key));
        }
    }
}
```

Because the bean has an explicit name, the bolt can fetch it with getBean when prepare runs, and then operate on it normally.

Kafka then delivers the subscribed topic's messages to my bolt for processing. The getBean method is defined in the boot main class:

```java
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service", "storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties", "classpath:application.properties", "classpath:storm.properties"})
@ImportResource(locations = {
        "classpath:/configs/spring-hadoop.xml",
        "classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {

    // Thread-safe singleton instance of the launcher.
    private volatile static StormLauncher stormLauncher;
    // The application context.
    private ApplicationContext context;

    public static void main(String[] args) {
        SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
        // application.web(false).run(args); // starts Spring Boot without the web container
        application.run(args);
        StormLauncher s = new StormLauncher();
        s.setApplicationContext(application.context());
        setStormLauncher(s);
    }

    private static void setStormLauncher(StormLauncher stormLauncher) {
        StormLauncher.stormLauncher = stormLauncher;
    }

    public static StormLauncher getStormLauncher() {
        return stormLauncher;
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(StormLauncher.class);
    }

    /**
     * Get the application context.
     *
     * @return the application context
     */
    public ApplicationContext getApplicationContext() {
        return context;
    }

    /**
     * Set the application context.
     *
     * @param appContext the context
     */
    private void setApplicationContext(ApplicationContext appContext) {
        this.context = appContext;
    }

    /**
     * Look up a bean instance by its custom name.
     *
     * @param name the name
     * @return the bean
     */
    public Object getBean(String name) {
        return context.getBean(name);
    }

    /**
     * Look up a bean by class.
     *
     * @param <T>   the type parameter
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(Class<T> clazz) {
        return context.getBean(clazz);
    }

    /**
     * Look up a bean by name and class.
     *
     * @param <T>   the type parameter
     * @param name  the name
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(String name, Class<T> clazz) {
        return context.getBean(name, clazz);
    }
}
```

That completes integrating Storm and Kafka into Spring Boot. The Kafka and other configuration will be published on GitHub.

By the way, there is one more pitfall, this time with the Kafka client:

```
Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.
```

The project complains about the Kafka client because storm-kafka is built against Kafka 0.8, while NetworkSend has that signature only in 0.9 and above; the kafka-clients version you include must match the Kafka you are integrating with.
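A quick way to diagnose this kind of NoSuchMethodError is to ask the JVM which jar a class was actually loaded from. A minimal sketch, with one assumption: in the real project you would pass `org.apache.kafka.common.network.NetworkSend` as the argument; `java.util.HashMap` is used as the default only so the example runs without Kafka on the classpath.

```java
public class Main {
    public static void main(String[] args) throws Exception {
        // Class to locate; pass the clashing class name on a real project.
        String name = args.length > 0 ? args[0] : "java.util.HashMap";
        Class<?> c = Class.forName(name);
        // The CodeSource points at the jar (or directory) the class came from;
        // it is null for classes supplied by the JDK itself.
        java.security.CodeSource src = c.getProtectionDomain().getCodeSource();
        System.out.println(c.getName() + " loaded from: "
                + (src == null ? "JDK runtime (bootstrap)" : src.getLocation()));
    }
}
```

If the printed jar is a 0.8-era kafka jar while your code expects the 0.9+ API (or vice versa), that is the mismatch to fix in the pom.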

The integration itself is fairly simple, but references were scarce and I was new to Storm, so a lot of thinking went into it; I'm recording it here for posterity.

Project repository - github

References: springboot-storm-integration
