一、透過XML配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 1. Consumer parameters. The ${...} values are placeholders that must be
         resolved by a property source configured outside this snippet. -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="${group.id}" />
                <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                <entry key="session.timeout.ms" value="${session.timeout.ms}" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 2. Consumer factory built from the parameters above. -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="consumerProperties" />
    </bean>

    <!-- 3. The message listener implementation that consumes the records. -->
    <bean id="kafkaConsumerService"
          class="com.demo.bd.tasks.kafka.KafkaConsumerSerivceImpl" />

    <!-- 4. Listener container settings: the topic(s) to subscribe to and the
         listener that receives the records. -->
    <bean id="containerProperties"
          class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics">
            <list>
                <value>${kafka.consumer.topic}</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerService" />
    </bean>

    <!-- 5. Concurrent message listener container; doStart() is invoked as the
         bean's init-method.
         NOTE(review): the container presumably also takes part in the Spring
         lifecycle and may start on its own; confirm whether the explicit
         init-method is still required. -->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <property name="concurrency" value="${concurrency}" />
    </bean>

</beans>
定義消費類:
/**
 * Kafka message listener that logs the topic and value of each record it receives.
 */
public class KafkaConsumerSerivceImpl implements MessageListener<String, String> {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Handles one consumed record by writing its topic and value to the log.
     *
     * @param data the record delivered by the listener container
     */
    public void onMessage(ConsumerRecord<String, String> data) {
        log.info("----<<<<<<topic={},value={}", data.topic(), data.value());
    }
}
二、透過註解配置,這種方式不需要XML配置
需要配置啓用Kafka註解(@EnableKafka):
@Configuration @EnableKafka public class KafkaConsumerConfig { final static String list ="10.28.18.103:6667"; /** * Description:獲取配置 * Date: 2017年7月11日 * @author shaqf */ private Map<String, Object> consumerConfigs() { Map<String, Object> props = Maps.newHashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); System.out.println("KafkaConsumer consumerConfigs "+ JSON.toJSONString(props)); return props; } /** 獲取工廠 */ private ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory(consumerConfigs()); } /** 獲取實例 */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory(); factory1.setConsumerFactory(consumerFactory()); factory1.setConcurrency(2); factory1.getContainerProperties().setPollTimeout(3000); System.out.println("KafkaConsumer kafkaListenerContainerFactory factory"+ JSON.toJSONString(factory1)); return factory1; } /** * topic的消費者組1監聽 * @return */ @Bean public KafkaConsumerSerivce2Impl listener1() { return new KafkaConsumerSerivce2Impl(); } } public class KafkaConsumerSerivce2Impl { private Logger log = LoggerFactory.getLogger(this.getClass()); //支持EL表達式 @KafkaListener(topics = {"${kafka.consumer.topic}"},group = "2") public void templarAgreementNoticewithhold(ConsumerRecord<String, String> data){ //消費業務邏輯 System.out.println("++++++++++++++++++++++++++++"); String topic 
= data.topic(); String value = data.value(); log.info("---****->topic={},value={}",topic,value); } }