<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>
Kafka releases have been coming fast lately: 0.10 had only just stabilized when 1.0 came out. With 0.11, restarting the broker would occasionally report that a log file did not end cleanly; with 0.10 this problem basically never appears. This matters mostly in a development environment, where the broker is frequently shut down and restarted.
The spring-kafka version used here is 1.2.2.RELEASE.
The consumer configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <bean id="messagingMessageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

    <!-- Consumer properties -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.broker.address}"/>
                <entry key="group.id" value="${kafka.broker.groupid}"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- Create the consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- Listener container configuration -->
    <bean id="containerProperties_flowevent"
          class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="${kafka.topic}"/>
        <property name="messageListener" ref="eventAsyncConsumerService"/>
        <property name="ackMode" value="MANUAL"/>
    </bean>

    <bean id="concurrentMessageListenerContainer"
          class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties_flowevent"/>
        <property name="concurrency" value="2"/>
    </bean>
</beans>
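For reference, the property map that the XML builds via `java.util.HashMap` can also be assembled in plain Java before handing it to `DefaultKafkaConsumerFactory`. The sketch below builds only the map itself (the broker address and group id arguments are illustrative stand-ins for the `${kafka.broker.address}` and `${kafka.broker.groupid}` placeholders above):

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {

    // Build the same key/value pairs as the <map> inside the consumerProperties bean.
    public static Map<String, Object> build(String brokers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        // Auto-commit is disabled so the listener can acknowledge offsets manually.
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = build("localhost:9092", "demo-group");
        System.out.println(props.get("enable.auto.commit"));
    }
}
```

This map would then be passed to `new DefaultKafkaConsumerFactory<>(props)` in place of the XML-defined `consumerProperties` bean.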
To commit offsets manually, call the `acknowledge()` method on the `Acknowledgment`. But there is a catch here, and I am not sure whether it is a version issue: in an earlier project, merely setting enable.auto.commit to false and then using `Acknowledgment` worked without any problem, and that project is still running today.
@Component
public class EventAsyncConsumerService implements AcknowledgingMessageListener<String, String> {

    private static final Logger log = LoggerFactory.getLogger(EventAsyncConsumerService.class);

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
        if (log.isDebugEnabled()) {
            log.debug("value:" + consumerRecord.value());
            log.debug("topic:" + consumerRecord.topic());
            log.debug("partition:" + consumerRecord.partition());
            log.debug("offset:" + consumerRecord.offset());
        }
        try {
            EventMsg message = JSONSerializer.deserialize(consumerRecord.value(), EventMsg.class);
            EventInterface eventModel = ApplicationContextHelper.getBean(message.getEventName());
            Map result = eventModel.execute(message.getParams(), message.getPrevResult());
            // Finally, call acknowledge() on the Acknowledgment to commit the offset
            acknowledgment.acknowledge();
        } catch (Exception ex) {
            // Pass the exception as the second argument so the stack trace is logged
            // (concatenating ex.getStackTrace() would only print the array reference)
            log.error("EventAsyncConsumer==Exception==>" + ex.getMessage(), ex);
        }
    }
}
These past few days I upgraded the Spring framework version and found that this no longer worked. So I read the spring-kafka source code and discovered that configuring <property name="ackMode" value="MANUAL"/> on the container properties fixes it. Since upstream Kafka has been releasing so quickly, developers can hardly keep up: version 1.0 even fails on startup in a Windows environment, which is exasperating. Does it really have to run on Linux?