Kafka consumers rely on one crucial mechanism: the offset. Even if a consumer crashes mid-stream, or a rebalance reassigns its partitions, the offset lets it know exactly where to resume when consumption restarts. It works like a bookmark: each time, the bookmark (offset) tells you exactly where to pick up reading (consuming).
Kafka supports two ways of committing offsets: (1) automatic commit (the default) and (2) manual commit (which gives flexible control over offsets).
(1) Automatic offset commits:
Automatic offset commits are controlled by the parameters enable_auto_commit and auto_commit_interval_ms. When enable_auto_commit=True, the consumer commits offsets to Kafka's internal topic (__consumer_offsets) at the frequency given by auto_commit_interval_ms. Which partition of that topic receives the commit is computed as: partition = hash(group_id) % 50.
For example, with group_id = "test_group_1": partition = hash("test_group_1") % 50 = 28.
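As a rough illustration, that broker-side computation can be sketched in Python. This is a hedged sketch, not Kafka's actual code: the broker hashes the group id with Java's `String.hashCode()`, and 50 is only the default partition count of `__consumer_offsets` (`offsets.topic.num.partitions`), so the resulting number may differ from the value quoted above.

```python
def java_string_hashcode(s):
    """Replicate Java's String.hashCode() (32-bit signed overflow included),
    which is the hash Kafka brokers apply to the group id."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition(group_id, num_partitions=50):
    """Partition of __consumer_offsets that would store this group's commits."""
    return abs(java_string_hashcode(group_id)) % num_partitions


print(offsets_partition("test_group_1"))  # some value in 0..49
```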
Example of automatic offset commits:
```python
import pickle
import uuid

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    max_poll_records=500,
    enable_auto_commit=True,      # default True: commit offsets automatically
    auto_commit_interval_ms=100,  # auto-commit frequency in ms (default 5000)
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)

# Subscribe to the topic round_topic
consumer.subscribe(topics=('round_topic',))

try:
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=1000)

        # consumer.assignment() returns the assigned partitions;
        # consumer.position() gives the next offset for each one
        for partition in consumer.assignment():
            print('topic: {} partition: {}, consumption starts from offset: {}'.format(
                str(partition.topic),
                str(partition.partition),
                consumer.position(partition)
            ))

        # Processing logic
        for k, record_list in consumer_records_dict.items():
            print(k)
            for record in record_list:
                print("topic = {}, partition = {}, offset = {}, key = {}, value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value)
                )
finally:
    # close() also triggers an automatic offset commit
    # (its autocommit flag defaults to True)
    consumer.close()
Output: (omitted)
In the code above, the final call to consumer.close() also triggers an automatic commit, because its autocommit argument defaults to True. The source:
```python
def close(self, autocommit=True):
    """Close the consumer, waiting indefinitely for any needed cleanup.

    Keyword Arguments:
        autocommit (bool): If auto-commit is configured for this consumer,
            this optional flag causes the consumer to attempt to commit any
            pending consumed offsets prior to close. Default: True
    """
    if self._closed:
        return
    log.debug("Closing the KafkaConsumer.")
    self._closed = True
    self._coordinator.close(autocommit=autocommit)
    self._metrics.close()
    self._client.close()
    try:
        self.config['key_deserializer'].close()
    except AttributeError:
        pass
    try:
        self.config['value_deserializer'].close()
    except AttributeError:
        pass
    log.debug("The KafkaConsumer has closed.")
```
With automatic commits, if auto_commit_interval_ms is set too large and the consumer exits abnormally before the next scheduled commit, the latest offsets are never committed and the same messages will be consumed again after a restart. A smaller auto_commit_interval_ms narrows this duplicate-consumption window, at the cost of more frequent commit traffic.
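That duplicate-consumption window can be illustrated with a toy simulation (pure Python, no Kafka involved; every name below is made up for illustration). The offset is only saved every `commit_every` messages, so a crash between commits replays everything after the last saved offset:

```python
def consume_with_crash(messages, commit_every, crash_after):
    """Process messages, 'committing' the offset every `commit_every`
    messages; simulate a crash after `crash_after` messages, then
    restart from the last committed offset."""
    committed = 0
    processed = []
    for i, msg in enumerate(messages):
        processed.append(msg)
        if (i + 1) % commit_every == 0:
            committed = i + 1  # offset saved
        if i + 1 == crash_after:
            break  # crash: progress since `committed` is lost
    # restart: resume from the last committed offset
    for msg in messages[committed:]:
        processed.append(msg)
    return processed


out = consume_with_crash(list(range(10)), commit_every=5, crash_after=7)
# messages 5 and 6 were processed before the crash but after the last
# commit, so they are processed a second time after the restart
```

The longer the commit interval relative to the processing rate, the more messages sit in that replayed tail.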
(2) Manual offset commits:
Because automatic commits are inflexible and imprecise (offsets are only committed at a fixed interval), Kafka also provides a manual commit strategy. Manual commits give precise, flexible control over offsets, helping ensure messages are neither consumed twice nor lost.
There are three main ways to commit offsets manually: 1. synchronous commit; 2. asynchronous commit; 3. a combination of asynchronous and synchronous commits.
1. Synchronous manual commits
In synchronous mode, a failed commit is retried until it either succeeds or hits a non-retriable error. The consumer thread blocks while the commit is in flight: it waits for the broker's response until the commit succeeds or an exception is raised during the commit, which limits message throughput.
```python
"""
Synchronous commits: 100,000 messages in 4.58s
"""

import pickle
import time
import uuid

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    enable_auto_commit=False,  # commit offsets manually
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)

# Subscribe to the topic round_topic
consumer.subscribe(topics=('round_topic',))

try:
    start_time = time.time()
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=100)  # ms to wait in the poll
        print("fetching the next batch")

        record_num = 0
        for key, record_list in consumer_records_dict.items():
            for record in record_list:
                record_num += 1
        print("----> messages in this batch: {} <----".format(record_num))
        record_num = 0

        for k, record_list in consumer_records_dict.items():
            for record in record_list:
                print("topic = {}, partition = {}, offset = {}, key = {}, value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value)
                )

        try:
            # one manual commit per polled batch
            consumer.commit()  # commits the batch's latest offsets; blocks
                               # until done before the next poll
            end_time = time.time()
            time_counts = end_time - start_time
            print(time_counts)
        except Exception as e:
            print('commit failed', str(e))

finally:
    consumer.close()  # with manual commits, close() does not affect offsets
```
As shown above, the code commits once per polled batch, and the next poll() is only issued after the current batch's commit completes. In testing, 100,000 messages took 4.58s.
2. Asynchronous manual commits + callback
With asynchronous commits the consumer thread does not block, and failed commits are not retried; a callback can be registered to record the error once the broker responds.
```python
"""
Asynchronous manual commits (async + callback): 100,000 messages in 3.09s
"""

import pickle
import time
import uuid

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    enable_auto_commit=False,  # commit offsets manually
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)

# Subscribe to the topic round_topic
consumer.subscribe(topics=('round_topic',))


def _on_send_response(*args, **kwargs):
    """
    Callback invoked when the offset commit completes.
    :param args: args[0] --> {TopicPartition: OffsetAndMetadata};
                 args[1] --> Exception on failure
    """
    if isinstance(args[1], Exception):
        print('offset commit failed. {}'.format(args[1]))
    else:
        print('offset commit succeeded')


try:
    start_time = time.time()
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=10)

        record_num = 0
        for key, record_list in consumer_records_dict.items():
            for record in record_list:
                record_num += 1
        print("messages in this batch: {}".format(record_num))

        for record_list in consumer_records_dict.values():
            for record in record_list:
                print("topic = {}, partition = {}, offset = {}, key = {}, value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value))

        # avoid committing on empty batches
        if record_num != 0:
            try:
                consumer.commit_async(callback=_on_send_response)
            except Exception as e:
                print('commit failed', str(e))

            record_num = 0

finally:
    consumer.close()
```
About the callback's args: args[0] is a dict mapping TopicPartition to OffsetAndMetadata, i.e. the committed offset for each partition of the topic; args[1] is True when the commit succeeds and an Exception instance when it fails.
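To make that contract concrete, here is the same dispatch pattern exercised with simulated arguments (no broker involved; the stand-in values are illustrative only, not real TopicPartition/OffsetAndMetadata objects):

```python
def on_commit_response(offsets, result):
    """Mimics the callback above: `offsets` stands in for the committed
    {TopicPartition: OffsetAndMetadata} dict, `result` is True on
    success or an Exception on failure."""
    if isinstance(result, Exception):
        return "commit failed: {}".format(result)
    return "committed offsets for {} partition(s)".format(len(offsets))


# simulated success and failure responses
print(on_commit_response({("round_topic", 0): 42}, True))
print(on_commit_response({}, RuntimeError("broker unreachable")))
```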
Because asynchronous commits are never retried, offsets are lost if the consumer shuts down abnormally, or a rebalance is triggered, before the latest offsets have been committed.
3. Combined asynchronous + synchronous commits
To address the offset loss of asynchronous commits, commit asynchronously per batch during consumption and commit synchronously on shutdown. Even if the last asynchronous commit failed, the synchronous commit can make up for it, retrying until it succeeds.
```python
"""
Combined synchronous and asynchronous offset commits
"""

import pickle
import time
import uuid

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    enable_auto_commit=False,  # commit offsets manually
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)

# Subscribe to the topic round_topic
consumer.subscribe(topics=('round_topic',))


def _on_send_response(*args, **kwargs):
    """
    Callback invoked when the offset commit completes.
    """
    if isinstance(args[1], Exception):
        print('offset commit failed. {}'.format(args[1]))
    else:
        print('offset commit succeeded')


try:
    start_time = time.time()
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=100)

        record_num = 0
        for key, record_list in consumer_records_dict.items():
            for record in record_list:
                record_num += 1
        print("----> messages in this batch: {} <----".format(record_num))
        record_num = 0

        for k, record_list in consumer_records_dict.items():
            print(k)
            for record in record_list:
                print("topic = {}, partition = {}, offset = {}, key = {}, value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value)
                )

        try:
            # one asynchronous commit per polled batch
            consumer.commit_async(callback=_on_send_response)
            end_time = time.time()
            time_counts = end_time - start_time
            print(time_counts)
        except Exception as e:
            print('commit failed', str(e))

except Exception as e:
    print(str(e))
finally:
    try:
        # synchronous commit: on exit (normal or abnormal), commit once
        # more to make sure the latest offsets are saved
        consumer.commit()
        print("final synchronous commit succeeded")
    except Exception as e:
        print('final commit failed', str(e))
    finally:
        consumer.close()
```
The finally block triggers consumer.commit() whether or not an exception occurred, a synchronous make-up commit that ensures the offsets are not lost (with consumer.close() moved into a nested finally so the consumer is always closed).
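The division of labour above — fire-and-forget asynchronous commits inside the loop, a retrying synchronous commit as the final safety net — can be modelled without Kafka at all. Everything below is an illustrative toy, not the kafka-python implementation:

```python
def async_commit(store, offset, fail):
    """Fire-and-forget: on failure the offset is simply dropped (no retry)."""
    if not fail:
        store["committed"] = offset
    return not fail


def sync_commit(store, offset, transient_failures):
    """Blocking commit: retries until the (simulated) broker accepts it."""
    attempts = 0
    while True:
        attempts += 1
        if transient_failures:
            transient_failures.pop(0)  # this attempt failed; retry
            continue
        store["committed"] = offset
        return attempts


store = {"committed": 0}
async_commit(store, 100, fail=True)  # lost: committed stays at 0
assert store["committed"] == 0

# the final sync commit rides out two transient failures, then succeeds
attempts = sync_commit(store, 100, transient_failures=[1, 1])
```

After the sync commit, `store["committed"]` is 100 even though the earlier async commit was dropped, which is exactly why the combined strategy ends with a synchronous commit.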