Kafka(八)Python生產者和消費者API使用

單線程生產者python

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import sys
from kafka import KafkaProducer
from kafka.client import log
import time
import json

__metaclass__ = type


class Producer:
    def __init__(self, KafkaServer='127.0.0.1', KafkaPort='9092', ClientId="Procucer01", Topic='Test'):
        """
        用於設置生產者配置信息,這些配置項能夠從源碼中找到,下面爲必要參數。
        :param KafkaServer: kafka服務器IP
        :param KafkaPort: kafka工做端口
        :param ClientId: 生產者名稱
        :param Topic: 主題
        """
        self._bootstrap_server = '{host}:{port}'.format(host=KafkaServer, port=KafkaPort)
        self._topic = Topic
        self._clientId = ClientId

        """
        初始化一個生產者實例,生產者是線程安全的,多個線程共享一個生產者實例效率比每一個線程都使用一個生產者實例要高
        acks: 消費者只能消費被提交的,而只有消息在全部副本中都有了纔算提交,生產者發送了消息是否要等待全部副本都同步了該消息呢?這個值就是控制這個的。默認是1,表示只要該分區的Leader副本成功寫入日誌就返回。
              0表示生產者無需等待,發送完就返回;all是全部副本都寫入該消息才返回。 all可靠性最高可是效率最低,0效率最高可是可靠性最低,因此通常用1。
        retries: 表示請求重試次數,默認是0,上面的acks配置請求完成的標準,若是請求失敗,生產者將會自動重試,若是配置爲0則不重試。可是若是重試則有可能發生重複發送消息。
        key_serializer: 鍵的序列化器,默認不設置,採用字節碼
        value_serializer: 值得序列化器,默認不設置,採用字節碼,由於能夠發送單一字符,也能夠發送鍵值型消息
        """
        try:
            self._producer = KafkaProducer(bootstrap_servers=self._bootstrap_server, client_id=self._clientId, acks=1,
                                           value_serializer=lambda m: json.dumps(m).encode('utf-8'))
        except Exception as err:
            print err.message

    def _TIMESTAMP(self):
        t = time.time()
        return int((round(t * 1000)))

    # 時間戳轉換爲普通時間
    def getNormalTime(self, temp_timeStamp, timeSize=10):
        timeStamp = temp_timeStamp
        if timeSize == 13:
            timeStamp = int(temp_timeStamp / 1000)
        timeArray = time.localtime(timeStamp)
        otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
        return otherStyleTime

    # 發送成功的回調函數
    def _on_send_success(self, record_metadata):
        print "Topic: %s Partition: %d Offset: %s" % (record_metadata.topic, record_metadata.partition, record_metadata.offset)

    # 發送失敗的回調函數
    def _on_send_error(self, excp):
        log.error('I am an errback', exc_info=excp)

    def sendMsg(self, msg, partition=None):
        """
        發送消息
        :param msg: 消息
        :param partition: 分區也能夠不指定
        :return:
        """
        if not msg:
            print "消息不能爲空。"
            return None

        # 發送的消息必須是序列化後的,或者是字節
        message = json.dumps(msg, encoding='utf-8', ensure_ascii=False)
        try:
            TIMESTAMP = self._TIMESTAMP()
            # 發送數據,異步方式,調用以後當即返回,由於這裏實際上是發送到緩衝區,因此你能夠屢次調用,而後一塊兒flush出去。
            self._producer.send(self._topic, partition=partition, key=self._clientId, value=message, timestamp_ms=TIMESTAMP).add_callback(self._on_send_success).add_errback(self._on_send_error)
            # 下面的 flush是阻塞的,只有flush纔會真正經過網絡把緩衝區的數據發送到對端,若是不調用flush,則等到時間或者緩衝區滿了就會發送。
            self._producer.flush()
            print self.getNormalTime(TIMESTAMP, timeSize=13) + " send msg: " + message
        except Exception as err:
            print err


def main():
    p = Producer(KafkaServer="172.16.48.171", KafkaPort="9092", Topic='AAA')
    for i in range(10):
        time.sleep(1)
        closePrice = random.randint(1, 500)
        msg = {
            "股票代碼": 60000 + i,
            "昨日收盤價": closePrice,
            "今日開盤價": 0,
            "今日收盤價": 0,
        }
        p.sendMsg(msg)


if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()


消費者json

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from kafka import KafkaConsumer
import json

__metaclass__ = type


class Consumer:
    def __init__(self, KafkaServer='127.0.0.1', KafkaPort='9092', GroupID='TestGroup', ClientId="Test", Topic='Test'):
        """
        用於設置消費者配置信息,這些配置項能夠從源碼中找到,下面爲必要參數。
        :param KafkaServer: kafka服務器IP
        :param KafkaPort: kafka工做端口
        :param GroupID: 消費者組ID
        :param ClientId: 消費者名稱
        :param Topic: 主題
        """
        self._bootstrap_server = '{host}:{port}'.format(host=KafkaServer, port=KafkaPort)
        self._groupId = GroupID
        self._topic = Topic
        self._clientId = ClientId

    def consumeMsg(self):

        try:
            """
            初始化一個消費者實例,消費者不是線程安全的,因此建議一個線程實現一個消費者,而不是一個消費者讓多個線程共享
            下面這些是可選參數,能夠在初始化KafkaConsumer實例的時候傳遞進去
            enable_auto_commit 是否自動提交,默認是true
            auto_commit_interval_ms 自動提交間隔毫秒數
            """
            consumer = KafkaConsumer(self._topic, bootstrap_servers=self._bootstrap_server,
                                     group_id=self._groupId, client_id=self._clientId, enable_auto_commit=True,
                                     auto_commit_interval_ms=5000, value_deserializer=lambda m: json.loads(m.decode('utf-8')))

            """
            這裏不須要顯示的調用訂閱函數,在初始化KafkaConsumer對象的時候已經指定了主題,若是主題字段不爲空則會自動調用訂閱函數,至於
            這個線程消費哪一個分區則是自動分配的。若是你但願手動指定分區則就須要使用 assign() 函數,而且在初始的時候不輸入主題。
            """
            # consumer.subscribe(self._topicList)

            # 返回一個集合
            print "當前消費的分區爲:", consumer.partitions_for_topic(self._topic)
            print "當前訂閱的主題爲:", consumer.subscription()

            while True:
                for msg in consumer:
                    if msg:
                        print "Topic: %s Partition: %d Offset: %s Key: %s Message: %s " % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        except Exception as err:
            print err


def main():
    try:
        c = Consumer(KafkaServer='172.16.48.171', Topic='AAA')
        c.consumeMsg()
    except Exception as err:
        print err.message


if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

執行效果bootstrap

image.png

image.png

相關文章
相關標籤/搜索