Spring kafka 整合示例

1、環境搭建java

本人測試環境爲:win7 64, Kafka 0.10.0.1 , Spring 4.3.4.RELEASE,  Zookeeper 3.4.6redis

Kafka 下載地址:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgzspring

zookeeper 下載地址:http://www.apache.org/dyn/closer.cgi/zookeeper/apache

 

下載後,分別解壓縮,默認配置下,兩個都是能夠啓動併成功。爲了演示,就不作配置直接執行啓動:bootstrap

windows 下,啓動 %ZOOKEEPER_HOME%/bin/zkServer.bat ,看到以下圖片時,啓動成功。windows

接下來啓動 Kafka , 執行命令 %KAFKA_HOME%/bin/windows/kafka-server-start.bat ../../config/server.properties ,看到以下圖樣,表示啓動成功。服務器

建立 kafka topic , 我建立是的執行命令爲:session

2、項目搭建app

添加Spring kafka 依賴包:ide

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

建立 Spring Producer 配置信息

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:redis="http://www.springframework.org/schema/redis"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
       http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd
       http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


    <context:property-placeholder location="classpath:kafka/kafka-producer.properties" />

    <!-- producer的參數 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="0"/>
                <entry key="retries" value="5"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="buffer.memory" value="33554432"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>


    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg name="configs" ref="producerProperties"/>
    </bean>


    <bean id="producerListener" class="com.ryan.spring.kafka.listener.MsgProducerListener" />

    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg name="producerFactory" ref="producerFactory"/>
        <!--<constructor-arg name="autoFlush" value="true" />-->
        <property name="defaultTopic" value="yaxin-test" />
        <property name="producerListener" ref="producerListener" />
    </bean>


</beans>

建立 Producer 監聽 MsgProducerListener.java 類:

package com.ryan.spring.kafka.listener;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListenerAdapter;

/**
 * @author Rayn
 * @email liuwei412552703@163.com
 * Created by Rayn on 2016/11/24 11:20.
 */
public class MsgProducerListener extends ProducerListenerAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(MsgProducerListener.class);


    /**
     *
     * @param topic
     * @param partition
     * @param key
     * @param value
     * @param recordMetadata
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        super.onSuccess(topic, partition, key, value, recordMetadata);

        LOG.info("生產者消息 OnMessage : offset [{}]  partition [{}]", recordMetadata.offset(), recordMetadata.partition());

    }

    /**
     *
     * @param topic
     * @param partition
     * @param key
     * @param value
     * @param exception
     */
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        super.onError(topic, partition, key, value, exception);

        LOG.info("發送消息 Topic :{}, Partition : {}, Key : {}, Value : {}", topic, partition, key, value, exception.getMessage());

    }
}

 

接下來配置消費者:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:redis="http://www.springframework.org/schema/redis"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
       http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd
       http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <context:property-placeholder location="classpath:kafka/kafka-producer.properties"/>

    <!-- 定義consumer的參數 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>


    <!-- 實際執行消息消費的類 -->
    <bean id="messageListener" class="com.ryan.spring.kafka.listener.SingleThreadConsumerListener"/>

    <!-- 消費者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="yaxin-test"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>


    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="consumerProperties"/>
    </bean>

    <!-- 建立kafkatemplate bean,使用的時候,只須要注入這個bean,便可使用template的send消息方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
        <property name="autoStartup" value="true" />
    </bean>

    <!--<bean id="kafkaListenerContainerFactory" class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory" >-->
        <!--<property name="batchListener" value="true"/>-->
        <!--<property name="consumerFactory" ref="consumerFactory"/>-->
    <!--</bean>-->

    <context:component-scan base-package="com.ryan.spring.kafka.listener" />

</beans>

消費者監聽代碼以下:

package com.ryan.spring.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;

/**
 * @author Rayn
 * @email liuwei412552703@163.com
 * Created by Rayn on 2016/11/24 12:41.
 */
public class SingleThreadConsumerListener implements MessageListener<Integer, String> {

    private static final Logger LOG = LoggerFactory.getLogger(SingleThreadConsumerListener.class);

    /**
     *
     * @param record
     */
    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {

        LOG.info("接收到消息:{}", record.value());
    }
}

 

3、測試生產者和消費者

生產者測試代碼以下:

/**
 * 測試Kafka生產者
 *
 * @throws Exception
 */
@Test
public void testProducer() throws Exception {
    ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("kafka/spring-kafka-producer.xml");

    KafkaTemplate kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate.class);

    for (int i = 0; i < 100; i++) {
        kafkaTemplate.send("yaxin-test", "測試發送數據:" + i);

        LOG.info("如今發送第 {} 條信息.", i);
        Thread.sleep(2000);
    }
    LOG.info("數據發送完畢.");

    kafkaTemplate.flush();
}

消費者,消息代碼測試以下:

/**
 * 測試 消費者
 *
 * @throws Exception
 */
@Test
public void testConsumer() throws Exception {
    ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("kafka/spring-kafka-consumer.xml");
    applicationContext.start();

    while(true){

        LOG.info("current time: " + new Date());

        Thread.sleep(2000);
    }
}

 

最後的消費者自行結果以下圖所示:

 

--------------------------------------------------------------------------------------------------------------

注:請留意spring-kafka 中,依賴的 kafka 版本,要否則 在啓動執行的時候,會報 Error reading field 'topic_metadata' in Kafka 這樣的錯誤。客戶端與服務器 Kafka 依賴 jar 包版本不一致引發的!

相關文章
相關標籤/搜索