Simple usage of Kafka

This walkthrough was done on a Mac. Kafka has to be installed before it can be used; the steps are as follows:

1. Install Kafka

 brew install kafka

(1)  The installation pulls in JDK 8 and ZooKeeper as dependencies (if JDK 8 is not present, it has to be installed separately)

(2)  Install locations

/usr/local/Cellar/zookeeper

/usr/local/Cellar/kafka

(3)  Configuration file locations

/usr/local/etc/kafka/zookeeper.properties  (default port 2181)

/usr/local/etc/kafka/server.properties  (default port 9092)
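
The default ports come from these two properties files. As a rough sketch, the snippet below parses Java-style .properties text and looks up a port setting. The helper name parse_properties is made up for illustration, and the sample text is an assumption that only mimics the clientPort key normally found in zookeeper.properties, so check it against your own copy.

```python
def parse_properties(text):
    """Parse Java-style key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that have no '=' at all
            props[key.strip()] = value.strip()
    return props


# Sample content (an assumption). For the real file, use something like
# open("/usr/local/etc/kafka/zookeeper.properties").read()
sample = """
# the port at which the clients will connect
clientPort=2181
dataDir=/usr/local/var/lib/zookeeper
"""

props = parse_properties(sample)
print(props["clientPort"])
```

Values come back as strings, so wrap the result in int() before using it as a port number.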

2. Start ZooKeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

3. Start the Kafka server

kafka-server-start /usr/local/etc/kafka/server.properties &
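
Both processes are started in the background, so it is easy to miss one that failed to come up. A minimal sketch for checking that something is listening on the two default ports, using only the standard library. The function name port_is_open is my own, and an open port only shows that a process accepted the connection, not that it is a healthy broker.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Ports are the defaults mentioned above; adjust if your config differs.
    for name, port in (("zookeeper", 2181), ("kafka", 9092)):
        state = "up" if port_is_open("localhost", port) else "down"
        print(name, port, state)
```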

4. Create a topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

5. List the created topics

kafka-topics --list --zookeeper localhost:2181  
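
The same create and list steps can also be done from Python. The sketch below assumes a kafka-python version that ships the kafka.admin module, and the helper names is_valid_topic_name and create_topic are mine, not part of the library. As far as I know, Kafka 3.0 and later no longer accept --zookeeper on kafka-topics and expect --bootstrap-server localhost:9092 instead, which is also the address the admin client talks to.

```python
import re

# Kafka topic names may contain letters, digits, '.', '_' and '-',
# and are limited to 249 characters.
_TOPIC_RE = re.compile(r"[a-zA-Z0-9._-]{1,249}")


def is_valid_topic_name(name):
    """Check a topic name against Kafka's naming rules."""
    return name not in (".", "..") and _TOPIC_RE.fullmatch(name) is not None


def create_topic(bootstrap_servers, name, partitions=1, replication_factor=1):
    """Create a topic via the broker and return the topic names it reports."""
    if not is_valid_topic_name(name):
        raise ValueError("invalid topic name: %r" % name)
    # Imported lazily so the name check works without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([
            NewTopic(name=name, num_partitions=partitions,
                     replication_factor=replication_factor)
        ])
        return admin.list_topics()
    finally:
        admin.close()
```

With the broker from step 3 running, create_topic('localhost:9092', 'test') would mirror the two commands above.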

6. Produce data

kafka-console-producer --broker-list localhost:9092 --topic test

7. Consume data

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

Note: --from-beginning makes the consumer start receiving from the first message in the topic
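
In kafka-python the closest counterpart to --from-beginning is the auto_offset_reset setting of KafkaConsumer. A small sketch, assuming the kafka-python package is installed. The helper names consumer_options and read_topic are hypothetical.

```python
def consumer_options(from_beginning):
    """Build keyword arguments for kafka-python's KafkaConsumer."""
    return {
        # 'earliest' starts at the oldest retained message,
        # 'latest' only delivers messages produced after the consumer starts.
        "auto_offset_reset": "earliest" if from_beginning else "latest",
        # No consumer group, so no committed offset overrides the reset policy.
        "group_id": None,
    }


def read_topic(topic, bootstrap_servers, from_beginning=True):
    """Print offset and value of every message; blocks until interrupted."""
    from kafka import KafkaConsumer  # requires the kafka-python package

    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers,
                             **consumer_options(from_beginning))
    for message in consumer:
        print(message.offset, message.value)
```

The reset policy only applies when there is no committed offset for the consumer, which is why the sketch leaves group_id unset.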

8. Using Kafka from Python

(1) Producer implementation; producer.py is as follows:

# -*- coding: utf-8 -*-
"""
Producer module
"""
from kafka import KafkaProducer


class Producer(object):

    def __init__(self, kafka_hosts, kafka_topic):
        self.kafka_topic = kafka_topic
        # bootstrap_servers may list several brokers
        self.producer = KafkaProducer(bootstrap_servers=[kafka_hosts])
        self.send()

    def send(self):
        # Send to the topic given to the constructor instead of a hard-coded name
        future = self.producer.send(self.kafka_topic, key=b'my_key', value=b'my_value', partition=0)
        # Block until the broker acknowledges the message (at most 10 seconds)
        result = future.get(timeout=10)
        print(result)

if __name__ == '__main__':
    p = Producer('localhost:9092','my_topic')
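
producer.py sends raw bytes for both key and value. If the payload is structured data, KafkaProducer accepts a value_serializer callable that turns each value into bytes before sending. A minimal sketch with JSON, assuming kafka-python is installed. The names to_json_bytes and send_json are made up for this example.

```python
import json


def to_json_bytes(value):
    """Serialize a Python object to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")


def send_json(bootstrap_servers, topic, payload):
    """Send one JSON-encoded message and wait for the broker's acknowledgement."""
    from kafka import KafkaProducer  # requires the kafka-python package

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=to_json_bytes)
    try:
        return producer.send(topic, value=payload).get(timeout=10)
    finally:
        producer.close()
```

A consumer reading such messages would need the matching step, for example json.loads(message.value.decode('utf-8')).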

(2) Consumer implementation; consumer.py is as follows:

# -*- coding: utf-8 -*-
"""
Consumer module
"""
from kafka import KafkaConsumer


class Consumer(object):

    def __init__(self, kafka_hosts, kafka_topic):
        # Subscribe to the topic on the given broker
        self.consumers = KafkaConsumer(kafka_topic,bootstrap_servers=[kafka_hosts])

    def consume(self):
        # Iterating blocks and yields one ConsumerRecord per received message
        for message in self.consumers:
            print(message)

if __name__ == '__main__':
    c = Consumer('localhost:9092','my_topic')
    c.consume()

First start the consumer with python3 consumer.py, then run python3 producer.py. The consumer prints the following:

➜  kafka_client git:(master) ✗ python3 consumer.py
ConsumerRecord(topic='my_topic', partition=0, offset=3, timestamp=1546670287366, timestamp_type=0, key=b'my_key', value=b'my_value', headers=[], checksum=None, serialized_key_size=6, serialized_value_size=8, serialized_header_size=-1)
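
The printed record is easier to read once its fields are decoded: timestamp is in milliseconds since the Unix epoch (timestamp_type=0 means it was set when the message was created), and key and value are raw bytes. A small sketch using the values from the output above. Record is a stand-in namedtuple of my own with only the fields used here, not kafka-python's ConsumerRecord, so the snippet runs without a broker.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Stand-in for kafka-python's ConsumerRecord (hypothetical, reduced field set).
Record = namedtuple("Record", "topic partition offset timestamp key value")


def describe(record):
    """Turn the raw record fields into readable values."""
    # Kafka timestamps are milliseconds, datetime expects seconds.
    when = datetime.fromtimestamp(record.timestamp / 1000, tz=timezone.utc)
    return {
        "where": "%s[%d]@%d" % (record.topic, record.partition, record.offset),
        "when": when,
        "key": record.key.decode("utf-8"),
        "value": record.value.decode("utf-8"),
    }


# Values copied from the ConsumerRecord printed above.
sample = Record("my_topic", 0, 3, 1546670287366, b"my_key", b"my_value")
info = describe(sample)
print(info["where"], info["when"].isoformat(), info["key"], info["value"])
```

The same describe function should work on a real ConsumerRecord, since it exposes attributes with these names.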

 


GitHub: https://github.com/littlemesie/big_data_framework/tree/master/kafka_client
