spring-kafka消費者配置

     <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

目前kafka版本更新的有快,0.10版本剛用穩定,1.0都出來了,0.11版本重啓的時候會偶爾會報日誌文件未正常結束,若是用0.10就基本不會出現這個問題,這裏主要是在開發環境會常常關閉再開,java

這裏spring-kafka的版本爲1.2.2.RELEASE版本linux

上consumer配置文件spring

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <bean id="messagingMessageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

    <!-- 定義consumer的參數 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.broker.address}"/>
                <entry key="group.id" value="${kafka.broker.groupid}"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 建立consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 消費者容器配置信息 -->
    <bean id="containerProperties_flowevent" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="${kafka.topic}"/>
        <property name="messageListener" ref="eventAsyncConsumerService"/>
        <property name="AckMode" value="MANUAL"/>
    </bean>

    <bean id="concurrentMessageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties_flowevent"/>
        <property name="Concurrency" value="2"/>
    </bean>

</beans>
若是須要手動提交offset,那麼調用acknowledgment的ack方法。但這裏有一個問題,不知道是否是項目版本的問題,
我以前的一個項目僅僅只配置enable.auto.commit爲false,而後在配合acknowledgment使用是沒任何問題的,到如今項目都在跑,
@Component
public class EventAsyncConsumerService implements AcknowledgingMessageListener<String, String> {
    private final static Logger log = LoggerFactory.getLogger(EventAsyncConsumerService.class);

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
        if (log.isDebugEnabled()) {
            log.debug("value:" + consumerRecord.value());
            log.debug("topic:" + consumerRecord.topic());
            log.debug("partition:" + consumerRecord.partition());
            log.debug("offset:" + consumerRecord.offset());
        }

        try {
            EventMsg message=JSONSerializer.deserialize(consumerRecord.value(), EventMsg.class);
            EventInterface eventModel = ApplicationContextHelper.getBean(message.getEventName());
            Map result= eventModel.execute(message.getParams(),message.getPrevResult());
            // 最後 調用acknowledgment的ack方法,提交offset
            acknowledgment.acknowledge();
        } catch (Exception ex) {
            log.error("EventAsyncConsumer==Exception==>"+ex.getMessage()+ex.getStackTrace());
        }
    }
}

 

這幾天升級了spring框架版本,發現無論用了。因而看了下spring-kafka源碼,發現須要配置   <property name="AckMode" value="MANUAL"/>就ok了因爲kafka官方最近升級很快,開發者使用的速度有點跟不上了,1.0版本在windows環境中啓動會報錯,這讓人情何以堪,難道是必需要放到linux環境下使用的嗎
相關文章
相關標籤/搜索