轉自:http://www.javashuo.com/article/p-cdrpukaq-hb.html
import socket
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
#kafka的配置文件:(bootstrap_servers:kafka的集羣地址,topic_name:主題,consumer_id:消費分組)
KAFKA_SETTING = {
'bootstrap_servers': ["172.24.181.182:9092", "172.24.181.181:9092", "172.24.181.183:9092"],
'topic_name': 'user_data',
'consumer_id': 'consumer_ai'
}
conf= KAFKA_SETTING
print("[setting] =", conf)
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
api_version = (0,10),
retries=5)
partitions = producer.partitions_for(conf['topic_name'])
print('Topic下分區: %s' % partitions)
#須要推送的數據:(推送到kafka的數據類型必須的json類型)
user_data = {
"appToken": "d23ea83dbf7c411aa36e5ab519f41818",
"appId": "JF_WK_001",
"mobile": "15950857927",
"isRealTimeReturn": True,
"applyTime": 15100226057,
"uuid": "a91140f54b898w85d7a50d4b95994",
"customerNo": 1153265851
}
send_data = bytes(json.dumps(user_data), encoding="utf-8")
try:
future = producer.send(conf['topic_name'], send_data)
future.get()
print('send message succeed.')
except KafkaError as e:
print('send message failed. [e] ='),
import socketfrom kafka import KafkaConsumerfrom kafka.errors import KafkaErrorKAFKA_SETTING = { 'bootstrap_servers': ["172.24.181.182:9092", "172.24.181.181:9092", "172.24.181.183:9092"], 'topic_name': 'result_data', 'topic_name_user': 'user_data', 'consumer_id': 'consumer_ai'}conf = KAFKA_SETTINGconsumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], api_version = (0,10))print('consumer start to consuming...')consumer.subscribe((conf['topic_name'], ))from IPython import embed# embed()print("consumer = ", consumer)for message in consumer: print(message.topic, message.offset, message.key, message.value, message.partition)