Kafka目前主要做爲一個分佈式的發佈訂閱式的消息系統使用,也是目前最流行的消息隊列系統之一。所以,也愈來愈多的框架對kafka作了集成,好比本文將要說到的spring-kafka。spring
Kafka既然做爲一個消息發佈訂閱系統,就包括消息生成者和消息消費者。本文主要講述的spring-kafka框架的kafkaListener註解的深刻解讀和使用案例。數組
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) @MessageMapping @Documented @Repeatable(KafkaListeners.class) public @interface KafkaListener { /** * 消費者的id,當GroupId沒有被配置的時候,默認id爲GroupId */ String id() default ""; /** * 監聽容器工廠,當監聽時須要區分單數據仍是多數據消費須要配置containerFactory 屬性 */ String containerFactory() default ""; /** * 須要監聽的Topic,可監聽多個,和 topicPattern 屬性互斥 */ String[] topics() default {}; /** * 須要監聽的Topic的正則表達。和 topics,topicPartitions屬性互斥 */ String topicPattern() default ""; /** * 可配置更加詳細的監聽信息,必須監聽某個Topic中的指定分區,或者從offset爲200的偏移量開始監聽,可配置該參數, 和 topicPattern 屬性互斥 */ TopicPartition[] topicPartitions() default {}; /** *偵聽器容器組 */ String containerGroup() default ""; /** * 監聽異常處理器,配置BeanName */ String errorHandler() default ""; /** * 消費組ID */ String groupId() default ""; /** * id是否爲GroupId */ boolean idIsGroup() default true; /** * 消費者Id前綴 */ String clientIdPrefix() default ""; /** * 真實監聽容器的BeanName,須要在 BeanName前加 "__" */ String beanRef() default "__listener"; }
使用ConsumerRecord類接收有必定的好處,ConsumerRecord類裏面包含分區信息、消息頭、消息體等內容,若是業務須要獲取這些參數時,使用ConsumerRecord會是個不錯的選擇。若是使用具體的類型接收消息體則更加方便,好比說用String類型去接收消息體。服務器
這裏咱們編寫一個Listener方法,監聽"topic1"Topic,並把ConsumerRecord裏面所包含的內容打印到控制檯中:併發
@Component public class Listener { private static final Logger log = LoggerFactory.getLogger(Listener.class); @KafkaListener(id = "consumer", topics = "topic1") public void consumerListener(ConsumerRecord<Integer, String> record) { log.info("topic.quick.consumer receive : " + record.toString()); } }
批量消費在現實業務場景中是頗有實用性的。由於批量消費能夠增大kafka消費吞吐量,提升性能。app
批量消費實現步驟:框架
一、從新建立一份新的消費者配置,配置爲一次拉取10條消息異步
二、建立一個監聽容器工廠,命名爲:batchContainerFactory,設置其爲批量消費並設置併發量爲5,這個併發量根據分區數決定,必須小於等於分區數,不然會有線程一直處於空閒狀態。分佈式
三、建立一個分區數爲8的Topic。ide
四、建立監聽方法,設置消費id爲「batchConsumer」,clientID前綴爲「batch」,監聽「batch」,使用「batchContainerFactory」工廠建立該監聽容器。函數
@Component public class BatchListener { private static final Logger log= LoggerFactory.getLogger(BatchListener.class); private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); //一次拉取消息數量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, NumberDeserializers.IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean("batchContainerFactory") public ConcurrentKafkaListenerContainerFactory listenerContainer() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps())); //設置併發量,小於或等於Topic的分區數 container.setConcurrency(5); //必須 設置爲批量監聽 container.setBatchListener(true); return container; } @Bean public NewTopic batchTopic() { return new NewTopic("topic.batch", 8, (short) 1); } @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch" ,topics = {"topic.batch"},containerFactory = "batchContainerFactory") public void batchListener(List<String> data) { log.info("topic.batch receive : "); for (String s : data) { log.info( s); } } }
使用@KafkaListener註解的topicPartitions屬性監聽不一樣的partition分區。
@TopicPartition:topic--須要監聽的Topic的名稱,partitions --須要監聽Topic的分區id。
partitionOffsets --能夠設置從某個偏移量開始監聽,@PartitionOffset:partition --分區Id,非數組,initialOffset --初始偏移量。
@Bean public NewTopic batchWithPartitionTopic() { return new NewTopic("topic.batch.partition", 8, (short) 1); } @KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory", topicPartitions = { @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}), @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"}, partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100")) } ) public void batchListenerWithPartition(List<String> data) { log.info("topic.batch.partition receive : "); for (String s : data) { log.info(s); } }
當你接收的消息包含請求頭,以及你監聽方法須要獲取該消息很是多的字段時能夠經過這種方式。。這裏使用的是默認的監聽容器工廠建立的,若是你想使用批量消費,把對應的類型改成List便可,好比List<String> data , List<Integer> key。
@Payload:獲取的是消息的消息體,也就是發送內容
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取發送消息的key
@Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當前消息是從哪一個分區中監聽到的
@Header(KafkaHeaders.RECEIVED_TOPIC):獲取監聽的TopicName
@Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時間戳
@KafkaListener(id = "params", topics = "topic.params") public void otherListener(@Payload String data, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { log.info("topic.params receive : \n"+ "data : "+data+"\n"+ "key : "+key+"\n"+ "partitionId : "+partition+"\n"+ "topic : "+topic+"\n"+ "timestamp : "+ts+"\n" ); }
Kafka是經過最新保存偏移量進行消息消費的,並且確認消費的消息並不會馬上刪除,因此咱們能夠重複的消費未被刪除的數據,當第一條消息未被確認,而第二條消息被確認的時候,Kafka會保存第二條消息的偏移量,也就是說第一條消息不再會被監聽器所獲取,除非是根據第一條消息的偏移量手動獲取。Kafka的ack 機制能夠有效的確保消費不被丟失。由於自動提交是在kafka拉取到數據以後就直接提交,這樣很容易丟失數據,尤爲是在須要事物控制的時候。
使用Kafka的Ack機制比較簡單,只需簡單的三步便可:
4.使用Consumer.seek方法,能夠指定到某個偏移量的位置
@Component public class AckListener { private static final Logger log = LoggerFactory.getLogger(AckListener.class); private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean("ackContainerFactory") public ConcurrentKafkaListenerContainerFactory ackContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps())); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps())); return factory; } @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack) { log.info("topic.quick.ack receive : " + record.value()); ack.acknowledge(); } }
上一節中使用ack手動提交偏移量時,假如consumer掛了重啓,那它將從committed offset位置開始從新消費,而不是consume offset位置。這也就意味着有可能重複消費。
在0.9客戶端中,有3種ack策略:
策略1: 自動的,週期性的ack。
策略2:consumer.commitSync(),調用commitSync,手動同步ack。每處理完1條消息,commitSync 1次。
策略3:consumer. commitASync(),手動異步ack。、
那麼使用策略2,提交每處理完1條消息,就發送一次commitSync。那這樣是否是就能夠解決「重複消費」了呢?以下代碼:
while (true) { List<ConsumerRecord> buffer = new ArrayList<>(); ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } insertIntoDb(buffer); //消除處理,存到db consumer.commitSync(); //同步發送ack buffer.clear(); } }
答案是否認的!由於上面的insertIntoDb和commitSync作不到原子操做:若是在數據處理完成,commitSync的時候掛了,服務器再次重啓,消息仍然會重複消費。
那麼如何解決重複消費的問題呢?答案是本身保存committed offset,而不是依賴kafka的集羣保存committed offset,把消息的處理和保存offset作成一個原子操做,而且對消息加入惟一id,進行判重。
依照官方文檔,要本身保存偏移量,須要: