1. Install pykafka

```shell
pip install pykafka
```
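2. Download and install from source

```shell
git clone https://github.com/Parsely/pykafka.git
```

Then copy the pykafka package directory found inside the downloaded pykafka folder (the library's source files) into /Library/Python/2.7/site-packages/, and you are done.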
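Copying the package by hand only works if it lands in the site-packages directory of the interpreter you actually run; /Library/Python/2.7/site-packages/ is the location for the macOS system Python 2.7. A minimal sketch (Python 3, standard library only; the helper names are hypothetical) to print that directory and check whether `pykafka` can be found without importing it:

```python
import importlib.util
import sysconfig


def site_packages_dir():
    """Return the pure-Python site-packages directory of the running interpreter."""
    return sysconfig.get_paths()["purelib"]


def is_importable(module_name):
    """Return True if a top-level module can be found on sys.path (it is not imported)."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    print("site-packages:", site_packages_dir())
    print("pykafka importable:", is_importable("pykafka"))
```

If `pykafka importable` is `False`, the folder was copied to a directory this interpreter does not search.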
3. Assuming you have at least one Kafka instance running locally, you can connect to it with pykafka.
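Both `KafkaClient(hosts=...)` and `zookeeper_connect=...` in the scripts below take a single comma-separated string of `host:port` pairs. A small sketch of a hypothetical helper (not part of pykafka) that builds that string from a list, filling in the default ports mentioned in the code comments (9092 for Kafka, 2181 for ZooKeeper); it assumes IPv4 addresses or hostnames, not IPv6 literals:

```python
DEFAULT_KAFKA_PORT = 9092
DEFAULT_ZOOKEEPER_PORT = 2181


def build_hosts(addresses, default_port):
    """Join addresses into a comma-separated "host:port" string,
    appending default_port to any address that has no port."""
    parts = []
    for addr in addresses:
        host, sep, port = addr.partition(":")
        if not sep:
            port = str(default_port)  # no ":" given, use the default port
        if not host or not port.isdigit():
            raise ValueError("bad address: %r" % addr)
        parts.append("%s:%s" % (host, port))
    return ",".join(parts)


if __name__ == "__main__":
    print(build_hosts(["192.168.1.140", "192.168.1.141", "192.168.1.142"], DEFAULT_KAFKA_PORT))
```

For a single local broker, `build_hosts(["127.0.0.1"], DEFAULT_KAFKA_PORT)` gives `"127.0.0.1:9092"`, which can be passed as `KafkaClient(hosts=...)`.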
consumer.py (consumer)
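```python
#!/usr/bin/python
# -*- coding:utf-8 -*-
from __future__ import print_function  # lets print() work on both Python 2 and 3

from pykafka import KafkaClient

# Kafka's default broker port is 9092; several brokers can be listed, comma-separated
client = KafkaClient(hosts='192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092')
topic = client.topics['test_kafka_topic']

# Group membership is coordinated through ZooKeeper (default port 2181);
# several ZooKeeper nodes can be listed, comma-separated
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='test_kafka_group',
    # Commit consumed offsets for this consumer group automatically.
    # To read a topic without any consumer group, use topic.get_simple_consumer() instead.
    auto_commit_enable=True,
    zookeeper_connect='192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181'
)

for message in balanced_consumer:
    if message is not None:
        # Print the offset and the payload of each received message
        print(message.offset, message.value)
```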
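A balanced consumer splits the topic's partitions among all members of `test_kafka_group`, so each partition is read by exactly one member. The sketch below is a simplified model of a range-style split (sorted partitions cut into contiguous runs, one per sorted member); it is an illustration under that assumption, not pykafka's actual rebalancing code, whose details may differ:

```python
def range_assign(partitions, members):
    """Assign sorted partition ids to sorted members in contiguous ranges.
    The first len(partitions) % len(members) members get one extra partition."""
    if not members:
        raise ValueError("a consumer group needs at least one member")
    partitions = sorted(partitions)
    members = sorted(members)
    per_member, extra = divmod(len(partitions), len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


if __name__ == "__main__":
    print(range_assign(range(5), ["consumer-2", "consumer-1"]))
```

One consequence visible in this model: with more consumers than partitions, some members receive an empty list and sit idle.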
producer.py (producer)
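```python
#!/usr/bin/python
# -*- coding:utf-8 -*-
from __future__ import print_function  # lets print() work on both Python 2 and 3

from pykafka import KafkaClient

# Several brokers can be listed, comma-separated
client = KafkaClient(hosts="192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092")

# List all topics in the cluster
print(client.topics)

topic = client.topics['test_kafka_topic']  # select a topic
message = b"test message test message"  # payloads are bytes (plain str on Python 2)

# With a topic in hand, create a producer and send messages to Kafka
with topic.get_sync_producer() as producer:
    producer.produce(message)
# The example above produces to Kafka synchronously:
# the call only returns after we have confirmation that the message made it to the cluster.

# In production, produce asynchronously for high throughput;
# delivery_reports=True enables a queue from which per-message delivery reports can be read.
with topic.get_producer(delivery_reports=True) as producer:
    # Example key; it only affects placement when a key-based partitioner is configured
    producer.produce(b'test message', partition_key=b'0')

# Outside a with-block, call stop() so queued messages are flushed before exit
producer = topic.get_producer()
producer.produce(message)
producer.stop()
print(message)
```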
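To see what the asynchronous mode with `delivery_reports=True` means, here is a toy model that involves no Kafka at all: `produce()` only enqueues, a background thread "delivers", and each outcome lands on a report queue that the caller drains without blocking until `queue.Empty`. `ToyAsyncProducer` and `drain_reports` are hypothetical names; the `get_delivery_report(block=False)` method is modeled loosely on the pattern pykafka documents for reading delivery reports, not a copy of its implementation:

```python
import queue
import threading


class ToyAsyncProducer:
    """Hypothetical stand-in for an async producer with delivery reports (no Kafka)."""

    def __init__(self, fail_if=lambda msg: False):
        self._outgoing = queue.Queue()   # messages waiting to be "sent"
        self._reports = queue.Queue()    # (message, exception or None) per message
        self._fail_if = fail_if          # decides which messages simulate a failure
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def produce(self, message):
        self._outgoing.put(message)      # returns immediately; delivery happens later

    def _run(self):
        while True:
            msg = self._outgoing.get()
            if msg is None:              # sentinel pushed by stop()
                break
            exc = RuntimeError("delivery failed") if self._fail_if(msg) else None
            self._reports.put((msg, exc))

    def stop(self):
        """Flush: return only after every queued message has a delivery report."""
        self._outgoing.put(None)
        self._worker.join()

    def get_delivery_report(self, block=False):
        return self._reports.get(block=block)  # raises queue.Empty when nothing is ready


def drain_reports(producer):
    """Read all pending reports without blocking; split into delivered and failed."""
    delivered, failed = [], []
    while True:
        try:
            msg, exc = producer.get_delivery_report(block=False)
        except queue.Empty:
            break
        (failed if exc is not None else delivered).append(msg)
    return delivered, failed


if __name__ == "__main__":
    p = ToyAsyncProducer(fail_if=lambda m: m == b"bad")
    for m in [b"m1", b"bad", b"m2"]:
        p.produce(m)
    p.stop()
    print(drain_reports(p))  # ([b'm1', b'm2'], [b'bad'])
```

In this model reports pile up until someone reads them, which is why an application producing at high volume should drain them periodically rather than only at shutdown.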