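In one project we needed to process data by consuming messages from Kafka, but with delayed consumption. I wondered whether this could be done by managing Kafka commits ourselves: set `enable.auto.commit` to False and expect that, if a message was consumed but not committed, Kafka would put it back into the queue. The message would then be consumed again later, over and over, until the configured delay had passed, at which point it would really be processed and committed.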
So I wrote a demo to verify this, and found that the setting did not behave the way I wanted.
The producer sends one message per second to a Kafka topic.
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094'})
topic = 'nico-test'
msg_tpl = 'hello kafka:{0}'

try:
    while True:
        msg = msg_tpl.format(time.time())
        p.produce(topic, msg)
        # Serve delivery reports so the producer's internal queue is drained
        p.poll(0)
        print('Produce msg:{0}'.format(msg))
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    # Wait for outstanding messages to be delivered before exiting
    p.flush()
```
The consumer is configured with `enable.auto.commit: False` and never commits offsets manually either.
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'group.id': 'nico-test',
    'auto.offset.reset': 'earliest',
    # No automatic offset commits; this demo never calls commit() either
    'enable.auto.commit': False
})
topic = 'nico-test'
c.subscribe([topic])

# Print cluster metadata
cd = c.list_topics()
print(cd.cluster_id)
print(cd.controller_id)
print(cd.brokers)
print(cd.topics)
print(cd.orig_broker_id)
print(cd.orig_broker_name)

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # Errors are delivered as messages with error() set
            print('Consumer error: {0}'.format(msg.error()))
            continue
        print('topic:{topic}, partition:{partition}, offset:{offset}, headers:{headers}, key:{key}, msg:{msg}, timestamp:{timestamp}'.format(
            topic=msg.topic(), partition=msg.partition(), offset=msg.offset(),
            headers=msg.headers(), key=msg.key(), msg=msg.value(), timestamp=msg.timestamp()))
finally:
    c.close()
```
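The result: once started, the consumer kept consuming messages in order and never put any message back into the queue. However, every time the consumer was killed and restarted, it consumed from the very beginning again. That is because no offset was ever committed for the group `nico-test`, so a restarted consumer has no stored position and falls back to `auto.offset.reset: earliest`. To summarize, this setting has nothing to do with redelivery: when it is true, the client automatically and periodically (every `auto.commit.interval.ms`) commits the offsets of messages it has already returned; when it is false, offsets are committed only if the application calls `commit()` itself. With this client, committed offsets are stored by the Kafka brokers in the internal `__consumer_offsets` topic, not in ZooKeeper.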
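To make the commit behaviour above concrete, here is a toy, in-memory model in plain Python. It is not the `confluent_kafka` API; the class `ToyConsumer` and its methods are hypothetical. It assumes a single partition, `earliest` as the reset policy, and an auto-commit that happens immediately on each poll instead of periodically as in real clients. It shows that the position advances on every poll whether or not anything is committed, and that a restart resumes from the committed offset, or from offset 0 when nothing was committed.

```python
# Toy model of Kafka consumer offsets; NOT the confluent_kafka API.
# Assumptions: one partition, auto.offset.reset=earliest, and auto-commit
# happens immediately on poll (real clients commit periodically).


class ToyConsumer:
    def __init__(self, log, committed, group, auto_commit):
        self.log = log              # list of records; list index == offset
        self.committed = committed  # shared "broker" state: group -> next offset to read
        self.group = group
        self.auto_commit = auto_commit
        # On (re)start: resume from the committed offset, else fall back to earliest (0)
        self.position = committed.get(group, 0)

    def poll(self):
        """Return the next record, or None at the end of the log."""
        if self.position >= len(self.log):
            return None
        record = self.log[self.position]
        self.position += 1  # advances whether or not anything is committed
        if self.auto_commit:
            self.commit()
        return record

    def commit(self):
        """Commit the current position (the offset of the next record to read)."""
        self.committed[self.group] = self.position


if __name__ == '__main__':
    log = ['m0', 'm1', 'm2']
    offsets = {}

    c = ToyConsumer(log, offsets, 'nico-test', auto_commit=False)
    print([c.poll() for _ in range(3)])  # prints ['m0', 'm1', 'm2']: no redelivery

    # "Kill and restart": nothing was committed, so reading starts from m0 again
    c = ToyConsumer(log, offsets, 'nico-test', auto_commit=False)
    print(c.poll())  # prints m0
    c.commit()       # commit after processing m0

    # Restart again: resumes right after the committed record
    c = ToyConsumer(log, offsets, 'nico-test', auto_commit=False)
    print(c.poll())  # prints m1
```

The committed value is the offset of the next record to read, which matches Kafka's convention of committing "last processed offset + 1".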
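Thinking it over afterwards, the reason this cannot give delayed consumption comes down to how Kafka works. Kafka writes messages to append-only log files on disk, and consuming a message does not remove it: each consumer group only keeps an offset into the log, and that position moves forward unless the application explicitly seeks. There is no "put the message back into the queue" step; to replay a message in the sense I wanted (and thus support delayed consumption), Kafka would have to delete it from the queue and re-insert it, which goes against its design.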
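Under this log model, "delaying" a message means not moving the read position past it yet, rather than re-inserting it. The toy sketch below illustrates that idea; `AppendOnlyLog` and `consume_with_delay` are hypothetical names, timestamps are plain milliseconds supplied by the caller, and records are assumed to be appended in timestamp order. It processes records that are at least `delay_ms` old, stops at the first one that is not yet due, and leaves the position on it so it is read again later. Nothing is ever removed from the log. In real `confluent_kafka` code the rough counterparts would presumably be `Message.timestamp()` and `Consumer.seek()`, but that mapping is not shown here.

```python
# Toy sketch (not Kafka code): delaying consumption on an append-only log
# by holding the read position back instead of re-inserting messages.


class AppendOnlyLog:
    """Stand-in for one Kafka partition: records are only ever appended."""

    def __init__(self):
        self._records = []  # (timestamp_ms, value); list index == offset

    def append(self, value, timestamp_ms):
        self._records.append((timestamp_ms, value))
        return len(self._records) - 1  # offset of the new record

    def read(self, offset):
        """Reading never removes a record; returns None past the end."""
        if offset < len(self._records):
            return self._records[offset]
        return None


def consume_with_delay(log, position, now_ms, delay_ms):
    """Process records at least delay_ms old, starting at `position`.

    Stops at the first record that is not yet due and returns
    (processed_values, new_position); a caller would commit new_position.
    """
    processed = []
    while True:
        record = log.read(position)
        if record is None:
            break
        timestamp_ms, value = record
        if now_ms - timestamp_ms < delay_ms:
            break  # not due yet: keep the position here so it is re-read later
        processed.append(value)
        position += 1
    return processed, position


if __name__ == '__main__':
    log = AppendOnlyLog()
    log.append('a', timestamp_ms=1000)
    log.append('b', timestamp_ms=5000)
    print(consume_with_delay(log, 0, now_ms=6000, delay_ms=3000))  # (['a'], 1)
    print(consume_with_delay(log, 1, now_ms=9000, delay_ms=3000))  # (['b'], 2)
```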