1、概述
Spring Integration Kafka 是基於 Apache Kafka 和Spring Integration來集成Kafka,對開發配置提供了方便。
2、配置
一、spring-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- Channel carrying messages consumed from Kafka; dispatched on a dedicated executor. -->
    <int:channel id="inputFromKafka">
        <int:dispatcher task-executor="kafkaMessageExecutor"/>
    </int:channel>

    <!-- ZooKeeper connection (multiple hosts may be listed in zk-connect). -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
                                 zk-connect="192.168.1.237:2181"
                                 zk-connection-timeout="6000"
                                 zk-session-timeout="6000"
                                 zk-sync-time="2000"/>

    <!-- Inbound adapter; auto-startup must be "true", otherwise no messages are received. -->
    <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
                                       kafka-consumer-context-ref="consumerContext"
                                       auto-startup="true"
                                       channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
    </int-kafka:inbound-channel-adapter>

    <task:executor id="kafkaMessageExecutor"
                   pool-size="8"
                   keep-alive="120"
                   queue-capacity="500"/>

    <bean id="kafkaDecoder"
          class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>

    <bean id="consumerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>

    <!-- Bean that receives the consumed messages. -->
    <bean id="kafkaConsumerService" class="com.sunney.service.impl.KafkaConsumerService"/>

    <!-- Route messages from the channel to the handler method. -->
    <int:outbound-channel-adapter channel="inputFromKafka"
                                  ref="kafkaConsumerService"
                                  method="processMessage"/>

    <int-kafka:consumer-context id="consumerContext"
                                consumer-timeout="1000"
                                zookeeper-connect="zookeeperConnect"
                                consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration group-id="default1"
                                              value-decoder="kafkaDecoder"
                                              key-decoder="kafkaDecoder"
                                              max-messages="5000">
                <!-- Two topics configured. -->
                <int-kafka:topic id="mytopic" streams="4"/>
                <int-kafka:topic id="sunneytopic" streams="4"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>
二、spring-kafka-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- Common configuration. -->
    <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer"/>

    <bean id="kafkaEncoder"
          class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
        <constructor-arg value="java.lang.String"/>
    </bean>

    <bean id="producerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
                <prop key="request.required.acks">1</prop>
            </props>
        </property>
    </bean>

    <!-- Topic test configuration. -->
    <int:channel id="kafkaTopicTest">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapterTopicTest"
                                        kafka-producer-context-ref="producerContextTopicTest"
                                        auto-startup="true"
                                        channel="kafkaTopicTest"
                                        order="3">
        <int:poller fixed-delay="1000"
                    time-unit="MILLISECONDS"
                    receive-timeout="1"
                    task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor"
                   pool-size="5"
                   keep-alive="120"
                   queue-capacity="500"/>

    <!--
    <bean id="kafkaEncoder"
          class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder">
        <constructor-arg value="com.company.AvroGeneratedSpecificRecord"/>
    </bean>
    -->

    <int-kafka:producer-context id="producerContextTopicTest"
                                producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <!-- Multiple topic configurations. -->
            <int-kafka:producer-configuration broker-list="192.168.1.237:9090,192.168.1.237:9091,192.168.1.237:9092"
                                              key-serializer="stringSerializer"
                                              value-class-type="java.lang.String"
                                              value-serializer="stringSerializer"
                                              topic="mytopic"/>
            <int-kafka:producer-configuration broker-list="192.168.1.237:9090,192.168.1.237:9091,192.168.1.237:9092"
                                              key-serializer="stringSerializer"
                                              value-class-type="java.lang.String"
                                              value-serializer="stringSerializer"
                                              topic="sunneytopic"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>
三、發消息接口 KafkaService
package com.sunney.service;

/**
 * Contract for publishing messages to Kafka.
 *
 * @author Sunney 2016年4月30日 上午11:30:53
 */
public interface KafkaService {

    /**
     * Publishes a payload to the given topic.
     *
     * @param topic destination topic name
     * @param obj   payload to send
     */
    public void sendUserInfo(String topic, Object obj);
}
四、發消息實現類 KafkaServiceImpl
package com.sunney.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import com.sunney.service.KafkaService;

/**
 * Publishes payloads to Kafka by sending them through the Spring
 * Integration channel "kafkaTopicTest"; the destination topic is
 * carried in the {@code KafkaHeaders.TOPIC} message header.
 *
 * @author Sunney 2016年4月30日 上午11:31:13
 */
@Service("kafkaService")
public class KafkaServiceImpl implements KafkaService {

    /** Outbound channel wired to the Kafka outbound adapter. */
    @Autowired
    @Qualifier("kafkaTopicTest")
    MessageChannel channel;

    public void sendUserInfo(String topic, Object obj) {
        channel.send(MessageBuilder.withPayload(obj)
                                   .setHeader(KafkaHeaders.TOPIC, topic)
                                   .build());
    }
}
五、消費接收類KafkaConsumerService
package com.sunney.service.impl;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.sunney.service.UserDto;

/**
 * Handler for messages polled from Kafka; invoked by the
 * outbound-channel-adapter configured in spring-kafka-consumer.xml.
 *
 * @author Sunney 2016年4月30日 上午11:46:14
 */
public class KafkaConsumerService {

    static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    /**
     * Processes one polled batch of messages.
     *
     * @param msgs batch keyed topic -> (partition -> JSON payload)
     */
    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        logger.info("===============processMessage===============");
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            logger.info("============Topic:" + entry.getKey());
            // Fix: previously this value was force-cast to LinkedHashMap, which
            // throws ClassCastException if the adapter supplies any other Map
            // implementation. The Map interface is all that is needed here.
            Map<Integer, String> messages = entry.getValue();
            for (Integer partition : messages.keySet()) {
                logger.info("======Partition:" + partition);
            }
            for (String payload : messages.values()) {
                // Wrap the payload in brackets so it parses as a JSON array.
                // NOTE(review): assumes each payload is a single JSON object
                // (or comma-separated objects) — confirm against the producer.
                String message = "[" + payload + "]";
                logger.info("=====message:" + message);
                List<UserDto> userList = JSON.parseArray(message, UserDto.class);
                logger.info("=====userList.size:" + userList.size());
            }
        }
    }
}
六、pom
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>1.3.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <!-- Fix: removed stray leading space inside the groupId value,
             which corrupts the Maven artifact coordinate. -->
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.7</version>
    </dependency>
</dependencies>
6、源代碼地址:https://github.com/sunney2010/kafka-demo
7、遇到的問題
一、消費端口收不到消息
spring-kafka-consumer.xml的auto-startup設置爲true