kafka-3python生產者和消費者實用demo

程序分爲productor.py是發送消息端,consumer爲消費消息端,python

啓動的時候先啓動product再啓動consumer,畢竟只有發了消息,消費端纔有消息能夠消費,json

productor.py
bootstrap

#!/usr/bin/env python2.7
#_*_coding: utf-8 _*_
from kafka import KafkaProducer


kafka_host = '192.168.1.200'  # kafka服務器地址
kafka_port = 9092  # kafka服務器的端口


producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
    kafka_host = kafka_host,
    kafka_port = kafka_port
)])


#簡單for循環10次,發送10條消息
for i in range(1,10):
    message_string = 'some message'.format(i)

    #調用send方法,發送名字爲'topic1'的topicid ,發送的消息爲message_string
    response = producer.send('topic1', message_string.encode('utf-8'))
    print response


consumer.py服務器

#!/usr/bin/env python
#_*_coding: utf-8 _*_
import json
from kafka import *

kafka_host = '192.168.1.200'  # kafka服務器地址
kafka_port = 9092  # kafka服務器端口


#消費topic1的topic,並指定group_id(自定義),多個機器或進程想順序消費,能夠指定同一個group_id,
# 若是想一條消費屢次消費,能夠換一個group_id,會從頭開始消費
consumer = KafkaConsumer(
    'topic1',
    group_id = 'my-group',
    bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port)]
)
for message in consumer:
    #json讀取kafka的消息
    content = json.loads(message.value)
    print content
相關文章
相關標籤/搜索