1、環境搭建
本人測試環境爲:Win7 64 位, Kafka 0.10.0.1, Spring 4.3.4.RELEASE, Zookeeper 3.4.6
Kafka 下載地址:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
Zookeeper 下載地址:http://www.apache.org/dyn/closer.cgi/zookeeper/
下載後分別解壓縮。在默認配置下,兩者都可以成功啓動。爲了演示,這裏不作額外配置,直接執行啓動:
Windows 下,執行 %ZOOKEEPER_HOME%/bin/zkServer.cmd,看到如下圖片時,表示啓動成功。
接下來啓動 Kafka,在 %KAFKA_HOME%/bin/windows 目錄下執行命令 kafka-server-start.bat ../../config/server.properties,看到如下圖樣,表示啓動成功。
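創建 Kafka topic,我執行的創建命令爲(例如):%KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic yaxin-test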
2、項目搭建
添加 Spring Kafka 依賴包:
<!-- Spring for Apache Kafka; the version is taken from the spring.kafka.version build property. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>
創建 Spring Producer 配置文件(kafka/spring-kafka-producer.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Supplies ${bootstrap.servers} used below. -->
    <context:property-placeholder location="classpath:kafka/kafka-producer.properties" />

    <!--
        Producer configuration passed verbatim to the Kafka client.
        "group.id" is a consumer-only setting and is intentionally not listed here:
        the producer ignores it and logs an "isn't a known config" warning.
    -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="retries" value="5"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="buffer.memory" value="33554432"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- Creates the Kafka producer from the properties above. -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg name="configs" ref="producerProperties"/>
    </bean>

    <!-- Callback that logs the outcome of each send. -->
    <bean id="producerListener" class="com.ryan.spring.kafka.listener.MsgProducerListener" />

    <!-- Inject this bean wherever messages need to be sent; messages without an explicit topic go to "yaxin-test". -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg name="producerFactory" ref="producerFactory"/>
        <!-- Optional: uncomment to flush the producer after every send. -->
        <!--<constructor-arg name="autoFlush" value="true" />-->
        <property name="defaultTopic" value="yaxin-test" />
        <property name="producerListener" ref="producerListener" />
    </bean>
</beans>
創建 Producer 監聽類 MsgProducerListener.java:
package com.ryan.spring.kafka.listener; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.support.ProducerListenerAdapter; /** * @author Rayn * @email liuwei412552703@163.com * Created by Rayn on 2016/11/24 11:20. */ public class MsgProducerListener extends ProducerListenerAdapter { private static final Logger LOG = LoggerFactory.getLogger(MsgProducerListener.class); /** * * @param topic * @param partition * @param key * @param value * @param recordMetadata */ @Override public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) { super.onSuccess(topic, partition, key, value, recordMetadata); LOG.info("生產者消息 OnMessage : offset [{}] partition [{}]", recordMetadata.offset(), recordMetadata.partition()); } /** * * @param topic * @param partition * @param key * @param value * @param exception */ @Override public void onError(String topic, Integer partition, Object key, Object value, Exception exception) { super.onError(topic, partition, key, value, exception); LOG.info("發送消息 Topic :{}, Partition : {}, Key : {}, Value : {}", topic, partition, key, value, exception.getMessage()); } }
接下來配置消費者(kafka/spring-kafka-consumer.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Supplies ${bootstrap.servers}. NOTE(review): this reads the producer's properties file; confirm that is intended. -->
    <context:property-placeholder location="classpath:kafka/kafka-producer.properties"/>

    <!-- Consumer configuration passed verbatim to the Kafka client. -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- The class that actually processes consumed messages. -->
    <bean id="messageListener" class="com.ryan.spring.kafka.listener.SingleThreadConsumerListener"/>

    <!-- Container settings: the topic to subscribe to and the listener to dispatch records to. -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="yaxin-test"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>

    <!-- Creates the Kafka consumer from the properties above. -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="consumerProperties"/>
    </bean>

    <!--
        Listener container that polls Kafka and hands each record to messageListener.
        With autoStartup=true the application context starts it through the normal
        lifecycle once the context has been refreshed, so no init-method is needed
        (doStart is an internal, protected method and must not be invoked directly).
    -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
        <property name="autoStartup" value="true" />
    </bean>

    <!-- Alternative (disabled): container factory for annotation-driven batch listeners. -->
    <!--<bean id="kafkaListenerContainerFactory" class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory" >-->
    <!--<property name="batchListener" value="true"/>-->
    <!--<property name="consumerFactory" ref="consumerFactory"/>-->
    <!--</bean>-->

    <context:component-scan base-package="com.ryan.spring.kafka.listener" />
</beans>
消費者監聽代碼如下:
package com.ryan.spring.kafka.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.listener.MessageListener; /** * @author Rayn * @email liuwei412552703@163.com * Created by Rayn on 2016/11/24 12:41. */ public class SingleThreadConsumerListener implements MessageListener<Integer, String> { private static final Logger LOG = LoggerFactory.getLogger(SingleThreadConsumerListener.class); /** * * @param record */ @Override public void onMessage(ConsumerRecord<Integer, String> record) { LOG.info("接收到消息:{}", record.value()); } }
3、測試生產者和消費者
生產者測試代碼如下:
/**
 * Sends 100 test messages to the {@code yaxin-test} topic, one every two seconds.
 *
 * <p>The application context is opened in try-with-resources so the producer it
 * owns is released even when the test fails or is interrupted.
 *
 * @throws Exception if the context cannot be loaded or the thread is interrupted while sleeping
 */
@Test
public void testProducer() throws Exception {
    try (ClassPathXmlApplicationContext applicationContext =
            new ClassPathXmlApplicationContext("kafka/spring-kafka-producer.xml")) {
        // getBean can only return the raw class; the type arguments mirror the
        // Integer/String serializers configured for the producer.
        @SuppressWarnings("unchecked")
        KafkaTemplate<Integer, String> kafkaTemplate =
                applicationContext.getBean("kafkaTemplate", KafkaTemplate.class);

        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send("yaxin-test", "測試發送數據:" + i);
            LOG.info("如今發送第 {} 條信息.", i);
            Thread.sleep(2000);
        }

        // Flush before reporting completion so buffered records are on the wire.
        kafkaTemplate.flush();
        LOG.info("數據發送完畢.");
    }
}
消費者測試代碼如下:
/**
 * Starts the consumer context and keeps the test thread alive so the listener
 * container can keep receiving messages; stop the test manually when done.
 *
 * <p>The application context is opened in try-with-resources so the listener
 * container is shut down when the loop is interrupted.
 *
 * @throws Exception if the context cannot be loaded or the thread is interrupted while sleeping
 */
@Test
public void testConsumer() throws Exception {
    try (ClassPathXmlApplicationContext applicationContext =
            new ClassPathXmlApplicationContext("kafka/spring-kafka-consumer.xml")) {
        applicationContext.start();

        // Deliberately endless: a heartbeat log every two seconds while messages arrive.
        while (true) {
            LOG.info("current time: {}", new Date());
            Thread.sleep(2000);
        }
    }
}
最後,消費者的執行結果如下圖所示:
--------------------------------------------------------------------------------------------------------------
注:請留意 spring-kafka 所依賴的 Kafka 客戶端版本,否則在啓動執行的時候,會報 Error reading field 'topic_metadata' 這樣的錯誤。該錯誤是由客戶端與服務器端的 Kafka 依賴 jar 包版本不一致引起的!