Kafka快速入門(十二)——Python客戶端

Kafka快速入門(十二)——Python客戶端

1、confluent-kafka

一、confluent-kafka簡介

confluent-kafka是Python模塊,是對librdkafka的輕量級封裝,支持Kafka 0.8以上版本。本文基於confluent-kafka 1.3.0編寫。
GitHub地址:
https://github.com/confluentinc/confluent-kafka-pythonpython

二、confluent-kafka特性

(1)可靠。confluent-kafka是對普遍應用於各類生產環境的librdkafka的封裝,使用Java客戶端相同的測試集進行測試,由Confluent進行支持。
(2)性能。性能是一個關鍵的設計考慮因素,對於較大的消息,最大吞吐量與Java客戶機至關(Python解釋器的開銷影響較小),延遲與Java客戶端至關。
(3)將來支持。Coufluent由Kafka創始人建立,致力於構建以Apache Kafka爲核心的流處理平臺。確保核心Apache Kafka和Coufluent平臺組件保持同步是當務之急。git

三、confluent-kafka安裝

建立confluent源:
進入/etc/yum.repos.d目錄建立confluent.repo文件:github

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.4/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.4
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1

安裝:正則表達式

sudo yum clean all &&  sudo yum install confluent-community-2.12
sudo yum install librdkafka-devel python-devel
pip install confluent-kafka

安裝AvroProducer、AvroConsumer:
pip install "confluent-kafka[avro]"docker

2、coufluent-kafka客戶端API

一、confluent_kafka.Consumer

Consumer(config)
使用指定的配置dict建立Consumer實例。
Consumer.assign(partitions)
由指定TopicPartition列表設置Consumer的分區分配策略,啓動消費。若是對關閉的Consumer調用本函數會拋出RuntimeError。
Consumer.assignment()
返回當前分區分配策略,返回list(TopicPartition)
Consumer.close()
關閉和終止Consumer實例,關閉Consumer實例會執行如下操做:中止消費;提交位移(若是enable.auto.commit設置爲False會拋出異常)、離開Consumer Group。
Consumer.commit([message=None][, offsets=None][, asynchronous=True])
提交一條消息或位移列表,message和offsets是互斥參數,若是沒有指定參數,會使用當前分區分配策略的offsets。
message:提交消息的位移加1
offsets:要提交的TopicPartition列表
asynchronous:是否異步提交。異步提交會當即返回None。若是設置爲False,會阻塞直到提交成功或失敗,若是提交成功,會返回提交的offsets。注意:提交成功,須要對返回的TopicPartition列表的每一個TopicPartition的err字段進行檢查,TopicPartition可能會提交失敗。
Consumer.committed(partitions[, timeout=None])
獲取已提交的分區的offsets。
partitions:TopicPartition列表
timeout:請求超時,單位秒。
返回TopicPartition列表或錯誤集
Consumer.consume([num_messages=1][, timeout=-1])
消費消息,調用回調函數,返回消息列表,若是超時,返回空。
應用程序必須檢查返回Message的error方法,正常Message的error返回None。
num_messages:返回的最大消息數量,默認爲1
timeout:阻塞等待消息、事件、回調函數的最大時間
Connsumer.get_watermark_offsets(partition[, timeout=None][, cached=False])
獲取分區的低水位和高水位
partition:TopicPartition對象
Timeout:請求超時,
Cached:是否替換正在查詢的Broker使用的緩存信息。
成功返回低水位和高水位的元組,超時返回None。
Consumer.list_topics([topic=None][, timeout=-1])
請求集羣的元數據信息。
topic:字符串類,若是指定,只請求本Topic的信息,不然返回集羣的全部Topic。
timeout:超時前的最大響應時間,-1表示永不超時。
返回ClusterMetadata類型
Consumer.offsets_for_times(partitions[, timeout=None])
對指定的分區列表根據時間戳查詢offsets。
返回每一個分區的offsets大於等於指定分區列表的時間戳的位移。
partitions:TopicPartition列表
timeout:請求超時時間。
Consumer.pause(partitions)
暫停指定分區列表的分區的消費
Consumer.poll([timeout=None])
消費消息,調用回調函數,返回事件。
應用程序必須檢查返回的Message對象的error()方法,若是是正常消息,返回None。
返回Message對象回None。
Consumer.position(partitions)
獲取指定分區列表分區的位移
partitions:分區列表
返回帶位移的TopicPartition列表,當前位移是最新消費消息的位移加1。
Consumer.resume(partitions)
恢復指定分區列表的分區的消費
partitions:要恢復的TopicPartitio列表
Consumer.seek(partition)
定位分區的消費位移到offset。offset能夠是絕對值,也能夠是邏輯位移OFFSET_BEGINNING。本函數只用於活躍消費分區更新消費位移,要設置分區的起始位移可使用assign函數。
Consumer.store_offsets([message=None][, offsets=None])
存儲一條消息的位移或位移列表。
message和offsets是互斥參數。
被存儲的位移會根據auto.commit.interval.m參數值被提交,使用本函數時enable.auto.offset.store參數必須被設置爲False。
message:存儲message的位移加1。
offsets:要存儲位移的TopicPartition列表
Consumer.subscribe(topics[, on_assign=None][, on_revoke=None])
設置要訂閱的Topic列表,會替代此前訂閱的Topic。
訂閱的Topic名稱支持正則表達式,使用」^」做爲Topic名稱前綴。
topics:Topic名稱列表
on_assign:完成分區再分配的回調函數
on_revoke:再平衡操做的bootstrap

on_assign(consumer, partitions)
on_revoke(consumer, partitions)

Consumer.unassign()
刪除當前分區分配策略和中止消費
Consumer.unsubscribe()
刪除當前訂閱Topicapi

二、confluent_kafka.Producer

confluent_kafka.Producer是異步Kafka生產者。
Producer.Producer(config)
使用config字典建立Producer實例。
config:配置字典對象,至少應該設置bootstrap.servers屬性
Producer.len()
返回要傳遞到Broker的消息數量
Producer.flush([timeout])
等待Producer隊列中要傳遞的全部消息。
timeout:阻塞的最大時間,要求librdkafka版本大於0.9.4。
返回Producer消息隊列中仍然存在的消息的數量。
Producer.list_topics([topic=None][, timeout=-1])
請求集羣的元數據。
topic:若是指定Topic,只返回Topic的相應元數據,不然返回全部Topic的元數據。
timeout:超時前的最大響應時間,-1表示永不超時。
返回ClusterMetadata類型。
Producer.poll([timeout])
Poll生產者事件,調用相應回調函數。
timeout:阻塞等待事件的最大時間。
返回處理事件的數量。
Producer.produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
生產消息到指定Topic,異步操做。
topic:要生產消息到的指定Topic。
value:str或bytes類型,消息數據。
key:str或bytes類型,消息Key。
partition:要生產消息到指定分區,不然使用內置分區分配策略。
on_delivery:投遞報告回調函數
timestamp:消息事件戳,要求librdkafka v0.9.4以上版本,api.version.request=true, Kafka 0.10.0.0以上版本。
headers:消息頭,字典類型,消息頭的key必須是字符串,value必須是二進制數據,unicode或None。要求librdkafka v0.11.4以上版本和Kafka 0.11.0.0以上版本。緩存

三、confluent_kafka.admin.AdminClient

AdminClient提供對Kafka Broker、Topic、Group、Broker支持的其它資源進行管理操做。
AdminClient.alter_configs(resources, **kwargs)
更新指定resource的配置值。
AdminClient.create_partitions(new_partitions, **kwargs)
建立指定Topic的分區
AdminClient.create_topics(new_topics, **kwargs)
建立Topic
AdminClient.delete_topics(topics, **kwargs)
刪除Topic
AdminClient.describe_configs(resources, **kwargs)
獲取指定resource的配置安全

四、confluent_kafka.admin.BrokerMetadata

BrokerMetadata定義了Kafka Broker的信息,是非實例化類,BrokerMetadata包含的屬性以下:
id:整型,Broker ID
host:字符串類型,Broker 主機名
port:整型,Broker端口bash

五、confluent_kafka.admin.ClusterMetadata

ClusterMetadata定義了Kafka集羣、Broker、Topic等信息,是非實例化類,ClusterMetadata包含以下屬性:
cluster_id :字符串,Kafka集羣ID字符串。
controller_id:id類型,當前Controller Broker ID
brokers:字典,key爲整型Broker ID,值爲BrokerMetadata對象。
topics :字典,key爲字符串Topic名稱, 值爲TopicMetadata對象。
orig_broker_id:整型,數據源於Broker的ID。
orig_broker_name:字符串,數據源於Broker的名稱或地址。

六、confluent_kafka.admin.ConfigEntry

ConfigEntry由describe_configs()返回指定資源的配置實體,是非實例化類,ConfigEntry包含以下屬性:
name:字符串,配置屬性名稱。
value:字符串,配置值。
source :ConfigSource類型,配置源。
is_read_only:bool類型,指明配置屬性是否只讀。
is_default:bool類型,指明配置屬性是否使用默認值。
is_sensitive:bool類型,指明配置屬性值是否包含敏感信息,如安全配置。
is_synonym:bool類型,指明配置屬性是不是賦配置實體的別名。
synonyms :list類型,配置屬性的備用源的配置實體列表。

七、confluent_kafka.admin.ConfigResource

ConfigResource(restype, name, set_config=None, described_configs=None, error=None)
restype:resource類型
name:resource名稱
set_config:胚珠屬性值設置方法
described_configs:
error:錯誤信息
Kafka資源ConfigResource.Type以下:
ANY= 1,任何資源
BROKER= 4,Broker資源,資源名稱是Broker ID
GROUP= 3,Group資源,資源名稱是group.id
TOPIC= 2,Topic資源,資源名稱是Topic名稱。
UNKNOWN= 0,未知類型,未設置類型。
set_config(name, value, overwrite=True)
設置、覆寫配置實體
name:配置屬性名稱。
value:配置屬性值。
overwrite:是否覆寫。

八、confluent_kafka.admin.ConfigSource

ConfigSource是由 describe_configs()返回的配置實體的配置源。
DEFAULT_CONFIG= 5
DYNAMIC_BROKER_CONFIG= 2
DYNAMIC_DEFAULT_BROKER_CONFIG= 3
DYNAMIC_TOPIC_CONFIG= 1
STATIC_BROKER_CONFIG= 4
UNKNOWN_CONFIG= 0

九、confluent_kafka.admin.PartitionMetadata

PartitionsMetadata包含Kafka分區的元數據,是非實例化類,PartitionsMetadata包含的屬性以下:
id :整型,分區編號。
leader :整型,分區的當前Leader,或是-1。
replicas: 整型列表,分區的副本的Broker ID的列表。
Isrs: 整型列表,分區的ISR Broker ID列表。
error:KafkaError類型,分區錯誤。

十、confluent_kafka.admin.TopicMetadata

TopicMetadata包含Kafka Topic相關的元數據,是非實例化類,TopicMetadata包含的屬性以下:
topic:字符串類型,Topic名稱
partitions:字典,key是分區編號,值是PartitionMetadata對象。
error:KafkaError類型,Topic錯誤。

3、Avro序列化組件

一、confluent_kafka.avro.AvroConsumer

AvroConsumer(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None)
Kafka Consumer客戶端,對消息進行avro模式解碼,處理消息反序列化。
config:字典,配置參數,包含schema.registry.url和bootstrap.servers參數。
reader_key_schema:schema類型,消息key的讀取器。
reader_value_schema:(schema類型,消息值的讀取器。
AvroConsumer.poll(timeout=None)
confluent_kafka.Consumer類的poll方法的覆寫,使用avro scema處理消息的反序列化。

二、confluent_kafka.avro.AvroProducer

AvroProducer(config, default_key_schema=None, default_value_schema=None, schema_registry=None)
對消息進行avro schema編碼的Kafka Producer客戶端,處理結構註冊、消息序列化。
config:字典,配置參數,包含schema.registry.url和bootstrap.servers參數。
default_key_schema:字符串,可選,key的默認avro schema。
default_value_schema:字符串,可選,value的默認avro schema。
AvroProducer.produce(**kwargs)
異步發送消息到Kafka,使用指定編碼,默認使用avro schema。
topic:字符串,Topic名稱。
value: object類型,要序列化的對象。
value_schema:字符串,值的Avro schema。
key:object類型,要序列化的對象。
key_schema:字符串,鍵Avro schema。

4、coufluent-kafka主要類API

一、confluent_kafka.Message

Message對象表示一條消費或生產的消息,或是一個事件。
應用程序必須使用Message.error()方法檢查Message對象是不是一個正常的Message仍是一個錯誤事件。
Message類不是用戶可實例化的類,
Message.len()
返回消息數據的長度
Message.error()
Message對象用於傳播錯誤或事件,應用程序必須檢查error()方法以肯定Message對象是不是一個正常的消息(返回None),錯誤或是事件(返回KafkaError對象)。
Message.headers()
獲取消息的頭。消息頭是鍵值對集合,消息頭的鍵是有序的,能夠重重複。
返回消息頭鍵值對的列表。
Message.key()
獲取消息鍵。
Message.offset()
消息位移
Message.partition()
分區編號
Message.set_headers(value)
使用新值設置Message.key字段的值。
Message.set_value()
使用新值設置Message.value字段的值
Message.timestamp()
獲取消息的時間戳類型和時間戳。時間戳類型以下:
TIMESTAMP_NOT_AVAILABLE:Broker不支持的時間戳
TIMESTAMP_CREATE_TIME:生產者時間戳
TIMESTAMP_LOG_APPEND_TIME:Broker接收時間
返回時間戳類型和時間戳的元組。
若是返回的時間戳類型是TIMESTAMP_NOT_AVAILABLE,返回的時間戳應該被忽略。時間戳要求Kafka 0.10.0.0以上版本,客戶端配置的api.version.request屬性值爲True。
Message.topic()
返回消息的Topic
Message.Value()
返回消息數據。

二、confluent_kafka.TopicPartition

TopicPartition是一種泛類型,用戶保存單個分區及其相關的各類信息。一般用於爲不一樣的操做提供TopicPartition列表。
TopicPartition(topic[, partition][, offset])
實例化TopicPartition對象。
topic:Topic名稱
partition:分區編號
offset:位移
TopicPartition.error:屬性值,使用KafkaError表示一個錯誤。
TopicPartition.offset:屬性值,位移
TopicPartition.partition:屬性值,分區編號
TopicPartition.topic:屬性值,Topic名稱

三、confluent_kafka.KafkaError

KafkaError表示Kafka錯誤和事件對象,不是用戶實例化類,用於事件傳播、錯誤傳播、異常。
KafkaError.code()
返回錯誤或事件的代碼
KafkaError.name()
返回錯誤或事件的枚舉名稱
KafkaError.str()
返回事件或錯誤的可讀字符串描述

四、confluent_kafka.KafkaException

KafkaException是對KafkaError類的封裝,使用exception.args[0]能夠提取KafkaError對象。

五、Offset

OFFSET_BEGINNING:從分區開始
OFFSET_END:分區結束位置
OFFSET_STORED:使用存儲提交位移
OFFSET_INVALID:非法/默認位移。

六、confluent_kafka.ThrottleEvent

ThrottleEvent包含限制請求的相關數據,是非實例化類,包含以下屬性:
broker_name:字符串,限制請求的Broker的主機名稱。 
broker_id:整型,Broker ID。
throttle_time:float類型,Broker限制請求的時間,單爲秒。

七、Configuration

生產者和消費者實例的配置。

conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup', 'session.timeout.ms': 6000,
        'on_commit': my_commit_callback,
        'default.topic.config': {'auto.offset.reset': 'smallest'}}
consumer = confluent_kafka.Consumer(conf)

Python綁定提供了其它的配置屬性:
default.topic.config:屬性值是頂層配置屬性字典。

5、coufluent-kafka示例

一、Producer

from confluent_kafka import Producer

class KafkaProducer:
    def __init__(self, brokers):
        self.producer = Producer({'bootstrap.servers': brokers})

    def sendMessage(self, topic, payloads):
        for payload in payloads:
            # Trigger any available delivery report callbacks from previous produce() calls
            self.producer.poll(0)
            # Asynchronously produce a message, the delivery report callback
            # will be triggered from poll() above, or flush() below, when the message has
            # been successfully delivered or failed permanently.
            self.producer.produce(topic, payload.encode('utf-8'), callback=self.delivery_report)

        # Wait for any outstanding messages to be delivered and delivery report
        # callbacks to be triggered.
        self.producer.flush()

    @staticmethod
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

if __name__ == "__main__":
    producer = KafkaProducer('192.168.0.105:9092')
    source_data = list()
    for x in range(1000):
        source_data.append("Hello kafka{}".format(x))
    producer.sendMessage('test2', source_data)

進入kafka容器:
docker exec -it kafka-test /bin/bash
查看Topic的消息:
kafka-console-consumer.sh --bootstrap-server kafka-test:9092 --topic test2 --from-beginning

二、Consumer

from confluent_kafka import Consumer

class KafkaConsumer:
    def __init__(self, brokers, group):
        config = dict()
        config['bootstrap.servers'] = brokers
        config['group.id'] = group
        config['auto.offset.reset'] = 'earliest'
        self.consumer = Consumer(config)

    def subscribe(self, topics):
        self.consumer.subscribe(topics=topics)

    def pull(self):
        while True:
            msg = self.consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            print('Received message: {}'.format(msg.value().decode('utf-8')))

    def close(self):
        self.consumer.close()

if __name__ == "__main__":
    consumer = KafkaConsumer("192.168.0.105:9092", "test_group")
    consumer.subscribe(["test1", "test2"])
    consumer.pull()
    consumer.close()

三、AvroProducer

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

class Message:
    """
    Message struct
    """
    def __init__(self, key, value):
        self.key = {"name": "{}".format(key)}
        self.value = {"name": "{}".format(value)}

class KafkaAvroProducer:
    """
    Kafka Avro Producer Wrapper class
    """
    value_schema = ""
    key_schema = ""

    def __init__(self, brokers, schema_registry_url):
        config = dict()
        config['bootstrap.servers'] = brokers
        config['on_delivery'] = KafkaAvroProducer.delivery_report
        config['schema.registry.url'] = schema_registry_url
        self.avro_producer = AvroProducer(config=config,
                                          default_key_schema=KafkaAvroProducer.key_schema,
                                          default_value_schema=KafkaAvroProducer.value_schema)

    @classmethod
    def register_value_schema(cls, schema):
        cls.key_schema = avro.loads(schema)

    @classmethod
    def register_key_schema(cls, schema):
        cls.value_schema = avro.loads(schema)

    def send_message(self, topic, messages):
        for message in messages:
            self.avro_producer.produce(topic='test1', value=message.value, key=message.key)

        self.avro_producer.flush()

    @staticmethod
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

if __name__ == "__main__":
    value_schema_str = """
    {
       "namespace": "kafka.test",
       "name": "value",
       "type": "record",
       "fields" : [
         {
           "name" : "name",
           "type" : "string"
         }
       ]
    }
    """
    key_schema_str = """
    {
       "namespace": "kafka.test",
       "name": "key",
       "type": "record",
       "fields" : [
         {
           "name" : "name",
           "type" : "string"
         }
       ]
    }
    """
    messages = list()
    for i in range(1000):
        messages.append(Message("key{}".format(i), "Hello Confluent Kafka{}".format(i)))
    KafkaAvroProducer.register_key_schema(key_schema_str)
    KafkaAvroProducer.register_value_schema(value_schema_str)
    schema_registry_url = 'http://127.0.0.1:8081'
    avroProducer = KafkaAvroProducer("192.168.0.105:9092", schema_registry_url)
    avroProducer.send_message("test1", messages=messages)

四、AvroConsumer

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

class KafkaAvroConsumer:
    def __init__(self, brokers, group, schema_registry_url):
        self.avro_consumer = AvroConsumer({
            'bootstrap.servers': brokers,
            'group.id': group,
            'auto.offset.reset': 'earliest',
            'schema.registry.url': schema_registry_url})

    def subscribe(self, topics):
        self.avro_consumer.subscribe(topics=topics)

    def pull_message(self):
        while True:
            try:
                msg = self.avro_consumer.poll(2)
            except SerializerError as e:
                print("Message deserialization failed for {}: {}".format(msg, e))
                break
            if msg is None:
                continue
            if msg.error():
                print("AvroConsumer error: {}".format(msg.error()))
                continue
            print(msg.key(), ": ", msg.value())

    def close(self):
        self.avro_consumer.close()

if __name__ == "__main__":
    schema_registry_url = "http://127.0.0.1:8081"
    avroConsumer = KafkaAvroConsumer("192.168.0.105:9092", "test_group", schema_registry_url)
    avroConsumer.subscribe(["test1", "test2"])
    avroConsumer.pull_message()
    avroConsumer.close()

五、AdminClient

from confluent_kafka.admin import NewTopic, AdminClient

class KafkaManager:
    def __init__(self, broker):
        self.admin_client = AdminClient({'bootstrap.servers': broker})

    def create_topics(self, topics, num_partition):
        new_topics = [NewTopic(topic, num_partitions=num_partition, replication_factor=1) for topic in topics]
        fs = self.admin_client.create_topics(new_topics)
        # Wait for each operation to finish.
        for topic, f in fs.items():
            try:
                f.result()  # The result itself is None
                print("Topic {} created".format(topic))
            except Exception as e:
                print("Failed to create topic {}: {}".format(topic, e))

    def delete_topics(self, topics):
        fs = self.admin_client.delete_topics(topics=topics)
        for topic, f in fs.items():
            try:
                f.result()  # The result itself is None
                print("Topic {} deleted".format(topic))
            except Exception as e:
                print("Failed to delete topic {}: {}".format(topic, e))

if __name__ == "__main__":
    import time
    manager = KafkaManager("192.168.0.105:9092")
    manager.create_topics(["test3", "test4"], 1)
    time.sleep(3)
    manager.delete_topics(["test3", "test4"])
相關文章
相關標籤/搜索