Kafka提交offset機制

在kafka的消費者中,有一個很是關鍵的機制,那就是offset機制。它使得Kafka在消費的過程當中即便掛了或者引起再均衡問題從新分配Partation,當下次從新恢復消費時仍然能夠知道從哪裏開始消費。它比如看一本書中的書籤標記,每次經過書籤標記(offset)就能快速找到該從哪裏開始看(消費)。算法

Kafka對於offset的處理有兩種提交方式:(1) 自動提交(默認的提交方式)   (2) 手動提交(能夠靈活地控制offset)bootstrap

(1) 自動提交偏移量:異步

Kafka中偏移量的自動提交是由參數enable_auto_commit和auto_commit_interval_ms控制的,當enable_auto_commit=True時,Kafka在消費的過程當中會以頻率爲auto_commit_interval_ms向Kafka自帶的topic(__consumer_offsets)進行偏移量提交,具體提交到哪一個Partation是以算法:partation=hash(group_id)%50來計算的。async

如:group_id=test_group_1,則partation=hash("test_group_1")%50=28函數

自動提交偏移量示例:測試

 1 import pickle  2 import uuid  3 from kafka import KafkaConsumer  4 
 5 consumer = KafkaConsumer(  6     bootstrap_servers=['192.168.33.11:9092'],  7     group_id="test_group_1",  8     client_id="{}".format(str(uuid.uuid4())),  9     max_poll_records=500, 10     enable_auto_commit=True,  # 默認爲True 表示自動提交偏移量
11     auto_commit_interval_ms=100,  # 控制自動提交偏移量的頻率 單位ms 默認是5000ms
12     key_deserializer=lambda k: pickle.loads(k), 13     value_deserializer=lambda v: pickle.loads(v) 14 ) 15 
16 # 訂閱消費round_topic這個主題
17 consumer.subscribe(topics=('round_topic',)) 18 
19 try: 20     while True: 21         consumer_records_dict = consumer.poll(timeout_ms=1000) 22 
23         # consumer.assignment()能夠獲取每一個分區的offset
24         for partition in consumer.assignment(): 25             print('主題:{} 分區:{},須要從下面的offset開始消費:{}'.format( 26  str(partition.topic), 27  str(partition.partition), 28  consumer.position(partition) 29  )) 30 
31         # 處理邏輯.
32         for k, record_list in consumer_records_dict.items(): 33             print(k) 34             for record in record_list: 35                 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 36  record.topic, record.partition, record.offset, record.key, record.value) 37  ) 38 
39 finally: 40     # 調用close方法的時候會觸發偏移量的自動提交 close默認autocommit=True
41     consumer.close()

 返回結果:ui

在上述代碼中,最後調用consumer.close()時候也會觸發自動提交,由於它默認autocommit=True,源碼以下:this

 1     def close(self, autocommit=True):  2         """Close the consumer, waiting indefinitely for any needed cleanup.  3 
 4  Keyword Arguments:  5  autocommit (bool): If auto-commit is configured for this consumer,  6  this optional flag causes the consumer to attempt to commit any  7  pending consumed offsets prior to close. Default: True  8         """
 9         if self._closed: 10             return
11         log.debug("Closing the KafkaConsumer.") 12         self._closed = True 13         self._coordinator.close(autocommit=autocommit) 14  self._metrics.close() 15  self._client.close() 16         try: 17             self.config['key_deserializer'].close() 18         except AttributeError: 19             pass
20         try: 21             self.config['value_deserializer'].close() 22         except AttributeError: 23             pass
24         log.debug("The KafkaConsumer has closed.")

 

對於自動提交偏移量,若是auto_commit_interval_ms的值設置的過大,當消費者在自動提交偏移量以前異常退出,將致使kafka未提交偏移量,進而出現重複消費的問題,因此建議auto_commit_interval_ms的值越小越好。spa

 

(2) 手動提交偏移量:線程

鑑於Kafka自動提交offset的不靈活性和不精確性(只能是按指定頻率的提交),Kafka提供了手動提交offset策略。手動提交能對偏移量更加靈活精準地控制,以保證消息不被重複消費以及消息不被丟失。

對於手動提交offset主要有3種方式:1.同步提交  2.異步提交  3.異步+同步 組合的方式提交

 1.同步手動提交偏移量

同步模式下提交失敗的時候一直嘗試提交,直到遇到沒法重試的狀況下才會結束,同時同步方式下消費者線程在拉取消息會被阻塞,在broker對提交的請求作出響應以前,會一直阻塞直到偏移量提交操做成功或者在提交過程當中發生異常,限制了消息的吞吐量。

 1 """
 2 同步的方式10W條消息 4.58s  3 """
 4 
 5 import pickle  6 import uuid  7 import time  8 from kafka import KafkaConsumer  9 
10 consumer = KafkaConsumer( 11     bootstrap_servers=['192.168.33.11:9092'], 12     group_id="test_group_1", 13     client_id="{}".format(str(uuid.uuid4())), 14     enable_auto_commit=False,  # 設置爲手動提交偏移量.
15     key_deserializer=lambda k: pickle.loads(k), 16     value_deserializer=lambda v: pickle.loads(v) 17 ) 18 
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',)) 21 
22 try: 23     start_time = time.time() 24     while True: 25         consumer_records_dict = consumer.poll(timeout_ms=100)  # 在輪詢中等待的毫秒數
26         print("獲取下一輪") 27 
28         record_num = 0 29         for key, record_list in consumer_records_dict.items(): 30             for record in record_list: 31                 record_num += 1
32         print("---->當前批次獲取到的消息個數是:{}<----".format(record_num)) 33         record_num = 0 34 
35         for k, record_list in consumer_records_dict.items(): 36             for record in record_list: 37                 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 38  record.topic, record.partition, record.offset, record.key, record.value) 39  ) 40 
41         try: 42             # 輪詢一個batch 手動提交一次
43             consumer.commit()  # 提交當前批次最新的偏移量. 會阻塞 執行完後纔會下一輪poll
44             end_time = time.time() 45             time_counts = end_time - start_time 46             print(time_counts) 47         except Exception as e: 48             print('commit failed', str(e)) 49 
50 finally: 51     consumer.close()  # 手動提交中close對偏移量提交沒有影響

 

從上述能夠看出,每輪循一個批次,手動提交一次,只有當前批次的消息提交完成時纔會觸發poll來獲取下一輪的消息,經測試10W條消息耗時4.58s

 2.異步手動提交偏移量+回調函數

 異步手動提交offset時,消費者線程不會阻塞,提交失敗的時候也不會進行重試,而且能夠配合回調函數在broker作出響應的時候記錄錯誤信息。

 

 1 """
 2 異步的方式手動提交偏移量(異步+回調函數的模式) 10W條消息 3.09s  3 """
 4 
 5 import pickle  6 import uuid  7 import time  8 from kafka import KafkaConsumer  9 
10 consumer = KafkaConsumer( 11     bootstrap_servers=['192.168.33.11:9092'], 12     group_id="test_group_1", 13     client_id="{}".format(str(uuid.uuid4())), 14     enable_auto_commit=False,  # 設置爲手動提交偏移量.
15     key_deserializer=lambda k: pickle.loads(k), 16     value_deserializer=lambda v: pickle.loads(v) 17 ) 18 
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',)) 21 
22 
23 def _on_send_response(*args, **kwargs): 24     """
25  提交偏移量涉及回調函數 26  :param args: args[0] --> {TopicPartition:OffsetAndMetadata} args[1] --> Exception 27  :param kwargs: 28  :return: 29     """
30     if isinstance(args[1], Exception): 31         print('偏移量提交異常. {}'.format(args[1])) 32     else: 33         print('偏移量提交成功') 34 
35 
36 try: 37     start_time = time.time() 38     while True: 39         consumer_records_dict = consumer.poll(timeout_ms=10) 40 
41         record_num = 0 42         for key, record_list in consumer_records_dict.items(): 43             for record in record_list: 44                 record_num += 1
45         print("當前批次獲取到的消息個數是:{}".format(record_num)) 46 
47         for record_list in consumer_records_dict.values(): 48             for record in record_list: 49                 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 50  record.topic, record.partition, record.offset, record.key, record.value)) 51 
52         # 避免頻繁提交
53         if record_num != 0: 54             try: 55                 consumer.commit_async(callback=_on_send_response) 56             except Exception as e: 57                 print('commit failed', str(e)) 58 
59         record_num = 0 60 
61 finally: 62     consumer.close()

對於args參數:args[0]是一個dict,key是TopicPartition,value是OffsetAndMetadata,表示該主題下的partition對應的offset;args[1]在提交成功是True,提交失敗時是一個Exception類。

對於異步提交,因爲不會進行失敗重試,當消費者異常關閉或者觸發了再均衡前,若是偏移量還未提交就會形成偏移量丟失。

 3.異步+同步 組合的方式提交偏移量

針對異步提交偏移量丟失的問題,經過對消費者進行異步批次提交而且在關閉時同步提交的方式,這樣即便上一次的異步提交失敗,經過同步提交還可以進行補救,同步會一直重試,直到提交成功。

 1 """
 2 同步和異步組合的方式提交偏移量  3 """
 4 
 5 import pickle  6 import uuid  7 import time  8 from kafka import KafkaConsumer  9 
10 consumer = KafkaConsumer( 11     bootstrap_servers=['192.168.33.11:9092'], 12     group_id="test_group_1", 13     client_id="{}".format(str(uuid.uuid4())), 14     enable_auto_commit=False,  # 設置爲手動提交偏移量.
15     key_deserializer=lambda k: pickle.loads(k), 16     value_deserializer=lambda v: pickle.loads(v) 17 ) 18 
19 # 訂閱消費round_topic這個主題
20 consumer.subscribe(topics=('round_topic',)) 21 
22 
23 def _on_send_response(*args, **kwargs): 24     """
25  提交偏移量涉及的回調函數 26  :param args: 27  :param kwargs: 28  :return: 29     """
30     if isinstance(args[1], Exception): 31         print('偏移量提交異常. {}'.format(args[1])) 32     else: 33         print('偏移量提交成功') 34 
35 
36 try: 37     start_time = time.time() 38     while True: 39         consumer_records_dict = consumer.poll(timeout_ms=100) 40 
41         record_num = 0 42         for key, record_list in consumer_records_dict.items(): 43             for record in record_list: 44                 record_num += 1
45         print("---->當前批次獲取到的消息個數是:<----".format(record_num)) 46         record_num = 0 47 
48         for k, record_list in consumer_records_dict.items(): 49             print(k) 50             for record in record_list: 51                 print("topic = {},partition = {},offset = {},key = {},value = {}".format( 52  record.topic, record.partition, record.offset, record.key, record.value) 53  ) 54 
55         try: 56             # 輪詢一個batch 手動提交一次
57             consumer.commit_async(callback=_on_send_response) 58             end_time = time.time() 59             time_counts = end_time - start_time 60             print(time_counts) 61         except Exception as e: 62             print('commit failed', str(e)) 63 
64 except Exception as e: 65     print(str(e)) 66 finally: 67     try: 68         # 同步提交偏移量,在消費者異常退出的時候再次提交偏移量,確保偏移量的提交.
69  consumer.commit() 70         print("同步補救提交成功") 71     except Exception as e: 72         consumer.close()

經過finally在最後無論是否異常都會觸發consumer.commit()來同步補救一次,確保偏移量不會丟失

相關文章
相關標籤/搜索