Using the pykafka library in Python to connect to Kafka and send/receive messages

1. Install pykafka

pip install pykafka

2. Or download and install from GitHub

git clone https://github.com/Parsely/pykafka.git

Then copy the pykafka package directory (the library's source files) from the cloned pykafka folder into /Library/Python/2.7/site-packages/.
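Either way, a quick sanity check (not part of the original steps, and assuming a pykafka release that exposes a version string) is to import the library and print its version:

#!/usr/bin/python
import pykafka
print pykafka.__version__  # should print the installed pykafka version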

3. Assuming you have at least one Kafka instance running, you can connect to it with pykafka.

consumer.py (the consumer)

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient

# Kafka's default port is 9092
client = KafkaClient(hosts='192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092')  # connect to multiple brokers
topic = client.topics['test_kafka_topic']

# consume via ZooKeeper (ZooKeeper's default port is 2181)
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='test_kafka_group',
    auto_commit_enable=True,  # commit offsets for this group automatically; if you don't need group-managed offsets, you can read the topic directly with a simple consumer instead (see the sketch after this listing)
    zookeeper_connect='192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181'  # connect to multiple ZooKeeper nodes
)

for message in balanced_consumer:
    if message is not None:
        print message.offset, message.value  # print the offset and value of each received message
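The auto_commit_enable comment above mentions reading a topic without a consumer group. A minimal sketch of that alternative, assuming the same brokers and topic name, uses a simple consumer instead of the balanced one:

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient

client = KafkaClient(hosts='192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092')
topic = client.topics['test_kafka_topic']

# no consumer group or ZooKeeper coordination; consumer_timeout_ms stops the
# iteration after 5 seconds without new messages instead of blocking forever
simple_consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
for message in simple_consumer:
    if message is not None:
        print message.offset, message.value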

producer.py (the producer)

#!/usr/bin/python
# -*- coding:utf-8 -*-

from pykafka import KafkaClient
 
client = KafkaClient(hosts="192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092")  # accepts multiple brokers

# list all topics
print client.topics


topic = client.topics['test_kafka_topic']  # select a topic

message = "test message test message"
# Once you have a topic, create a producer to publish string/bytes messages to Kafka.
with topic.get_sync_producer() as producer:
    producer.produce(message)
# The example above produces to Kafka synchronously:
# the call only returns once the cluster has confirmed that the message arrived.

# In production, use an asynchronous producer for higher throughput;
# delivery_reports=True enables a queue of per-message delivery reports.
with topic.get_producer(delivery_reports=True) as producer:
    producer.produce(message, partition_key='0')
    # fetch the delivery report for the message we just sent
    msg, exc = producer.get_delivery_report(block=True)
    if exc is not None:
        print 'delivery failed:', repr(exc)
    else:
        print 'delivered message with partition_key', msg.partition_key
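For the high-throughput case described above, a rough sketch (adapted from pykafka's documented delivery-report pattern; the message count and key format here are chosen only for illustration) is to produce asynchronously and drain the report queue in batches rather than blocking on every message:

#!/usr/bin/python
# -*- coding:utf-8 -*-
import Queue  # standard-library queue module (Python 2); delivery reports use its Empty exception

from pykafka import KafkaClient

client = KafkaClient(hosts="192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092")
topic = client.topics['test_kafka_topic']

with topic.get_producer(delivery_reports=True) as producer:
    # enqueue a batch of messages asynchronously
    for i in range(1000):
        producer.produce('test message {}'.format(i), partition_key='{}'.format(i))
    # drain whatever delivery reports have arrived so far
    while True:
        try:
            msg, exc = producer.get_delivery_report(block=False)
            if exc is not None:
                print 'failed to deliver message {}: {}'.format(msg.partition_key, repr(exc))
        except Queue.Empty:
            break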