confluent-kafka是Python模塊,是對librdkafka的輕量級封裝,支持Kafka 0.8以上版本。本文基於confluent-kafka 1.3.0編寫。
GitHub地址:
https://github.com/confluentinc/confluent-kafka-pythonpython
(1)可靠。confluent-kafka是對普遍應用於各類生產環境的librdkafka的封裝,使用Java客戶端相同的測試集進行測試,由Confluent進行支持。
(2)性能。性能是一個關鍵的設計考慮因素,對於較大的消息,最大吞吐量與Java客戶機至關(Python解釋器的開銷影響較小),延遲與Java客戶端至關。
(3)將來支持。Coufluent由Kafka創始人建立,致力於構建以Apache Kafka爲核心的流處理平臺。確保核心Apache Kafka和Coufluent平臺組件保持同步是當務之急。git
建立confluent源:
進入/etc/yum.repos.d目錄建立confluent.repo文件:github
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.4/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.4/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.4 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.4/archive.key enabled=1
安裝:正則表達式
sudo yum clean all && sudo yum install confluent-community-2.12 sudo yum install librdkafka-devel python-devel pip install confluent-kafka
安裝AvroProducer、AvroConsumer:pip install "confluent-kafka[avro]"
docker
Consumer(config)
使用指定的配置dict建立Consumer實例。Consumer.assign(partitions)
由指定TopicPartition列表設置Consumer的分區分配策略,啓動消費。若是對關閉的Consumer調用本函數會拋出RuntimeError。Consumer.assignment()
返回當前分區分配策略,返回list(TopicPartition)Consumer.close()
關閉和終止Consumer實例,關閉Consumer實例會執行如下操做:中止消費;提交位移(若是enable.auto.commit設置爲False會拋出異常)、離開Consumer Group。Consumer.commit([message=None][, offsets=None][, asynchronous=True])
提交一條消息或位移列表,message和offsets是互斥參數,若是沒有指定參數,會使用當前分區分配策略的offsets。
message:提交消息的位移加1
offsets:要提交的TopicPartition列表
asynchronous:是否異步提交。異步提交會當即返回None。若是設置爲False,會阻塞直到提交成功或失敗,若是提交成功,會返回提交的offsets。注意:提交成功,須要對返回的TopicPartition列表的每一個TopicPartition的err字段進行檢查,TopicPartition可能會提交失敗。Consumer.committed(partitions[, timeout=None])
獲取已提交的分區的offsets。
partitions:TopicPartition列表
timeout:請求超時,單位秒。
返回TopicPartition列表或錯誤集Consumer.consume([num_messages=1][, timeout=-1])
消費消息,調用回調函數,返回消息列表,若是超時,返回空。
應用程序必須檢查返回Message的error方法,正常Message的error返回None。
num_messages:返回的最大消息數量,默認爲1
timeout:阻塞等待消息、事件、回調函數的最大時間Connsumer.get_watermark_offsets(partition[, timeout=None][, cached=False])
獲取分區的低水位和高水位
partition:TopicPartition對象
Timeout:請求超時,
Cached:是否替換正在查詢的Broker使用的緩存信息。
成功返回低水位和高水位的元組,超時返回None。Consumer.list_topics([topic=None][, timeout=-1])
請求集羣的元數據信息。
topic:字符串類,若是指定,只請求本Topic的信息,不然返回集羣的全部Topic。
timeout:超時前的最大響應時間,-1表示永不超時。
返回ClusterMetadata類型Consumer.offsets_for_times(partitions[, timeout=None])
對指定的分區列表根據時間戳查詢offsets。
返回每一個分區的offsets大於等於指定分區列表的時間戳的位移。
partitions:TopicPartition列表
timeout:請求超時時間。Consumer.pause(partitions)
暫停指定分區列表的分區的消費Consumer.poll([timeout=None])
消費消息,調用回調函數,返回事件。
應用程序必須檢查返回的Message對象的error()方法,若是是正常消息,返回None。
返回Message對象回None。Consumer.position(partitions)
獲取指定分區列表分區的位移
partitions:分區列表
返回帶位移的TopicPartition列表,當前位移是最新消費消息的位移加1。Consumer.resume(partitions)
恢復指定分區列表的分區的消費
partitions:要恢復的TopicPartitio列表Consumer.seek(partition)
定位分區的消費位移到offset。offset能夠是絕對值,也能夠是邏輯位移OFFSET_BEGINNING。本函數只用於活躍消費分區更新消費位移,要設置分區的起始位移可使用assign函數。Consumer.store_offsets([message=None][, offsets=None])
存儲一條消息的位移或位移列表。
message和offsets是互斥參數。
被存儲的位移會根據auto.commit.interval.m參數值被提交,使用本函數時enable.auto.offset.store參數必須被設置爲False。
message:存儲message的位移加1。
offsets:要存儲位移的TopicPartition列表Consumer.subscribe(topics[, on_assign=None][, on_revoke=None])
設置要訂閱的Topic列表,會替代此前訂閱的Topic。
訂閱的Topic名稱支持正則表達式,使用」^」做爲Topic名稱前綴。
topics:Topic名稱列表
on_assign:完成分區再分配的回調函數
on_revoke:再平衡操做的bootstrap
on_assign(consumer, partitions) on_revoke(consumer, partitions)
Consumer.unassign()
刪除當前分區分配策略和中止消費Consumer.unsubscribe()
刪除當前訂閱Topicapi
confluent_kafka.Producer是異步Kafka生產者。Producer.Producer(config)
使用config字典建立Producer實例。
config:配置字典對象,至少應該設置bootstrap.servers屬性
Producer.len()
返回要傳遞到Broker的消息數量Producer.flush([timeout])
等待Producer隊列中要傳遞的全部消息。
timeout:阻塞的最大時間,要求librdkafka版本大於0.9.4。
返回Producer消息隊列中仍然存在的消息的數量。Producer.list_topics([topic=None][, timeout=-1])
請求集羣的元數據。
topic:若是指定Topic,只返回Topic的相應元數據,不然返回全部Topic的元數據。
timeout:超時前的最大響應時間,-1表示永不超時。
返回ClusterMetadata類型。Producer.poll([timeout])
Poll生產者事件,調用相應回調函數。
timeout:阻塞等待事件的最大時間。
返回處理事件的數量。Producer.produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
生產消息到指定Topic,異步操做。
topic:要生產消息到的指定Topic。
value:str或bytes類型,消息數據。
key:str或bytes類型,消息Key。
partition:要生產消息到指定分區,不然使用內置分區分配策略。
on_delivery:投遞報告回調函數
timestamp:消息事件戳,要求librdkafka v0.9.4以上版本,api.version.request=true, Kafka 0.10.0.0以上版本。
headers:消息頭,字典類型,消息頭的key必須是字符串,value必須是二進制數據,unicode或None。要求librdkafka v0.11.4以上版本和Kafka 0.11.0.0以上版本。緩存
AdminClient提供對Kafka Broker、Topic、Group、Broker支持的其它資源進行管理操做。AdminClient.alter_configs(resources, **kwargs)
更新指定resource的配置值。AdminClient.create_partitions(new_partitions, **kwargs)
建立指定Topic的分區AdminClient.create_topics(new_topics, **kwargs)
建立TopicAdminClient.delete_topics(topics, **kwargs)
刪除TopicAdminClient.describe_configs(resources, **kwargs)
獲取指定resource的配置安全
BrokerMetadata定義了Kafka Broker的信息,是非實例化類,BrokerMetadata包含的屬性以下:
id:整型,Broker ID
host:字符串類型,Broker 主機名
port:整型,Broker端口bash
ClusterMetadata定義了Kafka集羣、Broker、Topic等信息,是非實例化類,ClusterMetadata包含以下屬性:
cluster_id :字符串,Kafka集羣ID字符串。
controller_id:id類型,當前Controller Broker ID
brokers:字典,key爲整型Broker ID,值爲BrokerMetadata對象。
topics :字典,key爲字符串Topic名稱, 值爲TopicMetadata對象。
orig_broker_id:整型,數據源於Broker的ID。
orig_broker_name:字符串,數據源於Broker的名稱或地址。
ConfigEntry由describe_configs()返回指定資源的配置實體,是非實例化類,ConfigEntry包含以下屬性:
name:字符串,配置屬性名稱。
value:字符串,配置值。
source :ConfigSource類型,配置源。
is_read_only:bool類型,指明配置屬性是否只讀。
is_default:bool類型,指明配置屬性是否使用默認值。
is_sensitive:bool類型,指明配置屬性值是否包含敏感信息,如安全配置。
is_synonym:bool類型,指明配置屬性是不是賦配置實體的別名。
synonyms :list類型,配置屬性的備用源的配置實體列表。
ConfigResource(restype, name, set_config=None, described_configs=None, error=None)
restype:resource類型
name:resource名稱
set_config:胚珠屬性值設置方法
described_configs:
error:錯誤信息
Kafka資源ConfigResource.Type以下:
ANY= 1,任何資源
BROKER= 4,Broker資源,資源名稱是Broker ID
GROUP= 3,Group資源,資源名稱是group.id
TOPIC= 2,Topic資源,資源名稱是Topic名稱。
UNKNOWN= 0,未知類型,未設置類型。
set_config(name, value, overwrite=True)
設置、覆寫配置實體
name:配置屬性名稱。
value:配置屬性值。
overwrite:是否覆寫。
ConfigSource是由 describe_configs()返回的配置實體的配置源。
DEFAULT_CONFIG= 5
DYNAMIC_BROKER_CONFIG= 2
DYNAMIC_DEFAULT_BROKER_CONFIG= 3
DYNAMIC_TOPIC_CONFIG= 1
STATIC_BROKER_CONFIG= 4
UNKNOWN_CONFIG= 0
PartitionsMetadata包含Kafka分區的元數據,是非實例化類,PartitionsMetadata包含的屬性以下:
id :整型,分區編號。
leader :整型,分區的當前Leader,或是-1。
replicas: 整型列表,分區的副本的Broker ID的列表。
Isrs: 整型列表,分區的ISR Broker ID列表。
error:KafkaError類型,分區錯誤。
TopicMetadata包含Kafka Topic相關的元數據,是非實例化類,TopicMetadata包含的屬性以下:
topic:字符串類型,Topic名稱
partitions:字典,key是分區編號,值是PartitionMetadata對象。
error:KafkaError類型,Topic錯誤。
AvroConsumer(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None)
Kafka Consumer客戶端,對消息進行avro模式解碼,處理消息反序列化。
config:字典,配置參數,包含schema.registry.url和bootstrap.servers參數。
reader_key_schema:schema類型,消息key的讀取器。
reader_value_schema:(schema類型,消息值的讀取器。
AvroConsumer.poll(timeout=None)
confluent_kafka.Consumer類的poll方法的覆寫,使用avro scema處理消息的反序列化。
AvroProducer(config, default_key_schema=None, default_value_schema=None, schema_registry=None)
對消息進行avro schema編碼的Kafka Producer客戶端,處理結構註冊、消息序列化。
config:字典,配置參數,包含schema.registry.url和bootstrap.servers參數。
default_key_schema:字符串,可選,key的默認avro schema。
default_value_schema:字符串,可選,value的默認avro schema。AvroProducer.produce(**kwargs)
異步發送消息到Kafka,使用指定編碼,默認使用avro schema。
topic:字符串,Topic名稱。
value: object類型,要序列化的對象。
value_schema:字符串,值的Avro schema。
key:object類型,要序列化的對象。
key_schema:字符串,鍵Avro schema。
Message對象表示一條消費或生產的消息,或是一個事件。
應用程序必須使用Message.error()方法檢查Message對象是不是一個正常的Message仍是一個錯誤事件。
Message類不是用戶可實例化的類,Message.len()
返回消息數據的長度Message.error()
Message對象用於傳播錯誤或事件,應用程序必須檢查error()方法以肯定Message對象是不是一個正常的消息(返回None),錯誤或是事件(返回KafkaError對象)。Message.headers()
獲取消息的頭。消息頭是鍵值對集合,消息頭的鍵是有序的,能夠重重複。
返回消息頭鍵值對的列表。Message.key()
獲取消息鍵。Message.offset()
消息位移Message.partition()
分區編號Message.set_headers(value)
使用新值設置Message.key字段的值。Message.set_value()
使用新值設置Message.value字段的值Message.timestamp()
獲取消息的時間戳類型和時間戳。時間戳類型以下:
TIMESTAMP_NOT_AVAILABLE:Broker不支持的時間戳
TIMESTAMP_CREATE_TIME:生產者時間戳
TIMESTAMP_LOG_APPEND_TIME:Broker接收時間
返回時間戳類型和時間戳的元組。
若是返回的時間戳類型是TIMESTAMP_NOT_AVAILABLE,返回的時間戳應該被忽略。時間戳要求Kafka 0.10.0.0以上版本,客戶端配置的api.version.request屬性值爲True。Message.topic()
返回消息的TopicMessage.Value()
返回消息數據。
TopicPartition是一種泛類型,用戶保存單個分區及其相關的各類信息。一般用於爲不一樣的操做提供TopicPartition列表。TopicPartition(topic[, partition][, offset])
實例化TopicPartition對象。
topic:Topic名稱
partition:分區編號
offset:位移
TopicPartition.error:屬性值,使用KafkaError表示一個錯誤。
TopicPartition.offset:屬性值,位移
TopicPartition.partition:屬性值,分區編號
TopicPartition.topic:屬性值,Topic名稱
KafkaError表示Kafka錯誤和事件對象,不是用戶實例化類,用於事件傳播、錯誤傳播、異常。KafkaError.code()
返回錯誤或事件的代碼KafkaError.name()
返回錯誤或事件的枚舉名稱KafkaError.str()
返回事件或錯誤的可讀字符串描述
KafkaException是對KafkaError類的封裝,使用exception.args[0]能夠提取KafkaError對象。
OFFSET_BEGINNING:從分區開始
OFFSET_END:分區結束位置
OFFSET_STORED:使用存儲提交位移
OFFSET_INVALID:非法/默認位移。
ThrottleEvent包含限制請求的相關數據,是非實例化類,包含以下屬性:
broker_name:字符串,限制請求的Broker的主機名稱。
broker_id:整型,Broker ID。
throttle_time:float類型,Broker限制請求的時間,單爲秒。
生產者和消費者實例的配置。
conf = {'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup', 'session.timeout.ms': 6000, 'on_commit': my_commit_callback, 'default.topic.config': {'auto.offset.reset': 'smallest'}} consumer = confluent_kafka.Consumer(conf)
Python綁定提供了其它的配置屬性:
default.topic.config:屬性值是頂層配置屬性字典。
from confluent_kafka import Producer class KafkaProducer: def __init__(self, brokers): self.producer = Producer({'bootstrap.servers': brokers}) def sendMessage(self, topic, payloads): for payload in payloads: # Trigger any available delivery report callbacks from previous produce() calls self.producer.poll(0) # Asynchronously produce a message, the delivery report callback # will be triggered from poll() above, or flush() below, when the message has # been successfully delivered or failed permanently. self.producer.produce(topic, payload.encode('utf-8'), callback=self.delivery_report) # Wait for any outstanding messages to be delivered and delivery report # callbacks to be triggered. self.producer.flush() @staticmethod def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) if __name__ == "__main__": producer = KafkaProducer('192.168.0.105:9092') source_data = list() for x in range(1000): source_data.append("Hello kafka{}".format(x)) producer.sendMessage('test2', source_data)
進入kafka容器:docker exec -it kafka-test /bin/bash
查看Topic的消息:kafka-console-consumer.sh --bootstrap-server kafka-test:9092 --topic test2 --from-beginning
from confluent_kafka import Consumer class KafkaConsumer: def __init__(self, brokers, group): config = dict() config['bootstrap.servers'] = brokers config['group.id'] = group config['auto.offset.reset'] = 'earliest' self.consumer = Consumer(config) def subscribe(self, topics): self.consumer.subscribe(topics=topics) def pull(self): while True: msg = self.consumer.poll(1.0) if msg is None: continue if msg.error(): print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) def close(self): self.consumer.close() if __name__ == "__main__": consumer = KafkaConsumer("192.168.0.105:9092", "test_group") consumer.subscribe(["test1", "test2"]) consumer.pull() consumer.close()
from confluent_kafka import avro from confluent_kafka.avro import AvroProducer class Message: """ Message struct """ def __init__(self, key, value): self.key = {"name": "{}".format(key)} self.value = {"name": "{}".format(value)} class KafkaAvroProducer: """ Kafka Avro Producer Wrapper class """ value_schema = "" key_schema = "" def __init__(self, brokers, schema_registry_url): config = dict() config['bootstrap.servers'] = brokers config['on_delivery'] = KafkaAvroProducer.delivery_report config['schema.registry.url'] = schema_registry_url self.avro_producer = AvroProducer(config=config, default_key_schema=KafkaAvroProducer.key_schema, default_value_schema=KafkaAvroProducer.value_schema) @classmethod def register_value_schema(cls, schema): cls.key_schema = avro.loads(schema) @classmethod def register_key_schema(cls, schema): cls.value_schema = avro.loads(schema) def send_message(self, topic, messages): for message in messages: self.avro_producer.produce(topic='test1', value=message.value, key=message.key) self.avro_producer.flush() @staticmethod def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) if __name__ == "__main__": value_schema_str = """ { "namespace": "kafka.test", "name": "value", "type": "record", "fields" : [ { "name" : "name", "type" : "string" } ] } """ key_schema_str = """ { "namespace": "kafka.test", "name": "key", "type": "record", "fields" : [ { "name" : "name", "type" : "string" } ] } """ messages = list() for i in range(1000): messages.append(Message("key{}".format(i), "Hello Confluent Kafka{}".format(i))) KafkaAvroProducer.register_key_schema(key_schema_str) KafkaAvroProducer.register_value_schema(value_schema_str) schema_registry_url = 'http://127.0.0.1:8081' avroProducer = KafkaAvroProducer("192.168.0.105:9092", schema_registry_url) avroProducer.send_message("test1", messages=messages)
from confluent_kafka.avro import AvroConsumer from confluent_kafka.avro.serializer import SerializerError class KafkaAvroConsumer: def __init__(self, brokers, group, schema_registry_url): self.avro_consumer = AvroConsumer({ 'bootstrap.servers': brokers, 'group.id': group, 'auto.offset.reset': 'earliest', 'schema.registry.url': schema_registry_url}) def subscribe(self, topics): self.avro_consumer.subscribe(topics=topics) def pull_message(self): while True: try: msg = self.avro_consumer.poll(2) except SerializerError as e: print("Message deserialization failed for {}: {}".format(msg, e)) break if msg is None: continue if msg.error(): print("AvroConsumer error: {}".format(msg.error())) continue print(msg.key(), ": ", msg.value()) def close(self): self.avro_consumer.close() if __name__ == "__main__": schema_registry_url = "http://127.0.0.1:8081" avroConsumer = KafkaAvroConsumer("192.168.0.105:9092", "test_group", schema_registry_url) avroConsumer.subscribe(["test1", "test2"]) avroConsumer.pull_message() avroConsumer.close()
from confluent_kafka.admin import NewTopic, AdminClient class KafkaManager: def __init__(self, broker): self.admin_client = AdminClient({'bootstrap.servers': broker}) def create_topics(self, topics, num_partition): new_topics = [NewTopic(topic, num_partitions=num_partition, replication_factor=1) for topic in topics] fs = self.admin_client.create_topics(new_topics) # Wait for each operation to finish. for topic, f in fs.items(): try: f.result() # The result itself is None print("Topic {} created".format(topic)) except Exception as e: print("Failed to create topic {}: {}".format(topic, e)) def delete_topics(self, topics): fs = self.admin_client.delete_topics(topics=topics) for topic, f in fs.items(): try: f.result() # The result itself is None print("Topic {} deleted".format(topic)) except Exception as e: print("Failed to delete topic {}: {}".format(topic, e)) if __name__ == "__main__": import time manager = KafkaManager("192.168.0.105:9092") manager.create_topics(["test3", "test4"], 1) time.sleep(3) manager.delete_topics(["test3", "test4"])