如何使用Python讀寫Kafka?

如何使用Python讀寫Kafka?

如何使用Python讀寫Kafka?

攝影:產品經理
吃了不會禿頭的禿黃油
關於Kafka的第三篇文章,咱們來說講如何使用Python讀寫Kafka。這一篇文章裏面,咱們要使用的一個第三方庫叫作kafka-python。你們可使用pip或者pipenv安裝它。下面兩種安裝方案,任選其一便可。python

python3 -m pip install kafka-python
pipenv install kafka-python

以下圖所示:json

如何使用Python讀寫Kafka?
這篇文章,咱們將會使用最短的代碼來實現一個讀、寫Kafka的示例。bootstrap

建立配置文件服務器

因爲生產者和消費者都須要鏈接Kafka,因此我單獨寫了一個配置文件config.py用來保存鏈接Kafka所須要的各個參數,而不是直接把這些參數Hard Code寫在代碼裏面:ide

# config.py
SERVER = '123.45.32.11:1234'
USERNAME = 'kingname'
PASSWORD = 'kingnameisgod'
TOPIC = 'howtousekafka'

本文演示所用的Kafka由我司平臺組的同事搭建,須要帳號密碼才能鏈接,因此我在配置文件中加上了USERNAME和PASSWORD兩項。你使用的Kafka若是沒有帳號和密碼,那麼你只須要SERVER和TOPIC便可。rest

建立生產者code

代碼簡單到甚至不須要解釋。首先使用KafkaProducer類鏈接 Kafka,得到一個生產者對象,而後往裏面寫數據。server

import json
import time
import datetime
import config
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=config.SERVER,
                         value_serializer=lambda m: json.dumps(m).encode())

for i in range(100):
    data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(config.TOPIC, data)
    time.sleep(1)

參數bootstrap_servers用於指定 Kafka 的服務器鏈接地址。對象

參數value_serializer用來指定序列化的方式。這裏我使用 json 來序列化數據,從而實現我向 Kafka 傳入一個字典,Kafka 自動把它轉成 JSON 字符串的效果。blog

以下圖所示:
如何使用Python讀寫Kafka?

注意,上圖中,我多寫了4個參數:

security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
sasl_plain_username=config.USERNAME
sasl_plain_password=config.PASSWORD

這四個參數是由於我這裏須要經過密碼鏈接 Kafka 而加上的,若是你的 Kafka 沒有帳號密碼,就不須要這四個參數。

建立消費者

Kafka 消費者也須要鏈接 Kafka,首先使用KafkaConsumer類初始化一個消費者對象,而後循環讀取數據。代碼以下:

import config
from kafka import KafkaConsumer

consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

KafkaConsumer 的第一個參數用於指定 Topic。你能夠把這個 Topic 理解成 Redis 的 Key。

bootstrap_servers用於指定 Kafka 服務器鏈接地址。

group_id這個參數後面的字符串能夠任意填寫。若是兩個程序的Topic與group_id相同,那麼他們讀取的數據不會重複,兩個程序的Topic相同,可是group_id不一樣,那麼他們各自消費所有數據,互不影響。

auto_offset_rest 這個參數有兩個值,earliest和latest,若是省略這個參數,那麼默認就是latest。這個參數會單獨介紹。這裏先略過。

鏈接好 Kafka 之後,直接對消費者對象使用 for 循環迭代,就能持續不斷獲取裏面的數據了。

運行演示

運行兩個消費者程序和一個生產者程序,效果以下圖所示。
如何使用Python讀寫Kafka?

咱們能夠看到,兩個消費者程序讀取數據不重複,不遺漏。

當全部數據都消費完成之後,若是你把兩個消費者程序關閉,再運行其中一個,你會發現已經沒有數據會被打印出來了。

但若是你修改一下 group_id,程序又能正常從頭開始消費了,以下圖所示:
如何使用Python讀寫Kafka?

不少人都會搞混的幾個地方

earliest 與 latest

在咱們建立消費者對象的時候,有一個參數叫作auto_offset_reset='earliest'。有人看到earliest與latest,想固然地認爲設置爲earliest,就是從 Topic 的頭日後讀,設置爲latest就是忽略以前的數據,從程序運行之後,新來的數據開始讀。

這種見解是不正確的。

auto_offset_reset這個參數,只有在一個group第一次運行的時候纔有做用,從第二次運行開始,這個參數就失效了。

假設如今你的 Topic 裏面有100個數據,你設置了一個全新的 group_id 爲test2。auto_offset_reset設置爲 earliest。那麼當你的消費者運行的時候,Kafka 會先把你的 offset 設置爲0,而後讓你從頭開始消費的。

假設如今你的 Topic 裏面有100個數據,你設置了一個全新的 group_id 爲test3。auto_offset_reset設置爲 latest。那麼當你的消費者運行的時候,Kafka 不會給你返回任何數據,消費者看起來就像卡住了同樣,可是 Kafka 會直接強制把前100條數據的狀態設置爲已經被你消費的狀態。因此當前你的 offset 就直接是99了。直到生產者插入了一條新的數據,此時消費者才能讀取到。這條新的數據對應的 offset 就變成了100。

假設如今你的 Topic 裏面有100個數據,你設置了一個全新的 group_id 爲test4。auto_offset_reset設置爲 earliest。那麼當你的消費者運行的時候,Kafka 會先把你的 offset 設置爲0,而後讓你從頭開始消費的。等消費到第50條數據時,你把消費者程序關了,把auto_offset_reset設置爲latest,再從新運行。此時消費者依然會接着從第51條數據開始讀取。不會跳過剩下的50條數據。

因此,auto_offset_reset的做用,是在你的 group 第一次運行,尚未 offset 的時候,給你設定初始的 offset。而一旦你這個 group 已經有 offset 了,那麼auto_offset_reset這個參數就不會再起做用了。

partition 是如何分配的?

對於同一個 Topic 的同一個 Group:

假設你的 Topic 有10個 Partition,一開始你只啓動了1個消費者。那麼這個消費者會輪換着從這10個Partition 中讀取數據。

當你啓動第二個消費者時,Kafka 會從第一個消費者手上搶走5個Partition,分給第二個消費者。因而兩個消費者各自讀5個 Partition。互不影響。

當第三個消費者又出現時,Kafka 從第一個消費者手上再搶走1個 Partition,從第二個消費者手上搶走2個 Partition 給第三個消費者。因而,消費者1有4個 Partition,消費者2有3個 Partition,消費者3有3個 Partiton,互不影響。

當你有10個消費者一塊兒消費時,每一個消費者讀取一個 Partition,互不影響。

當第11個消費者出現時,它因爲分配不到 Partition,因此它什麼都讀不到。

因此在上一篇文章中,我說,在同一個 Topic,同一個 Group 中,你有多少個 Partiton,就能起多少個進程同時消費。

Kafka 是否是徹底不重複不遺漏?

在極端狀況下,Kafka 會重複,也會遺漏,可是這種極端狀況並不常見。若是你的 Kafka 頻繁漏數據,或者老是出現重複數據,那麼確定是你環境沒有搭建正確,或者代碼有問題。

忠告

再次提醒:專業的人作專業的事情,不要輕易自建Kafka 集羣。讓專門的同事複製搭建和維護,你只管使用。這纔是最高效省事的作法。

如何使用Python讀寫Kafka?

相關文章
相關標籤/搜索