Kafka Python的生產者和消費者

Kafka Python的生產者和消費者

在本教程中,咱們將使用Python構建Kafka Producer和Consumer。除此以外,咱們還將學習如何在Kafka中設置配置以及如何使用組和偏移量概念。python

創建

對於本教程,咱們應該在計算機上安裝python。另外,咱們須要訪問在咱們的設備或某些服務器上運行的Apache Kafka。您能夠檢查如何在Windows上安裝Apache Kafka。除此以外,咱們須要python的_kafka_ 庫來運行咱們的代碼。要解決此問題,請在系統上運行如下命令git

pip install kafkagithub

卡夫卡生產者

===web

讓咱們開始建立本身的Kafka Producer。咱們必須從kafka庫導入KafkaProducer。咱們還須要將Kafka服務器的代理列表提供給Producer,以便它能夠鏈接到Kafka服務器。咱們還須要提供要向其發佈消息的主題名稱。這是建立生產者所需的最小配置。docker

from kafka import KafkaProducer

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'

producer = KafkaProducer(bootstrap_servers = bootstrap_servers)
producer = KafkaProducer()

咱們能夠使用如下代碼開始向該主題發送消息。json

ack = producer.send(topicName, b'Hello World!!!!!!!!')

metadata = ack.get()
print(metadata.topic)
print(metadata.partition)

上面的代碼將消息發送到Kafka服務器中名爲「 myTopic」的主題。可是,若是該主題還沒有出如今Kafka服務器中怎麼辦?在這種狀況下,Kafka會使用該名稱建立一個新主題並向其發佈消息。方便嗎?可是您應該記住要檢查主題名稱中是否存在拼寫錯誤。bootstrap

若是要爲Producer設置更多屬性或更改其序列化格式,則能夠使用如下代碼行。windows

producer = KafkaProducer(bootstrap_servers = bootstrap_servers, retries = 5,value_serializer=lambda m: json.dumps(m).encode('ascii'))

卡夫卡消費者

完成建立Producer的工做後,如今讓咱們開始使用python構建Consumer,看看這是否一樣容易。導入KafkaConsumer後,咱們須要設置提供引導服務器ID和主題名稱,以與Kafka服務器創建鏈接。服務器

from kafka import KafkaConsumer
import sys

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'

consumer = KafkaConsumer (topicName, group_id = 'group1',bootstrap_servers = bootstrap_servers,
auto_offset_reset = 'earliest')

如咱們所見,咱們須要設置哪一個組消費者屬於。另外,咱們須要指定偏移量,此使用者應該從該偏移量讀取主題中的消息。在上述狀況下,咱們最先指定了auto_offset_reset,這意味着此使用者將從主題的開頭開始讀取消息。app

以後,咱們能夠開始閱讀主題中的消息。與每條消息一塊兒,咱們還得到了一些其餘信息,例如消息所屬的分區,在該分區中的偏移量和鍵。

try:
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

except KeyboardInterrupt:
    sys.exit()

這將以如下格式打印輸出。

用Python編寫的Kafka Consumer的輸出

就是這個。咱們已經在python中建立了第一個Kafka使用者。咱們能夠看到該使用者已經閱讀了該主題的消息並將其打印在控制檯上。

Docker 運行Kafka

使用的是 zerocode 提供的docker-compose配置文件。

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # -----------------------------------------------------------------------------
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # -----------------------------------------------------------------------------
    image: confluentinc/cp-kafka:5.0.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

注意這裏的 PLAINTEXT_HOST://localhost:9092 使用的是localhost, 因此在容器外部訪問是沒有問題,若是須要的是容器之間的訪問,即生產才和消費者也在容器裏運行,則須要改爲hostname(如 kafka).

結論

咱們已經學習瞭如何在python中建立Kafka生產者和消費者。

相關文章
相關標籤/搜索