This experiment was run on a Mac. Before using Kafka, install it as follows:
brew install kafka
(1) The installation pulls in jdk8 and zookeeper as dependencies (if jdk8 is missing, install it separately)
(2) Kafka installation locations
/usr/local/Cellar/zookeeper
/usr/local/Cellar/kafka
(3) Configuration file locations
/usr/local/etc/kafka/zookeeper.properties (default port 2181)
/usr/local/etc/kafka/server.properties (default port 9092)
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
kafka-server-start /usr/local/etc/kafka/server.properties &
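Both servers are started in the background, so it can be handy to confirm they are actually listening before creating topics. The following is a minimal sketch using only the standard library; the helper names `port_is_open` and `check_services` are hypothetical, and the ports are the defaults listed above.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


def check_services(services, host="localhost"):
    """Map each service name to whether its port accepts connections."""
    return {name: port_is_open(host, port) for name, port in services.items()}


if __name__ == "__main__":
    # Default ports from zookeeper.properties and server.properties
    print(check_services({"zookeeper": 2181, "kafka": 9092}))
```

An open port only shows that something is listening there; it does not prove the broker has finished starting up.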
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics --list --zookeeper localhost:2181
kafka-console-producer --broker-list localhost:9092 --topic test
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
Note: --from-beginning makes the consumer start from the first message in the topic.
# -*- coding: utf-8 -*-
"""Producer module"""
from kafka import KafkaProducer


class Producer(object):
    def __init__(self, kafka_hosts, kafka_topic):
        self.kafka_topic = kafka_topic
        # bootstrap_servers may list several servers
        self.producer = KafkaProducer(bootstrap_servers=[kafka_hosts])
        self.send()

    def send(self):
        # Send to the configured topic rather than a hard-coded name
        future = self.producer.send(self.kafka_topic, key=b'my_key',
                                    value=b'my_value', partition=0)
        # Block until the broker acknowledges the message (up to 10 seconds)
        result = future.get(timeout=10)
        print(result)


if __name__ == '__main__':
    p = Producer('localhost:9092', 'my_topic')
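The producer above passes raw bytes (b'my_key', b'my_value') because Kafka stores keys and values as bytes. For structured payloads, one option is to hand `KafkaProducer` a `value_serializer`. The sketch below assumes JSON as the wire format, which the original post does not use; `encode_value`, `decode_value` and `make_json_producer` are hypothetical helper names.

```python
import json


def encode_value(obj):
    """Serialize a Python object to UTF-8 encoded JSON bytes."""
    return json.dumps(obj, ensure_ascii=False).encode("utf-8")


def decode_value(raw):
    """Inverse of encode_value, for use on the consumer side."""
    return json.loads(raw.decode("utf-8"))


def make_json_producer(kafka_hosts):
    """Build a producer that JSON-encodes every value (needs kafka-python)."""
    from kafka import KafkaProducer  # imported lazily so the helpers work without it
    return KafkaProducer(bootstrap_servers=[kafka_hosts],
                         value_serializer=encode_value)
```

With such a producer, `producer.send('my_topic', value={'id': 1})` would accept a dict directly, and the consumer can apply `decode_value` to `message.value`.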
# -*- coding: utf-8 -*-
"""Consumer module"""
from kafka import KafkaConsumer


class Consumer(object):
    def __init__(self, kafka_hosts, kafka_topic):
        self.consumers = KafkaConsumer(kafka_topic, bootstrap_servers=[kafka_hosts])

    def consume(self):
        # Iterating blocks and yields one ConsumerRecord per message
        for message in self.consumers:
            print(message)


if __name__ == '__main__':
    c = Consumer('localhost:9092', 'my_topic')
    c.consume()
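By default a kafka-python consumer only receives messages sent after it starts, since `auto_offset_reset` defaults to 'latest'. To get behaviour similar to the console consumer's --from-beginning, the setting can be switched to 'earliest'. A minimal sketch, with `consumer_settings` and `open_consumer` as hypothetical helper names:

```python
def consumer_settings(kafka_hosts, from_beginning=False):
    """Build keyword arguments for KafkaConsumer.

    'earliest' starts from the oldest available message when the consumer
    has no committed offset; 'latest' only sees new messages.
    """
    return {
        "bootstrap_servers": [kafka_hosts],
        "auto_offset_reset": "earliest" if from_beginning else "latest",
    }


def open_consumer(kafka_topic, kafka_hosts, from_beginning=False):
    """Create the consumer (needs kafka-python and a running broker)."""
    from kafka import KafkaConsumer  # imported lazily so the helper above works without it
    return KafkaConsumer(kafka_topic,
                         **consumer_settings(kafka_hosts, from_beginning))
```

For example, `open_consumer('my_topic', 'localhost:9092', from_beginning=True)` would replay the topic from its oldest retained message.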
Run consumer.py with python3 consumer.py, then run python3 producer.py. The output is as follows:
➜ kafka_client git:(master) ✗ python3 consumer.py
ConsumerRecord(topic='my_topic', partition=0, offset=3, timestamp=1546670287366, timestamp_type=0, key=b'my_key', value=b'my_value', headers=[], checksum=None, serialized_key_size=6, serialized_value_size=8, serialized_header_size=-1)
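In the record above, `timestamp` is in milliseconds since the Unix epoch, and `key` and `value` are raw bytes. The sketch below turns such a record into something more readable. `Record` is a hypothetical stand-in with a subset of the fields of kafka-python's `ConsumerRecord`, so the example runs without a broker, and UTF-8 text payloads are an assumption.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Stand-in for ConsumerRecord, limited to the fields used here
Record = namedtuple("Record", "topic partition offset timestamp key value")

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def describe(record):
    """Return a dict with a UTC timestamp and decoded key/value."""
    # Integer millisecond arithmetic avoids float rounding
    when = EPOCH + timedelta(milliseconds=record.timestamp)
    return {
        "topic": record.topic,
        "partition": record.partition,
        "offset": record.offset,
        "time_utc": when.isoformat(),
        "key": record.key.decode("utf-8") if record.key is not None else None,
        "value": record.value.decode("utf-8") if record.value is not None else None,
    }


if __name__ == "__main__":
    sample = Record("my_topic", 0, 3, 1546670287366, b"my_key", b"my_value")
    print(describe(sample))
```

For the sample values taken from the output above, `time_utc` comes out as 2019-01-05T06:38:07.366000+00:00.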