團隊在平常工做中,通常狀況下使用的消息隊列是騰訊雲 CKafka。CKafka 提供了高可靠的開箱即用消息隊列能力,讓咱們在平常可以放心使用,減小花在運維上的投入。不過即使如此,咱們仍是須要學習 Kafka 的一些基本概念和功能,從而在實際應用中嗯可以充分高效、高質量地利用 Kafka 的能力。編程
本小節主要說明的是在軟件業務層面,咱們使用 Kafka 中會接觸到的概念segmentfault
對於一個消息隊列系統,最基礎的天然是 「消息」。在 Kafka 中,「消息」 就是 Message
,是在 Kafka 中數據傳輸的基本單位。多個消息會被分批次寫入 Kafka 中。這同一批次的消息,就稱爲一組消息。服務器
生產者和消費者的概念就很好理解了:產生消息的服務就稱爲 「生產者」 Producer
,也稱爲 「發佈者」 Publisher
或 Writer
。網絡
而須要獲取消息的服務就稱爲 「消費者」 Consumer
,也稱爲 「訂閱者」 SubScriber
或 Reader
。架構
在 Kafka 中,全部的消息並非還有一條隊列。Kafka 的消息經過 Topic
進行分類。而一個 Topic 能夠被區分爲多個 「分區」 Partitions
。app
這裏須要注意的是,在 同一個 partition 內部,消息的順序是可以保證的。也就是說:若是消息A到達 partition 的時間早於消息B,那麼消費者在獲取消息的時候,必然是先得到消息A以後,纔可以獲取到消息B。負載均衡
可是,若是在多個 partitions 之間,消息的順序就沒法保證了。好比當消費者監聽多個 partitions 時的話,消息A和消息B被讀取出來的時間沒法保證。運維
那這麼一來,partition 有什麼用呢?實際上 Partition 是用來作負載均衡的。當 comsumer 將消息發到一個 topic 上時,Kafka 默認會將消息儘可能均衡地分發到多個 partitions 上。做爲消費者監聽 topic 時,須要配置監聽哪些 partitions。一個 consumer 能夠監聽多個 partitions,comsumer 和 partition(s) 的對應關係也稱爲 「全部權關係」。異步
Offset
是一個遞增的整數值,由 Kafka 自動遞增後自動寫入每個 partition 中。在同一個 partition 中,一個 offset 值惟一對應着一條 message。此外,因爲 offset 是遞增的,所以也能夠用來區分多個 message 之間的順序。Consumer 的重啓動做並不影響 offset 的值,由於這是 Kafka 來進行維護的數值。分佈式
一個獨立的 Kafka server 就稱爲一個 broker
。一個或多個 broker 能夠組成一個 「集羣」 broker cluster
。Kafka 雖然是一個分佈式的消息隊列系統,可是在集羣中,Kafka 依然是準中心化的系統架構。也就是說每個集羣中依然是有一臺主 broker,稱爲 controller
。
每個 cluster 會自動選舉一個 cluster controller
出來,controller 須要負責如下操做:
在 cluster 中,一個 partition 會從屬於一個 broker,這個 broker 也會稱做該 partition 的 leader
。同時該 partition 也能夠分配給多個 broker,進行 分區複製——若是其中一個 broker 失效了,那麼其他的 broker 能夠儘快接管 leader 的位置。若是是使用雲原生的 Kafka,咱們通常就不須要太擔憂這個問題。
若是是運維本身安裝 Kafka 的話,須要提早安裝的軟件是 Java
和 Zookeeper
。我當時就很是疑惑怎麼多了一個 Zookeeper
?實際上 Kafka 是使用 Zookeeper 來保存 cluster 中的元數據和消費者信息。這裏體現出了 Java 強大和完善的生態圈,各類方案都可以找到已有的輪子。
Zookeeper 也支持集羣部署。Zookeeper 集羣稱爲 「羣組」 Ensemble
。由於 ensemble 也是使用了選舉機制,所以每一個 ensemble 中有奇數個節點,不建議超過7個。若是咱們使用了雲原生的 Kafka,就不須要過多關心這個細節啦。
部署好了 broker 和 Zookeeper 以後,咱們就能夠建立 topics 了。建立 topic 時有一些參數須要進行配置。主要的有如下幾項須要特別留意:
num.partitions
: 新 topic 默認的分區數。在後續運維中,partition 的數量只會增長,不會減小。在騰訊雲 CKafka 中,這對應着 「分區數」 配置log.retension.ms
: 按照時間決定 topic 中的數據能夠保存多久。這對應 CKafka 界面中的 「retension.ms」 參數log.retension.bytes
: 按照存儲空間決定 topic 中的數據能夠保存多少。該參數在 CKafka 中不支持log.segment.bytes
: 表示按照存儲空間決定日誌片斷文件的大小。該參數在 CKafka 中不支持log.segment.ms
: 表示按照時長決定日誌片斷大小。對應 CKafka 界面中的 「segment.ms」 參數。不是必要參數
前面提到,在同一個 partition 中,消息的順序是可以獲得保證的。所以對於一個小型的、對可靠性要求不高、可是對順序性要求很高的系統而言,或許可使用單 partition 的方案。
可是這個方案實際上是很是危險的:
所以在實際生產環境中,咱們應當適當地分配 partition 的數量。若是對順序性有要求,那麼不該該依賴 kafka 的順序機制,而是使用額外的機制來保證。
生產者向 Kafka broker 發送消息通常是用各語言的 SDK 來完成的。下面框圖中是 SDK 完成的邏輯。首先 producer 在發送 message 以前,須要將 message 封裝到 producer record
中,record 包含的必填信息是 topic 和 value(也就是 message 正文)信息。此外還可選 partition 和 key 信息,不過相對少用。Key 參數的做用後文會做說明。
當消息被寫入 Kafka broker 以後,broker 會回調到 SDK 中,將消息最終落地的 partition 和 partition 中的 offset 信息返回給 SDK,並最終視須要返回給 Producer。
Kafka 生產者有兩種消息發送方式:同步 和 異步。
同步發送方式就是生產者發出的每個消息,都須要按照上面的結構圖的流程處理:消息發出後等待 Kafka broker 的結果響應以後再作進一步的處理。Kafka broker 返回的錯誤中包含了兩種錯誤:
異步發送方式就是生產者經過 SDK 發送消息以後就直接返回;SDK 在後臺處理消息的發送、響應處理,而後經過回調告知生產者以進行進一步的處理。
生產者啓動以前也有一些參數可進行配置。讀者能夠在各語言的 SDK 中具體查找:
acks
: 消息發送給 Kafka broker,因爲實際上會有多個 broker,所以消息是須要複製多份的。該參數表示須要等待多少個 broker 的響應,才視爲消息發送成功:
buffer.memory
: 生產者的緩衝區大小。單位是 message 的數量。當緩衝區滿了以後,SDK 會根據 maxblock.ms
等待並阻塞一段時間以後再進行重試。若是緩衝區仍是滿了的狀態,則會拋出異常或返回錯誤compression.type
: 消息壓縮格式,可選值爲: snappy
, gzip
, lz4
retries
: 重試次數,重試間隔爲 retry.backoff.ms
,默認是 100msbatch.size
: 一個批次的數據大小,字節數。爲了減小網絡傳輸中的消耗,Kafka 生產者並非一個消息就經過一次發送發出去,而是組成一個個批次進行發送。當一個批次的大小達到這個參數時,則會立刻發出。linger.ms
: 一個批次發送以前的緩衝時間。當批次的尺寸未達到 batch.size
的話,SDK 也不會一直按住 message 不發送,而是等待一段時間以後也會把內存中的批次發出client id
: 自定義字符串,用於標識生產者max.id.flight.requests.per.connection
: 這個參數指的是收到服務器響應以前,生產者能夠發送的消息數。設置爲 1 能夠保證消息順序,可是相應的效率就降低了request.timeout.ms
: 生產者發送數據以後等待響應的時間 在 producer record 中的 key
有兩個用途:
生產者也能夠經過自定義分區器來實現業務的具體分區功能,具體參見各語言的 SDK
一個 Kafka 的消費者是從屬於其對應的 comsumer group 的,每個 group 訂閱一個 topic,每一個 consumer 消費一部分的消息。整個 group 內部經過消費不一樣的 partition 實現負載均衡。每個 group 都有一個 group.id
用於標識一個消費者羣組,這在業務中就對應着一個消費者業務。
不要讓消費者數量多於分區數量,不然會致使出現重複消費的問題。所以在 partition 選用時,宜多不宜省。更多的分區數量也可以更加合理地分配 consumer 之間的負載。
每一個消費者能夠對應一個或多個 partition;多個 consumer 組成一個 group,覆蓋 topic 的所有 partitions。可是當 consumer 和 partitions 數量發生變化時,須要從新分配全部權關係。這個動做就稱爲 Rebalance
。至因而熱切換仍是冷切換,則由業務方決定。
消費者在調用 subscribe()
監聽消息時,能夠傳入一個 ConsumerRebalanceListener
實例來監聽事件。其中須要關注的事件有:
onPartitionsRevoked()
: 這是再平衡開始以前的事件。注意此時消費者應中止消費,而且 commit 已完成但還沒有 commit 的 offset 值onPartitionsAssigned()
: 這是再平衡結束,也就是從新分配分區結束以後的時間。大部分狀況下消費者也不須要特別處理什麼,不過能夠在這裏進行一些消費過程的重啓動做前文提到,一個 message 可以與 kafka 中的一個 partition 中的一個 offset 值一一對應。對於消費者而言,partition-offset 對也能夠用於標識當前 comsumer 已經獲取到的消息的進度,也能夠用於消費者在 kafka 中進行歷史消息的尋址。
當對某個 message 消費完成後,消費者會將 offset 值提交到 kafka 中,從而讓 Kafka 識別並保存某個 comsumer group 的消費進度。下一次 consumer 再請求事件時,默認會從該 offset 日後繼續獲取。Consummer 向 Kafka 更新 offset 的這一動做就稱爲 「提交」 commit
。
若是 consumer 發生崩潰,或者有新的 consumer 加入 group,就會觸發 rebalance。完成 rebalancing 以後,每一個 consumer 有可能會被分配到不一樣的分區。爲了可以繼續以前的工做,consumer 須要讀取每個分區最後一次提交的 offset,而後從指定的 offset 繼續處理。這個操做,通常在 SDK 中就完成了。可是在上述切換過程當中,因爲分佈式系統的分佈式、異步特性,咱們不可避免的仍是可能遇到一些不一致的狀況,具體表現爲消息的重複處理和漏處理。因此咱們在任什麼時候候都不能簡單依賴 Kafka 自己提供的消息隊列機制,而是在各自的業務系統中也須要進行必定的防護式編程,避免錯誤處理出現。
通常而言,SDK 有下面幾種 commit 方式:
enable.auto.commit
爲 true
時,API 定時、異步地進行 commit。所以,若是在觸發了再均衡的時候還有部分數據未 commit,那麼在再均衡以後在其餘的消費者中就有可能發生重複消費enable.auto.commit
爲 false
時,業務方須要主動調用相關 API 進行 commit。本文章採用 知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議 進行許可。
原做者: amc,歡迎轉載,但須要按照以上許可協議註明出處。
本文連接: https://segmentfault.com/a/1190000038592433
原文標題:入門 Kafka 你所須要瞭解的基本概念和開發模式
發佈日期:2020-12-01
原文連接:https://cloud.tencent.com/developer/article/1755160。也是本人的博客