Kafka 入門(四)-- Python Kafka Client 性能測試

1、前言

  因爲工做緣由使用到了 Kafka,而現有的代碼並不能知足性能需求,因此須要開發高效讀寫 Kafka 的工具,本文是一個 Python Kafka Client 的性能測試記錄,經過本次測試,能夠知道選用什麼第三方庫的性能最高,選用什麼編程模型開發出來的工具效率最高。python

 

2、第三方庫性能測試

1.第三方庫

  這次測試的是三個主要的 Python Kafka Client:pykafka、kafka-python 和 confluent-kafka,具體介紹見官網:編程

2.測試環境

       這次測試使用的 Python 版本是2.7,第三方庫的版本爲:bootstrap

  • pykafka:2.8.0
  • kafka-python:2.0.2
  • confluent-kafka:1.5.0

       使用的數據總量有50萬,每條數據大小爲2KB,總共爲966MB。api

3.測試過程

(1)Kafka Producer 測試session

  分別使用 pykafka、kafka-python 和 confluent-kafka 實例化一個 Kafka 的 Producer 對象,而後調用相應的 produce 方法將數據推送給 Kafka,數據總條數爲50萬,比較三個庫所耗費的時間,並計算每秒鐘能夠推送的數據條數和大小,比較得出性能最優的。多線程

  代碼示例(以 pykafka 爲例):併發

 1 import sys
 2 from datetime import datetime
 3 from pykafka import KafkaClient
 4 
 5 
 6 class KafkaProducerTool():
 7     def __init__(self, broker, topic):
 8         client = KafkaClient(hosts=broker)
 9         self.topic = client.topics[topic]
10         self.producer = self.topic.get_producer()
11 
12     def send_msg(self, msg):
13         self.producer.produce(msg)
14 
15 
16 if __name__ == '__main__':
17     producer = KafkaProducerTool(broker, topic)
18     print(datetime.now())
19     for line in sys.stdin:
20         producer.send_msg(line.strip())
21     producer.producer.stop()
22     print(datetime.now())

(2)Kafka Consumer 測試ide

  分別使用 pykafka、kafka-python 和 confluent-kafka 實例化一個 Kafka 的 Consumer 對象,而後調用相應的 consume 方法從 Kafka 中消費數據,要消費下來的數據總條數爲50萬,比較三個庫所耗費的時間,並計算每秒鐘能夠消費的數據條數和大小,比較得出性能最優的。工具

  代碼示例(以 pykafka 爲例):性能

 1 from datetime import datetime
 2 from pykafka import KafkaClient
 3 
 4 
 5 class KafkaConsumerTool():
 6     def __init__(self, broker, topic):
 7         client = KafkaClient(hosts=broker)
 8         self.topic = client.topics[topic]
 9         self.consumer = self.topic.get_simple_consumer()
10 
11     def receive_msg(self):
12         count = 0
13         print(datetime.now())
14         while True:
15             msg = self.consumer.consume()
16             if msg:
17                 count += 1
18             if count == 500000:
19                 print(datetime.now())
20                 return
21 
22 
23 if __name__ == '__main__':
24     consumer = KafkaConsumerTool(broker, topic)
25     consumer.receive_msg()
26     consumer.consumer.stop()

4.測試結果

  • Kafka Producer 測試結果:
  總耗時/秒 每秒數據量/MB 每秒數據條數
confluent_kafka 35 27.90 14285.71
pykafka 50 19.53 10000
kafka-python 532 1.83 939.85
  • Kafka Consumer 測試結果:
  總耗時/秒 每秒數據量/MB 每秒數據條數
confluent_kafka 39 25.04 12820.51
kafka-python 52 18.78 9615.38
pykafka 335 2.92 1492.54

5.測試結論

  通過測試,在這次測試的三個庫中,生產消息的效率排名是:confluent-kafka > pykafka > kafka-python,消費消息的效率排名是:confluent-kafka > kafka-python > pykafka,因而可知 confluent-kafka 的性能是其中最優的,於是選用這個庫進行後續開發。

 

3、多線程模型性能測試

1.編程模型

  通過前面的測試已經知道 confluent-kafka 這個庫的性能是很優秀的了,但若是還須要更高的效率,應該怎麼辦呢?當單線程(或者單進程)不能知足需求時,咱們很容易想到使用多線程(或者多進程)來增長併發提升效率,考慮到線程的資源消耗比進程少,因此打算選用多線程來進行開發。那麼多線程消費 Kafka 有什麼實現方式呢?我想到的有兩種:

  1. 一個線程實現一個 Kafka Consumer,最多能夠有 n 個線程同時消費 Topic(其中 n 是該 Topic 下的分區數量);
  2. 多個線程共用一個 Kafka Consumer,此時也能夠實例化多個 Consumer 同時消費。

    

  對比這兩種多線程模型:

  • 模型1實現方便,能夠保證每一個分區有序消費,但 Partition 數量會限制消費能力;
  • 模型2併發度高,可擴展能力強,消費能力不受 Partition 限制。

 2.測試過程

(1)多線程模型1

  測試代碼:

 1 import time
 2 from threading import Thread
 3 from datetime import datetime
 4 from confluent_kafka import Consumer
 5 
 6 
 7 class ChildThread(Thread):
 8     def __init__(self, name, broker, topic):
 9         Thread.__init__(self, name=name)
10         self.con = KafkaConsumerTool(broker, topic)
11 
12     def run(self):
13         self.con.receive_msg()
14 
15 
16 class KafkaConsumerTool:
17     def __init__(self, broker, topic):
18         config = {
19             'bootstrap.servers': broker,
20             'session.timeout.ms': 30000,
21             'auto.offset.reset': 'earliest',
22             'api.version.request': False,
23             'broker.version.fallback': '2.6.0',
24             'group.id': 'test'
25         }
26         self.consumer = Consumer(config)
27         self.topic = topic
28 
29     def receive_msg(self):
30         self.consumer.subscribe([self.topic])
31         print(datetime.now())
32         while True:
33             msg = self.consumer.poll(timeout=30.0)
34             print(msg)
35 
36 
37 if __name__ == '__main__':
38     thread_num = 10
39     threads = [ChildThread("thread_" + str(i + 1), broker, topic) for i in range(thread_num)]
40 
41     for i in range(thread_num):
42         threads[i].setDaemon(True)
43     for i in range(thread_num):
44         threads[i].start()

  由於我使用的 Topic 共有8個分區,因此我分別測試了線程數在5個、8個和10個時消費50萬數據所須要的時間,並計算每秒可消費的數據條數。

(2)多線程模型2

  測試代碼:

 1 import time
 2 from datetime import datetime
 3 from confluent_kafka import Consumer
 4 from threadpool import ThreadPool, makeRequests
 5 
 6 
 7 class KafkaConsumerTool:
 8     def __init__(self, broker, topic):
 9         config = {
10             'bootstrap.servers': broker,
11             'session.timeout.ms': 30000,
12             'auto.offset.reset': 'earliest',
13             'api.version.request': False,
14             'broker.version.fallback': '2.6.0',
15             'group.id': 'mini-spider'
16         }
17         self.consumer = Consumer(config)
18         self.topic = topic
19 
20     def receive_msg(self, x):
21         self.consumer.subscribe([self.topic])
22         print(datetime.now())
23         while True:
24             msg = self.consumer.poll(timeout=30.0)
25             print(msg)
26 
27 
28 if __name__ == '__main__':
29     thread_num = 10
30     consumer = KafkaConsumerTool(broker, topic)
31     pool = ThreadPool(thread_num)
32     for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
33         pool.putRequest(r)
34     pool.wait()

  主要使用 threadpool 這個第三方庫來實現線程池,此處固然也能夠使用其餘庫來實現,這裏我分別測試了線程數量在5個和10個時消費50萬數據所須要的時間,並計算每秒可消費的數據條數。

3.測試結果

  • 多線程模型1
 總數據量/萬 線程數量 總耗時/秒 每秒數據條數
50 5 27 18518.51
50 8 24 20833.33
50 10 26 19230.76
  • 多線程模型2
  總數據量/萬 線程數量 總耗時/秒 每秒數據條數
50 5 17 29411.76
50 10 13 38461.53

4.測試結論

  使用多線程能夠有效提升 Kafka 的 Consumer 消費數據的效率,而選用線程池共用一個 KafkaConsumer 的消費方式的消費效率更高。

相關文章
相關標籤/搜索