For a work requirement, I needed the system to listen on a Kafka interface and consume messages from a message queue. Kafka was the first choice because of its very high throughput. The Maven dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
1 Version compatibility
After configuring the Maven dependency, one more check is needed: spring-kafka depends on Spring, so we have to confirm that the Spring version in the project is fully compatible with Kafka 0.10.2. The compatibility matrix in the Spring for Apache Kafka documentation answers this.
2 Excluding duplicate packages
After adding the Maven dependency, spring-kafka transitively pulls in the springframework jars, and the duplicates need to be excluded. Here the required Spring 4.3.9 jars are declared explicitly so that a single consistent version wins dependency mediation:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>4.3.9.RELEASE</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.9.RELEASE</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.3.9.RELEASE</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
    <version>4.3.9.RELEASE</version>
    <scope>compile</scope>
    <optional>true</optional>
</dependency>
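An alternative to declaring the Spring jars explicitly is to exclude the transitive Spring artifacts on the spring-kafka dependency itself. A sketch (which artifacts need excluding depends on your dependency tree; check it with mvn dependency:tree):

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.2.RELEASE</version>
    <exclusions>
        <!-- repeat one <exclusion> per transitive Spring artifact to exclude -->
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```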
public interface MessageListener<K, V> { // 1
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> { // 2
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface BatchMessageListener<K, V> { // 3
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> { // 4
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
The corresponding explanations:

1. "Use this for processing individual ConsumerRecords received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods."

That is, MessageListener fits when the consumer commits the offset automatically after each message is consumed, i.e. when enable.auto.commit is true.

2. "Use this for processing individual ConsumerRecords received from the kafka consumer poll() operation when using one of the manual commit methods."

With AcknowledgingMessageListener, the offset is not committed automatically after a message is consumed; a manual ack is required. Use this interface when enable.auto.commit is false.

3. "Use this for processing all ConsumerRecords received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch."

4. "Use this for processing all ConsumerRecords received from the kafka consumer poll() operation when using one of the manual commit methods."

BatchMessageListener and BatchAcknowledgingMessageListener behave much like the two interfaces above; the difference is that they consume messages in batches, with the same split between automatic and manual offset commits.
Because the business logic here is heavy, and with automatic offset commits a consumer can lose messages when consumption fails or throws an exception, manual offset commits are required. This implementation therefore uses the AcknowledgingMessageListener interface.
Configuration approach:
1. Determine the beans that need to be defined:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 1. Consumer properties, a HashMap -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
                <entry key="group.id" value="${kafka.group.id}"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="session.timeout.ms" value="15000"/>
                <!--<entry key="auto.offset.reset" value="earliest"/>-->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer.encoding" value="UTF8"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 2. Kafka consumer factory, DefaultKafkaConsumerFactory -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 3. Listener, AcknowledgingMessageListener -->
    <bean id="kafkaMessageListener" class="com.lianjia.bigdata.dataarch.auth.kafka.KafkaMessageListener">
        <property name="threadPool" ref="kafkaWorkerThreadPool"/>
    </bean>

    <bean id="kafkaWorkerThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="20"/>
        <property name="maxPoolSize" value="200"/>
        <property name="queueCapacity" value="500"/>
        <property name="keepAliveSeconds" value="1800"/>
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>

    <!-- 4. Container properties -->
    <bean id="containProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="${kafka.topic}"/>
        <property name="ackMode" value="MANUAL_IMMEDIATE"/>
        <property name="messageListener" ref="kafkaMessageListener"/>
    </bean>

    <!-- 5. Kafka message listener container -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containProperties"/>
    </bean>
</beans>
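The ${kafka.bootstrap.servers}, ${kafka.group.id} and ${kafka.topic} placeholders in the XML are resolved by Spring's property placeholder mechanism. A hypothetical kafka.properties matching the values used elsewhere in this post might look like:

```properties
kafka.bootstrap.servers=10.33.106.94:9092
kafka.group.id=test3
kafka.topic=testTopic
```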
I wrote a simple test case.
Producer:
It sends one message to the brokers every second.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class SimpleKafkaProducer implements Runnable {

    protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);

    @Override
    public void run() {
        Map<String, Object> sendProps = senderProps();
        Producer<Integer, String> producer = new KafkaProducer<>(sendProps);
        Integer currentNum = 0;
        try {
            LOGGER.info("start produce message");
            while (true) {
                // key is the counter, value is its string form
                ProducerRecord<Integer, String> producerRecord =
                        new ProducerRecord<>("testTopic", currentNum, String.valueOf(currentNum));
                producer.send(producerRecord);
                LOGGER.info("send message:" + currentNum + " And value is " + producerRecord.value());
                currentNum++;
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            LOGGER.error("send message fail", e);
        } finally {
            producer.close();
        }
    }

    public static void main(String[] args) {
        SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();
        new Thread(simpleKafkaProducer).start();
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
Consumer:
public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    @Override
    public void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {
        // TODO: implement the concrete business logic here
        // finally, call acknowledge() on the Acknowledgment to commit the offset
        acknowledgment.acknowledge();
    }
}
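To make the "ack only on success" idea concrete, here is a standalone sketch. Acknowledgment and Record below are minimal local stand-ins for the spring-kafka Acknowledgment and kafka-clients ConsumerRecord types, defined here only so the sketch runs without a broker or the Kafka jars; in the real listener the imports come from org.springframework.kafka and org.apache.kafka.

```java
import java.util.function.Consumer;

// Local stand-in for org.springframework.kafka.support.Acknowledgment.
interface Acknowledgment {
    void acknowledge();
}

// Local stand-in for a consumed record (key + value only).
class Record<K, V> {
    final K key;
    final V value;
    Record(K key, V value) { this.key = key; this.value = value; }
}

public class SafeListenerSketch {

    // Commit the offset only after the business logic succeeds; on failure the
    // offset stays uncommitted, so the record is redelivered on a later poll.
    static <K, V> void onMessage(Record<K, V> record, Acknowledgment ack,
                                 Consumer<Record<K, V>> businessLogic) {
        try {
            businessLogic.accept(record);
            ack.acknowledge(); // corresponds to the MANUAL_IMMEDIATE ack mode
        } catch (RuntimeException e) {
            // no acknowledge(): the message is not considered consumed
            System.out.println("processing failed, offset not committed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        // success path: the ack is recorded
        final boolean[] committed = {false};
        onMessage(new Record<>(1, "ok"), () -> committed[0] = true, r -> { });
        if (!committed[0]) throw new AssertionError("expected the offset to be committed");

        // failure path: the ack must not be recorded
        final boolean[] committedOnFailure = {false};
        onMessage(new Record<>(2, "boom"), () -> committedOnFailure[0] = true,
                r -> { throw new RuntimeException("business failure"); });
        if (committedOnFailure[0]) throw new AssertionError("offset committed despite failure");

        System.out.println("sketch ok");
    }
}
```

This is exactly the shape of the onMessage method in the real listener: do the work first, acknowledge last.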
Consumer usage example: following the Spring reference documentation, a simple test that wires the listener into a container.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SimpleKafkaConsumer extends SpringUnitTest {

    protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);

    @Resource(name = "kafkaMessageListener")
    private KafkaMessageListener kafkaMessageListener;

    @Test
    public void testListener() throws InterruptedException {
        ContainerProperties containerProps = new ContainerProperties("testTopic");
        containerProps.setMessageListener(kafkaMessageListener);
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("messageListenerContainer");
        container.start();
        // keep the test alive for a while so the container can poll messages
        new CountDownLatch(1).await(30, TimeUnit.SECONDS);
        container.stop();
    }

    private static KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }

    private static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
Before implementing the AcknowledgingMessageListener interface, I looked through the write-ups available online, but the results were disappointing. In the end I worked from the official Spring documentation, step by step, until I had a listener that commits offsets manually. Of course, there is far more to Kafka than this, and the learning continues.