程序分爲productor.py是發送消息端,consumer爲消費消息端,python
啓動的時候先啓動product再啓動consumer,畢竟只有發了消息,消費端纔有消息能夠消費,json
productor.py
bootstrap
#!/usr/bin/env python2.7 #_*_coding: utf-8 _*_ from kafka import KafkaProducer kafka_host = '192.168.1.200' # kafka服務器地址 kafka_port = 9092 # kafka服務器的端口 producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format( kafka_host = kafka_host, kafka_port = kafka_port )]) #簡單for循環10次,發送10條消息 for i in range(1,10): message_string = 'some message'.format(i) #調用send方法,發送名字爲'topic1'的topicid ,發送的消息爲message_string response = producer.send('topic1', message_string.encode('utf-8')) print response
consumer.py服務器
#!/usr/bin/env python #_*_coding: utf-8 _*_ import json from kafka import * kafka_host = '192.168.1.200' # kafka服務器地址 kafka_port = 9092 # kafka服務器端口 #消費topic1的topic,並指定group_id(自定義),多個機器或進程想順序消費,能夠指定同一個group_id, # 若是想一條消費屢次消費,能夠換一個group_id,會從頭開始消費 consumer = KafkaConsumer( 'topic1', group_id = 'my-group', bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port)] ) for message in consumer: #json讀取kafka的消息 content = json.loads(message.value) print content