1. Message Listeners
For consuming data from Kafka topics, we usually go with message listeners. So how do we get fancy with listeners? That depends on what they can actually do.
Message listening in Spring-Kafka roughly falls into two types:
one is single-record consumption,
the other is batch consumption;
the only difference between the two is how many messages the listener fetches at a time.
GenericMessageListener is the interface we implement for message listening, and it has quite a few extending interfaces,
for example: MessageListener for single-record consumption, BatchMessageListener for batch consumption, plus the ack-capable AcknowledgingMessageListener and BatchAcknowledgingMessageListener, and so on.
Let's go through them one by one.
2. GenericMessageListener
Here you can see that GenericMessageListener is annotated as a functional interface and provides three default onMessage implementations with different parameters.
data is the payload we want to receive, Consumer is the consumer client, and Acknowledgment is the class used to implement the ack mechanism.
One thing to note here: the Consumer object is not thread-safe.
@FunctionalInterface
public interface GenericMessageListener<T> {

    void onMessage(T data);

    default void onMessage(T data, Acknowledgment acknowledgment) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }
}
Next, let's browse the interfaces that extend GenericMessageListener. The ones prefixed with Batch are the batch-style listener interfaces, and all of their parameters were explained above.
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
Before we get fancy with @KafkaListener, we still need to know how to listen to a topic without annotations.
Before creating a listener container we need to create a listener container factory; the only thing to configure here is the consumer factory,
after which we use it to create our listener containers. consumerFactory() was already defined in an earlier chapter, so its code is not repeated here (a hedged sketch of it follows the factory code below, for reference).
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
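For reference, since the earlier chapter isn't reproduced here, a minimal sketch of what that consumerFactory() bean might look like (the broker address and deserializer types are assumptions; they should match your earlier configuration):

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // assumed broker address; use the one from the earlier chapters
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}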
With the configuration in place, we can create our listener container (note that this demo builds a KafkaMessageListenerContainer directly from the consumer factory).
Creating the listener container as a @Bean:
@Bean
public KafkaMessageListenerContainer demoListenerContainer() {
    ContainerProperties properties = new ContainerProperties("topic.quick.bean");
    properties.setGroupId("bean");
    properties.setMessageListener(new MessageListener<Integer, String>() {

        private Logger log = LoggerFactory.getLogger(this.getClass());

        @Override
        public void onMessage(ConsumerRecord<Integer, String> record) {
            log.info("topic.quick.bean receive : " + record.toString());
        }
    });
    return new KafkaMessageListenerContainer(consumerFactory(), properties);
}
Start the project and look at the console log: the successful assignment of the listener container to a consumer is clearly visible.
While we're at it, let's write a test method to check that the listener actually runs.
2018-09-11 10:36:15.732 INFO 1168 --- [erContainer-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bean] Successfully joined group with generation 1
2018-09-11 10:36:15.733 INFO 1168 --- [erContainer-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=bean] Setting newly assigned partitions [topic.quick.bean-0]
2018-09-11 10:36:15.733 INFO 1168 --- [erContainer-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic.quick.bean-0]
Send a message to the topic the listener is watching:
@Test
public void test() {
    kafkaTemplate.send("topic.quick.bean", "send msg to beanListener");
}
3. @KafkaListener Parameters Explained
We already wrote listener code with the @KafkaListener annotation back in the introductory chapters; here is that code again:
@KafkaListener(id = "demo", topics = "topic.quick.demo") public void listen(String msgData) { log.info("demo receive : "+msgData); }
Using @KafkaListener does not tie you to either single-record or batch consumption;
switching between the two only requires configuring the annotation's containerFactory attribute. But first, let's cover what parameters the listener method can accept:
data: the type of data is not actually restricted; it follows the types declared on the KafkaTemplate. When data is a List, the method is used for batch consumption.
ConsumerRecord: the full consumed-record class, containing headers, partition information, timestamp and more
Acknowledgment: the interface used for the ack mechanism
Consumer: the consumer client; with it we can manually commit offsets, control the consumption rate, and so on
public void listen1(String data)
public void listen2(ConsumerRecord<K, V> data)
public void listen3(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)
public void listen4(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
public void listen5(List<String> data)
public void listen6(List<ConsumerRecord<K, V>> data)
public void listen7(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment)
public void listen8(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
Next, let's look at the attributes the @KafkaListener annotation provides.
id: the consumer's id; when groupId is not configured, the id is used as the groupId by default
containerFactory: as mentioned above, @KafkaListener only needs this attribute configured to switch between single-record and batch consumption;
it references the listener container factory, i.e. a ConcurrentKafkaListenerContainerFactory, configured by bean name
topics: the topic(s) to listen to; several can be listened to at once
topicPartitions: more fine-grained listening configuration, e.g. listening only to certain partitions of a topic, or starting from offset 200
errorHandler: the listener exception handler, configured by bean name
groupId: the consumer group id
idIsGroup: whether the id doubles as the groupId
clientIdPrefix: the prefix for the consumer's clientId
beanRef: the pseudo name under which the listener bean itself can be referenced; the name must be prefixed with "__" (see the sketch after the annotation source below)
public @interface KafkaListener {

    String id() default "";

    String containerFactory() default "";

    String[] topics() default {};

    String topicPattern() default "";

    TopicPartition[] topicPartitions() default {};

    String containerGroup() default "";

    String errorHandler() default "";

    String groupId() default "";

    boolean idIsGroup() default true;

    String clientIdPrefix() default "";

    String beanRef() default "__listener";
}
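A quick illustration of beanRef's default "__listener" token: it lets SpEL expressions inside the annotation refer back to the listener bean itself. A minimal sketch; the bean and its topic field below are hypothetical:

@Component
public class SelfRefListener {

    // hypothetical field holding the topic name
    private final String topic = "topic.quick.selfref";

    public String getTopic() {
        return topic;
    }

    // "__listener" resolves to this bean, so topics is read from the field above
    @KafkaListener(id = "selfref", topics = "#{__listener.topic}")
    public void listen(String data) {
        // handle the message
    }
}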
Now the real fun with listener containers begins.
4. Consuming with the ConsumerRecord Class
So what's the benefit of receiving with ConsumerRecord? The ConsumerRecord class contains the partition information, message headers, message body and more; if your business needs those fields, ConsumerRecord is a good choice.
If you only need the message body, receiving it as a concrete type, say String, is more convenient.
Here we write a consumerListener method that listens to the "topic.quick.consumer" topic and prints the content of the ConsumerRecord to the console.
@Component
public class SingleListener {

    private static final Logger log = LoggerFactory.getLogger(SingleListener.class);

    @KafkaListener(id = "consumer", topics = "topic.quick.consumer")
    public void consumerListener(ConsumerRecord<Integer, String> record) {
        log.info("topic.quick.consumer receive : " + record.toString());
    }
}
Write a test method that sends data to the topic. Run it and look at the console log:
it contains the topic, partition, offset and so on; this is in fact the complete storage structure of the message (a sketch of reading those fields individually follows the log below).
@Test
public void testConsumerRecord() {
    kafkaTemplate.send("topic.quick.consumer", "test receive by consumerRecord");
}
2018-09-11 15:52:13.546 INFO 13644 --- [ consumer-0-C-1] com.viu.kafka.listen.SingleListener : topic.quick.consumer receive : ConsumerRecord(topic = topic.quick.consumer, partition = 0, offset = 0, CreateTime = 1536652333476, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test receive by consumerRecord)
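If you only need a few of those fields rather than the whole toString() dump, ConsumerRecord exposes each part of the storage structure through a plain getter. A minimal sketch (a second listener method with a made-up id, reusing the log field above):

@KafkaListener(id = "consumerFields", topics = "topic.quick.consumer")
public void consumerFieldsListener(ConsumerRecord<Integer, String> record) {
    // each part of the message's storage structure has its own accessor
    log.info("topic : " + record.topic());
    log.info("partition : " + record.partition());
    log.info("offset : " + record.offset());
    log.info("key : " + record.key());
    log.info("value : " + record.value());
    log.info("headers : " + record.headers());
}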
5. Batch Consumption Example
Create a fresh consumer configuration, set to pull 5 messages at a time.
Create a listener container factory, set it to batch mode with a concurrency of 5. The concurrency depends on the partition count and must be less than or equal to it, otherwise some threads will sit idle forever.
Create a topic with 8 partitions.
Create the listener method: consumer id "batch", clientId prefix "batch", listening to topic.quick.batch, built with the batchContainerFactory.
@Component
public class BatchListener {

    private static final Logger log = LoggerFactory.getLogger(BatchListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // number of messages pulled per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainer() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        // concurrency; must be less than or equal to the topic's partition count
        container.setConcurrency(5);
        // enable batch listening
        container.setBatchListener(true);
        return container;
    }

    @Bean
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }

    @KafkaListener(id = "batch", clientIdPrefix = "batch", topics = {"topic.quick.batch"}, containerFactory = "batchContainerFactory")
    public void batchListener(List<String> data) {
        log.info("topic.quick.batch receive : ");
        for (String s : data) {
            log.info(s);
        }
    }
}
Next, start the project. The console log is very complete and we can see the partition assignment for the batchListener container.
We set concurrency to 5, meaning 5 threads will be started for listening, while the topic we created has 8 partitions; so 3 threads get 2 partitions each and 2 threads get 1 partition each.
Look at the last 5 lines of this log: they show the partitions assigned to each thread.
2018-09-11 12:47:49.628 INFO 4708 --- [ batch-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=batch-2, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.628 INFO 4708 --- [ batch-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=batch-2, groupId=batch] Setting newly assigned partitions [topic.quick.batch-4, topic.quick.batch-5]
2018-09-11 12:47:49.630 INFO 4708 --- [ batch-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=batch-3, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.630 INFO 4708 --- [ batch-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=batch-0, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.630 INFO 4708 --- [ batch-4-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=batch-4, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.630 INFO 4708 --- [ batch-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=batch-3, groupId=batch] Setting newly assigned partitions [topic.quick.batch-6]
2018-09-11 12:47:49.630 INFO 4708 --- [ batch-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=batch-0, groupId=batch] Setting newly assigned partitions [topic.quick.batch-0, topic.quick.batch-1]
2018-09-11 12:47:49.630 INFO 4708 --- [ batch-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=batch-4, groupId=batch] Setting newly assigned partitions [topic.quick.batch-7]
2018-09-11 12:47:49.631 INFO 4708 --- [ batch-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=batch-1, groupId=batch] Successfully joined group with generation 98
2018-09-11 12:47:49.631 INFO 4708 --- [ batch-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=batch-1, groupId=batch] Setting newly assigned partitions [topic.quick.batch-2, topic.quick.batch-3]
2018-09-11 12:47:49.633 INFO 4708 --- [ batch-3-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic.quick.batch-6]
2018-09-11 12:47:49.633 INFO 4708 --- [ batch-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic.quick.batch-0, topic.quick.batch-1]
2018-09-11 12:47:49.633 INFO 4708 --- [ batch-4-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic.quick.batch-7]
2018-09-11 12:47:49.633 INFO 4708 --- [ batch-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic.quick.batch-2, topic.quick.batch-3]
2018-09-11 12:47:49.634 INFO 4708 --- [ batch-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic.quick.batch-4, topic.quick.batch-5]
Now let's write the test method: send 12 messages to the topic in quick succession. In the output you can see the listener method pulled data three times in total,
twice with 5 records and once with 2 records, which adds up to the 12 messages our test sent. This proves the batch consumption behaves as expected.
@Autowired
private KafkaTemplate kafkaTemplate;

@Test
public void testBatch() {
    for (int i = 0; i < 12; i++) {
        kafkaTemplate.send("topic.quick.batch", "test batch listener,dataNum-" + i);
    }
}
2018-09-11 12:08:51.840 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch receive :
2018-09-11 12:08:51.840 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-5
2018-09-11 12:08:51.840 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-2
2018-09-11 12:08:51.840 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-10
2018-09-11 12:08:51.840 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-6
2018-09-11 12:08:51.840 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-3
2018-09-11 12:08:51.841 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch receive :
2018-09-11 12:08:51.841 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-11
2018-09-11 12:08:51.841 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-0
2018-09-11 12:08:51.841 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-8
2018-09-11 12:08:51.841 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-7
2018-09-11 12:08:51.841 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-4
2018-09-11 12:08:51.842 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch receive :
2018-09-11 12:08:51.842 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-1
2018-09-11 12:08:51.842 INFO 12416 --- [ batch-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-9
Note: the configured concurrency must not exceed the number of partitions. If you need higher throughput, increasing the partition count is a quick way to get it, as sketched below.
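A minimal sketch of that scaling path, assuming a hypothetical topic name: declare the topic with more partitions, then raise the factory concurrency to match.

@Bean
public NewTopic highThroughputBatchTopic() {
    // hypothetical 16-partition topic; the batch factory's
    // container.setConcurrency(16) could then keep every partition busy
    return new NewTopic("topic.quick.batch.big", 16, (short) 1);
}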
6. Listening to Specific Partitions of a Topic
Right after the code we just wrote, add a new listener. First glance at this code: wow, that annotation is long. Sorry, couldn't help it.
Here we use the topicPartitions attribute of @KafkaListener to listen to different partitions.
@TopicPartition: topic, the name of the topic to listen to; partitions, the ids of the partitions to listen to;
partitionOffsets, which lets listening start from a given offset
@PartitionOffset: partition, the partition id (not an array); initialOffset, the initial offset
@Bean
public NewTopic batchWithPartitionTopic() {
    return new NewTopic("topic.quick.batch.partition", 8, (short) 1);
}

@KafkaListener(id = "batchWithPartition", clientIdPrefix = "bwp", containerFactory = "batchContainerFactory",
    topicPartitions = {
        @TopicPartition(topic = "topic.quick.batch.partition", partitions = {"1", "3"}),
        @TopicPartition(topic = "topic.quick.batch.partition", partitions = {"0", "4"},
            partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "100"))
    }
)
public void batchListenerWithPartition(List<String> data) {
    log.info("topic.quick.batch.partition receive : ");
    for (String s : data) {
        log.info(s);
    }
}
The only difference from the batch consumer we just wrote is the extra attribute on the annotation. Start the project and search the console output carefully; if the log below shows up, it worked.
Likewise, write some data into this topic. After running, the console only shows part of the messages,
because the topic was created with 8 partitions while we only listen to partitions 0, 1, 2, 3 and 4; the messages on partitions 5, 6 and 7 are simply never read.
2018-09-11 14:39:52.045 INFO 12412 --- [Partition-4-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=bwp-4, groupId=batchWithPartition] Fetch offset 100 is out of range for partition topic.quick.batch-2, resetting offset
@Test
public void testBatch() throws InterruptedException {
    for (int i = 0; i < 12; i++) {
        kafkaTemplate.send("topic.quick.batch.partition", "test batch listener,dataNum-" + i);
    }
}
2018-09-11 14:51:09.063 INFO 1532 --- [Partition-2-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive :
2018-09-11 14:51:09.063 INFO 1532 --- [Partition-2-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-4
2018-09-11 14:51:09.064 INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive :
2018-09-11 14:51:09.064 INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-2
2018-09-11 14:51:09.075 INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive :
2018-09-11 14:51:09.075 INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-1
2018-09-11 14:51:09.078 INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive :
2018-09-11 14:51:09.078 INFO 1532 --- [Partition-1-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-10
2018-09-11 14:51:09.091 INFO 1532 --- [Partition-4-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive :
2018-09-11 14:51:09.091 INFO 1532 --- [Partition-4-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-5
2018-09-11 14:51:09.095 INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive :
2018-09-11 14:51:09.096 INFO 1532 --- [Partition-0-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-9
2018-09-11 14:51:09.097 INFO 1532 --- [Partition-3-C-1] com.viu.kafka.listen.BatchListener : topic.quick.batch.partition receive :
2018-09-11 14:51:09.098 INFO 1532 --- [Partition-3-C-1] com.viu.kafka.listen.BatchListener : test batch listener,dataNum-7
7. Getting the Message Headers and Body via Annotations
When the received message carries headers, and your listener method needs many of the message's fields, this approach is worth using; calling getters for everything does pile up the code.
The listener here is created with the default listener container factory; if you want batch consumption, just change the corresponding types to List, e.g. List<String> data, List<Integer> key (a batch sketch follows the code below).
@Payload: the message body, i.e. the content that was sent
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY): the key the message was sent with
@Header(KafkaHeaders.RECEIVED_PARTITION_ID): the partition the current message was received from
@Header(KafkaHeaders.RECEIVED_TOPIC): the name of the topic being listened to
@Header(KafkaHeaders.RECEIVED_TIMESTAMP): the timestamp
@KafkaListener(id = "anno", topics = "topic.quick.anno") public void annoListener(@Payload String data, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { log.info("topic.quick.anno receive : \n"+ "data : "+data+"\n"+ "key : "+key+"\n"+ "partitionId : "+partition+"\n"+ "topic : "+topic+"\n"+ "timestamp : "+ts+"\n" ); }
The listener is written, so let's test it with a test method.
After running the test, we can see the listener method successfully extracted all the data we needed, so this code is good to go.
@Test
public void testAnno() throws InterruptedException {
    Map map = new HashMap<>();
    map.put(KafkaHeaders.TOPIC, "topic.quick.anno");
    map.put(KafkaHeaders.MESSAGE_KEY, 0);
    map.put(KafkaHeaders.PARTITION_ID, 0);
    map.put(KafkaHeaders.TIMESTAMP, System.currentTimeMillis());
    kafkaTemplate.send(new GenericMessage<>("test anno listener", map));
}
2018-09-11 15:27:47.108 INFO 7592 --- [ anno-0-C-1] com.viu.kafka.listen.SingleListener : topic.quick.anno receive :
data : test anno listener
key : 0
partitionId : 0
topic : topic.quick.anno
timestamp : 1536650867015
8. Confirming Consumption with the Ack Mechanism
Kafka's ack mechanism differs quite a bit from RabbitMQ's. It threw me off too when I was new to Kafka, but once you work out how Kafka consumes messages, its ack mechanism makes sense.
Let me describe RabbitMQ's ack mechanism first. RabbitMQ's consumption is essentially one-shot: once you acknowledge a message, it is immediately removed from disk or memory.
Roughly speaking, RabbitMQ also consumes in order, like people queuing up, one after another; unacknowledged messages return to the queue, waiting to be consumed again by a listener.
Kafka is different: Kafka drives consumption from the latest committed offset, and acknowledged messages are not deleted right away, so we can repeatedly consume data that has not yet been deleted.
When the first message is unacknowledged but the second one is acknowledged, Kafka commits the second message's offset,
which means the first message will never again be delivered to the listener, unless you fetch it manually by its offset. For example, with offsets 0 and 1 in the same partition, acking only offset 1 commits offset 2, so offset 0 is skipped for good after a restart.
Using Kafka's ack mechanism is fairly simple, just three steps:
Set ENABLE_AUTO_COMMIT_CONFIG=false to disable auto-commit
Set AckMode=MANUAL_IMMEDIATE
Add an Acknowledgment ack parameter to the listener method
And how do you reject a message? Just don't call ack.acknowledge() in the listener method.
@Component
public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
        // manual-immediate ack: the listener decides when offsets are committed
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        log.info("topic.quick.ack receive : " + record.value());
        ack.acknowledge();
    }
}
Write the test method; after running it you can see the listener method receives the messages. Then comment out the ack.acknowledge() call and rerun the test:
the listener container still receives the messages, and if you restart the project at this point you can see those unacknowledged messages again.
@Test
public void testAck() throws InterruptedException {
    for (int i = 0; i < 5; i++) {
        kafkaTemplate.send("topic.quick.ack", i + "");
    }
}
At the start of this section I explained the situations Kafka's mechanism can produce, which leave no way to re-consume messages that weren't acked. The workarounds are as follows:
Resend the message to the topic. This approach is fairly simple, and by using headers you can track which attempt this is and branch on it (a header-based sketch follows the code below).
@KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //若是偏移量爲偶數則確認消費,不然拒絕消費 if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); kafkaTemplate.send("topic.quick.ack", record.value()); } }
Use the Consumer.seek method to move back to the offset of the unacked message and consume it again. This can end in an infinite loop,
which happens when the business logic simply cannot process that record yet keeps seeking back to its offset.
@KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //若是偏移量爲偶數則確認消費,不然拒絕消費 if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); consumer.seek(new TopicPartition("topic.quick.ack",record.partition()),record.offset() ); } }