spring+kafka消費者的2種配置方式

一、經過XML配置java

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                  http://www.springframework.org/schema/context
                  http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 1.定義consumer的參數 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="${group.id}" />
                <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                <entry key="session.timeout.ms" value="${session.timeout.ms}" />
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 2.建立consumerFactory bean -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!--&lt;!&ndash; 3.定義消費實現類 &ndash;&gt;-->
    <bean id="kafkaConsumerService" class="com.demo.bd.tasks.kafka.KafkaConsumerSerivceImpl" />

    <!-- 4.消費者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics">
            <list>
                <value>${kafka.consumer.topic}</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerService" />
    </bean>
    <!-- 5.消費者併發消息監聽容器,執行doStart()方法 -->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <property name="concurrency" value="${concurrency}" />
    </bean>
</beans>

定義消費類:spring

public class KafkaConsumerSerivceImpl implements MessageListener<String, String> {
    private Logger log = LoggerFactory.getLogger(this.getClass());
    public void onMessage(ConsumerRecord<String, String> data) {
        String topic = data.topic();
        String value = data.value();
        log.info("----<<<<<<topic={},value={}",topic,value);
    }
}

二、經過註解配置,這種是不須要XML配置的apache

須要配置啓用kafka註解bootstrap

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    final static String list ="10.28.18.103:6667";

    /**
     * Description:獲取配置
     * Date:        2017年7月11日
     * @author      shaqf
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println("KafkaConsumer consumerConfigs "+ JSON.toJSONString(props));
        return props;
    }
    /** 獲取工廠 */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }
    /** 獲取實例 */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory();
        factory1.setConsumerFactory(consumerFactory());
        factory1.setConcurrency(2);
        factory1.getContainerProperties().setPollTimeout(3000);
        System.out.println("KafkaConsumer kafkaListenerContainerFactory factory"+ JSON.toJSONString(factory1));
        return factory1;
    }

    /**
     * topic的消費者組1監聽
     * @return
     */
    @Bean
    public KafkaConsumerSerivce2Impl listener1() {
        return new KafkaConsumerSerivce2Impl();
    }
}

public class KafkaConsumerSerivce2Impl {

     private Logger log = LoggerFactory.getLogger(this.getClass());
    
     //支持EL表達式

     @KafkaListener(topics = {"${kafka.consumer.topic}"},group = "2")
     public void templarAgreementNoticewithhold(ConsumerRecord<String, String> data){
                //消費業務邏輯
          System.out.println("++++++++++++++++++++++++++++");
          String topic = data.topic();
          String value = data.value();
          log.info("---****->topic={},value={}",topic,value);
     }
}
相關文章
相關標籤/搜索