Kafka生產者發送消息的三種方式

Kafka是一種分佈式的基於發佈/訂閱的消息系統,它的高吞吐量、靈活的offset是其它消息系統所沒有的。bootstrap

Kafka發送消息主要有三種方式:服務器

1.發送並忘記 2.同步發送 3.異步發送+回調函數網絡

 

下面以單節點的方式分別用三種方法發送1w條消息測試:異步

方式一:發送並忘記(不關心消息是否正常到達,對返回結果不作任何判斷處理)分佈式

發送並忘記的方式本質上也是一種異步的方式,只是它不會獲取消息發送的返回結果,這種方式的吞吐量是最高的,可是沒法保證消息的可靠性:函數

 1 import pickle  2 import time  3 from kafka import KafkaProducer  4 
 5 producer = KafkaProducer(bootstrap_servers=['192.168.33.11:9092'],  6                          key_serializer=lambda k: pickle.dumps(k),  7                          value_serializer=lambda v: pickle.dumps(v))  8 
 9 start_time = time.time() 10 for i in range(0, 10000): 11     print('------{}---------'.format(i)) 12     future = producer.send('test_topic', key='num', value=i, partition=0) 13 
14 # 將緩衝區的所有消息push到broker當中
15 producer.flush() 16 producer.close() 17 
18 end_time = time.time() 19 time_counts = end_time - start_time 20 print(time_counts)

 測試結果:1.88s測試

 

方式二:同步發送(經過get方法等待Kafka的響應,判斷消息是否發送成功)spa

以同步的方式發送消息時,一條一條的發送,對每條消息返回的結果判斷, 能夠明確地知道每條消息的發送狀況,可是因爲同步的方式會阻塞,只有當消息經過get返回future對象時,纔會繼續下一條消息的發送:日誌

 

 1 import pickle  2 import time  3 from kafka import KafkaProducer  4 from kafka.errors import kafka_errors  5 
 6 producer = KafkaProducer(  7     bootstrap_servers=['192.168.33.11:9092'],  8     key_serializer=lambda k: pickle.dumps(k),  9     value_serializer=lambda v: pickle.dumps(v) 10 ) 11 
12 start_time = time.time() 13 for i in range(0, 10000): 14     print('------{}---------'.format(i)) 15     future = producer.send(topic="test_topic", key="num", value=i) 16     # 同步阻塞,經過調用get()方法進而保證必定程序是有序的.
17     try: 18         record_metadata = future.get(timeout=10) 19         # print(record_metadata.topic)
20         # print(record_metadata.partition)
21         # print(record_metadata.offset)
22     except kafka_errors as e: 23         print(str(e)) 24 
25 end_time = time.time() 26 time_counts = end_time - start_time 27 print(time_counts)

 

測試結果:16scode

 

方式三:異步發送+回調函數(消息以異步的方式發送,經過回調函數返回消息發送成功/失敗)

在調用send方法發送消息的同時,指定一個回調函數,服務器在返回響應時會調用該回調函數,經過回調函數可以對異常狀況進行處理,當調用了回調函數時,只有回調函數執行完畢生產者纔會結束,不然一直會阻塞:

 1 import pickle  2 import time  3 from kafka import KafkaProducer  4 
 5 producer = KafkaProducer(  6     bootstrap_servers=['192.168.33.11:9092'],  7     key_serializer=lambda k: pickle.dumps(k),  8     value_serializer=lambda v: pickle.dumps(v)  9 ) 10 
11 
12 def on_send_success(*args, **kwargs): 13     """
14  發送成功的回調函數 15  :param args: 16  :param kwargs: 17  :return: 18     """
19     return args 20 
21 
22 def on_send_error(*args, **kwargs): 23     """
24  發送失敗的回調函數 25  :param args: 26  :param kwargs: 27  :return: 28     """
29 
30     return args 31 
32 
33 start_time = time.time() 34 for i in range(0, 10000): 35     print('------{}---------'.format(i)) 36     # 若是成功,傳進record_metadata,若是失敗,傳進Exception.
37  producer.send( 38         topic="test_topic", key="num", value=i 39  ).add_callback(on_send_success).add_errback(on_send_error) 40 
41 producer.flush() 42 producer.close() 43 
44 end_time = time.time() 45 time_counts = end_time - start_time 46 print(time_counts)

測試結果:2.15s

 

三種方式雖然在時間上有所差異,但並非說時間越快的越好,具體要看業務的應用場景:

場景1:若是業務要求消息必須是按順序發送的,那麼可使用同步的方式,而且只能在一個partation上,結合參數設置retries的值讓發送失敗時重試,設置max_in_flight_requests_per_connection=1,能夠控制生產者在收到服務器晌應以前只能發送1個消息,從而控制消息順序發送;

場景2:若是業務只關心消息的吞吐量,允許少許消息發送失敗,也不關注消息的發送順序,那麼可使用發送並忘記的方式,並配合參數acks=0,這樣生產者不須要等待服務器的響應,以網絡能支持的最大速度發送消息;

場景3:若是業務須要知道消息發送是否成功,而且對消息的順序不關心,那麼能夠用異步+回調的方式來發送消息,配合參數retries=0,並將發送失敗的消息記錄到日誌文件中;

相關文章
相關標籤/搜索