python中kafka生產者和消費者實現

安裝kafka-python:

C:\anaconda3\Scripts>pip install kafka-python

import datetime
import json

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


class Kafka_producer():
    """Producer helper built on kafka-python's ``KafkaProducer``.

    Holds one ``KafkaProducer`` connected to ``bootstrapServers`` and sends
    every message to the single topic ``kafkaTopic``.
    """

    def __init__(self, bootstrapServers, kafkaTopic):
        """Create the underlying ``KafkaProducer``.

        Args:
            bootstrapServers: Value passed as ``bootstrap_servers`` to
                ``KafkaProducer`` (the demo below passes a list of
                ``'host:port'`` strings).
            kafkaTopic: Name of the topic every message is sent to.
        """
        self.bootstrapServers = bootstrapServers
        self.kafkaTopic = kafkaTopic
        self.producer = KafkaProducer(bootstrap_servers=self.bootstrapServers)

    def sendjsondata(self, params):
        """Serialize ``params`` as JSON and send it to the configured topic.

        The payload is ``json.dumps(params)`` encoded as UTF-8. After sending,
        the producer is flushed and the send future is awaited for up to
        10 seconds; on success the returned record metadata is printed
        together with a ``%Y%m%d%H%M%S`` timestamp.

        Args:
            params: Any object accepted by ``json.dumps``.

        Returns:
            None. A ``KafkaError`` raised while sending or waiting is printed
            rather than propagated. Errors from ``json.dumps`` are not caught.
        """
        # Serialization errors are not KafkaError, so they propagate exactly
        # as before; keeping this outside the try makes that explicit.
        payload = json.dumps(params).encode('utf-8')
        try:
            future = self.producer.send(self.kafkaTopic, payload)
            self.producer.flush()
            recordMetadata = future.get(timeout=10)
            print(recordMetadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
        except KafkaError as e:
            print(e)


class Kafka_consumer():
    """Consumer helper built on kafka-python's ``KafkaConsumer``.

    Subscribes one ``KafkaConsumer`` to ``kafkaTopic`` as a member of the
    consumer group ``groupId``.
    """

    def __init__(self, bootstrapServers, kafkaTopic, groupId):
        """Create the underlying ``KafkaConsumer``.

        Args:
            bootstrapServers: Value passed as ``bootstrap_servers`` to
                ``KafkaConsumer``.
            kafkaTopic: Topic name passed to ``KafkaConsumer``.
            groupId: Value passed as ``group_id`` to ``KafkaConsumer``.
        """
        self.kafkaTopic = kafkaTopic
        self.bootstrapServers = bootstrapServers
        self.groupId = groupId
        self.consumer = KafkaConsumer(
            self.kafkaTopic,
            group_id=self.groupId,
            bootstrap_servers=self.bootstrapServers,
        )

    def consume_data(self):
        """Yield messages from the underlying consumer one at a time.

        Yields:
            Each message object produced by iterating ``self.consumer``.

        An ``Exception`` raised while consuming is printed and ends the
        generator. ``KeyboardInterrupt``, ``SystemExit`` and ``GeneratorExit``
        are deliberately NOT caught, so Ctrl-C and interpreter shutdown are
        not silently swallowed.
        """
        try:
            for message in self.consumer:
                yield message
        except Exception as e:
            # Best-effort behaviour: report the failure and stop iterating.
            print(e)


if __name__ == '__main__':
    bootstrapServers = ['ip1:port1', 'ip2:port2', 'ip3:port3']
    topicStr = '主題'

    print('-' * 20)
    print('生產者')
    print('-' * 20)
    producer = Kafka_producer(bootstrapServers, topicStr)
    for index in range(5):
        # params is a plain string, so json.dumps emits it as a quoted
        # JSON string literal.
        params = '{tst}:{null}---' + str(index)
        producer.sendjsondata(params)

    print('-' * 20)
    print('消費者')
    print('-' * 20)
    groupId = 'group名稱'
    consumer = Kafka_consumer(bootstrapServers, topicStr, groupId)
    try:
        for record in consumer.consume_data():
            print(record.value)
    except KeyboardInterrupt:
        # Ctrl-C ends the demo loop without a traceback.
        pass
相關文章
相關標籤/搜索