入門 Kafka 你所須要瞭解的基本概念和開發模式

  團隊在平常工做中,通常狀況下使用的消息隊列是騰訊雲 CKafka。CKafka 提供了高可靠的開箱即用消息隊列能力,讓咱們在平常可以放心使用,減小花在運維上的投入。不過即使如此,咱們仍是須要學習 Kafka 的一些基本概念和功能,從而在實際應用中嗯可以充分高效、高質量地利用 Kafka 的能力。編程


業務基本概念

本小節主要說明的是在軟件業務層面,咱們使用 Kafka 中會接觸到的概念segmentfault

消息 Message

  對於一個消息隊列系統,最基礎的天然是 「消息」。在 Kafka 中,「消息」 就是 Message,是在 Kafka 中數據傳輸的基本單位。多個消息會被分批次寫入 Kafka 中。這同一批次的消息,就稱爲一消息。服務器

生產者消費者

  生產者和消費者的概念就很好理解了:產生消息的服務就稱爲 「生產者」 Producer,也稱爲 「發佈者」 PublisherWriter網絡

  而須要獲取消息的服務就稱爲 「消費者」 Consumer,也稱爲 「訂閱者」 SubScriberReader架構

主題 Topic 和 分區 Partition

  在 Kafka 中,全部的消息並非還有一條隊列。Kafka 的消息經過 Topic 進行分類。而一個 Topic 能夠被區分爲多個 「分區」 Partitionsapp

  這裏須要注意的是,在 同一個 partition 內部,消息的順序是可以保證的。也就是說:若是消息A到達 partition 的時間早於消息B,那麼消費者在獲取消息的時候,必然是先得到消息A以後,纔可以獲取到消息B。負載均衡

  可是,若是在多個 partitions 之間,消息的順序就沒法保證了。好比當消費者監聽多個 partitions 時的話,消息A和消息B被讀取出來的時間沒法保證。運維

  那這麼一來,partition 有什麼用呢?實際上 Partition 是用來作負載均衡的。當 comsumer 將消息發到一個 topic 上時,Kafka 默認會將消息儘可能均衡地分發到多個 partitions 上。做爲消費者監聽 topic 時,須要配置監聽哪些 partitions。一個 consumer 能夠監聽多個 partitions,comsumer 和 partition(s) 的對應關係也稱爲 「全部權關係」。異步

偏移量 Offset

  Offset 是一個遞增的整數值,由 Kafka 自動遞增後自動寫入每個 partition 中。在同一個 partition 中,一個 offset 值惟一對應着一條 message。此外,因爲 offset 是遞增的,所以也能夠用來區分多個 message 之間的順序。Consumer 的重啓動做並不影響 offset 的值,由於這是 Kafka 來進行維護的數值。分佈式

Broker 和 集羣

  一個獨立的 Kafka server 就稱爲一個 broker。一個或多個 broker 能夠組成一個 「集羣」 broker cluster。Kafka 雖然是一個分佈式的消息隊列系統,可是在集羣中,Kafka 依然是準中心化的系統架構。也就是說每個集羣中依然是有一臺主 broker,稱爲 controller

每個 cluster 會自動選舉一個 cluster controller 出來,controller 須要負責如下操做:

  1. 管理 cluster
  2. 將 partition 分配給 broker 和監控 broker。

  
在 cluster 中,一個 partition 會從屬於一個 broker,這個 broker 也會稱做該 partition 的 leader。同時該 partition 也能夠分配給多個 broker,進行 分區複製——若是其中一個 broker 失效了,那麼其他的 broker 能夠儘快接管 leader 的位置。若是是使用雲原生的 Kafka,咱們通常就不須要太擔憂這個問題。


安裝/運維基本概念

Kafka 部署架構

  若是是運維本身安裝 Kafka 的話,須要提早安裝的軟件是 JavaZookeeper。我當時就很是疑惑怎麼多了一個 Zookeeper?實際上 Kafka 是使用 Zookeeper 來保存 cluster 中的元數據和消費者信息。這裏體現出了 Java 強大和完善的生態圈,各類方案都可以找到已有的輪子。

image

  Zookeeper 也支持集羣部署。Zookeeper 集羣稱爲 「羣組」 Ensemble。由於 ensemble 也是使用了選舉機制,所以每一個 ensemble 中有奇數個節點,不建議超過7個。若是咱們使用了雲原生的 Kafka,就不須要過多關心這個細節啦。

Topic 參數

  部署好了 broker 和 Zookeeper 以後,咱們就能夠建立 topics 了。建立 topic 時有一些參數須要進行配置。主要的有如下幾項須要特別留意:

  • num.partitions: 新 topic 默認的分區數。在後續運維中,partition 的數量只會增長,不會減小。在騰訊雲 CKafka 中,這對應着 「分區數」 配置
  • log.retension.ms: 按照時間決定 topic 中的數據能夠保存多久。這對應 CKafka 界面中的 「retension.ms」 參數
  • log.retension.bytes: 按照存儲空間決定 topic 中的數據能夠保存多少。該參數在 CKafka 中不支持
  • log.segment.bytes: 表示按照存儲空間決定日誌片斷文件的大小。該參數在 CKafka 中不支持
  • log.segment.ms: 表示按照時長決定日誌片斷大小。對應 CKafka 界面中的 「segment.ms」 參數。不是必要參數

image

如何選擇 Partitions 的數量?

  前面提到,在同一個 partition 中,消息的順序是可以獲得保證的。所以對於一個小型的、對可靠性要求不高、可是對順序性要求很高的系統而言,或許可使用單 partition 的方案。

  可是這個方案實際上是很是危險的:

  • 首先,單一 partition 就意味着 consumer 也只能有一個,不然會出現消息重複消費的問題。在一個生產項目中進行單點部署,這幾乎是不可接受的
  • 雖然在 Kafka 內部,單一 partition 內的消息順序可以獲得保證,但若是生產者未能獲得保證的話,那麼 kafka 內的消息順序依然不是真實的。所以對於有強順序要求的消息隊列系統中,不建議使用時間順序,而是採用邏輯順序/邏輯時鐘來區分消息的前後。

所以在實際生產環境中,咱們應當適當地分配 partition 的數量。若是對順序性有要求,那麼不該該依賴 kafka 的順序機制,而是使用額外的機制來保證。


Kafka 生產者

架構圖

  生產者向 Kafka broker 發送消息通常是用各語言的 SDK 來完成的。下面框圖中是 SDK 完成的邏輯。首先 producer 在發送 message 以前,須要將 message 封裝到 producer record 中,record 包含的必填信息是 topic 和 value(也就是 message 正文)信息。此外還可選 partition 和 key 信息,不過相對少用。Key 參數的做用後文會做說明。

image

  當消息被寫入 Kafka broker 以後,broker 會回調到 SDK 中,將消息最終落地的 partition 和 partition 中的 offset 信息返回給 SDK,並最終視須要返回給 Producer。

消息發送

  Kafka 生產者有兩種消息發送方式:同步異步

  同步發送方式就是生產者發出的每個消息,都須要按照上面的結構圖的流程處理:消息發出後等待 Kafka broker 的結果響應以後再作進一步的處理。Kafka broker 返回的錯誤中包含了兩種錯誤:

  1. 可重試錯誤: 當遇到這一類錯誤時,生產者能夠直接從新嘗試發送。好比網絡錯誤、集羣錯誤等等。
  2. 不可重試錯誤: 當遇到這一類錯誤時,生產者只能考慮告警、記錄、修改軟件邏輯等等。好比消息過大等等。

異步發送方式就是生產者經過 SDK 發送消息以後就直接返回;SDK 在後臺處理消息的發送、響應處理,而後經過回調告知生產者以進行進一步的處理。

生產者參數

  生產者啓動以前也有一些參數可進行配置。讀者能夠在各語言的 SDK 中具體查找:

  • acks: 消息發送給 Kafka broker,因爲實際上會有多個 broker,所以消息是須要複製多份的。該參數表示須要等待多少個 broker 的響應,才視爲消息發送成功:

    • 0: 表示不須要等待 broker 響應
    • 1: 表示 leader 響應便可
    • all: 表示須要全部的 broker 響應
  • buffer.memory: 生產者的緩衝區大小。單位是 message 的數量。當緩衝區滿了以後,SDK 會根據 maxblock.ms 等待並阻塞一段時間以後再進行重試。若是緩衝區仍是滿了的狀態,則會拋出異常或返回錯誤
  • compression.type: 消息壓縮格式,可選值爲: snappy, gzip, lz4
  • retries: 重試次數,重試間隔爲 retry.backoff.ms,默認是 100ms
  • batch.size: 一個批次的數據大小,字節數。爲了減小網絡傳輸中的消耗,Kafka 生產者並非一個消息就經過一次發送發出去,而是組成一個個批次進行發送。當一個批次的大小達到這個參數時,則會立刻發出。
  • linger.ms: 一個批次發送以前的緩衝時間。當批次的尺寸未達到 batch.size 的話,SDK 也不會一直按住 message 不發送,而是等待一段時間以後也會把內存中的批次發出
  • client id: 自定義字符串,用於標識生產者
  • max.id.flight.requests.per.connection: 這個參數指的是收到服務器響應以前,生產者能夠發送的消息數。設置爲 1 能夠保證消息順序,可是相應的效率就降低了
  • request.timeout.ms: 生產者發送數據以後等待響應的時間

Key 的做用

  在 producer record 中的 key 有兩個用途:

  1. 做爲消息的附加消息
  2. 能夠用來決定寫入到哪個分區。默認分區器可使擁有相同 key 的消息寫入同一個分區。
  3. 若是 key == null,則默認採用輪詢方式寫入分區
  4. 若是 key 非空,則根據哈希結果決定分區

生產者也能夠經過自定義分區器來實現業務的具體分區功能,具體參見各語言的 SDK


Kafka 消費者

  一個 Kafka 的消費者是從屬於其對應的 comsumer group 的,每個 group 訂閱一個 topic,每一個 consumer 消費一部分的消息。整個 group 內部經過消費不一樣的 partition 實現負載均衡。每個 group 都有一個 group.id 用於標識一個消費者羣組,這在業務中就對應着一個消費者業務。

  不要讓消費者數量多於分區數量,不然會致使出現重複消費的問題。所以在 partition 選用時,宜多不宜省。更多的分區數量也可以更加合理地分配 consumer 之間的負載。

分區再均衡 Partitions Reoke / Rebalance

  每一個消費者能夠對應一個或多個 partition;多個 consumer 組成一個 group,覆蓋 topic 的所有 partitions。可是當 consumer 和 partitions 數量發生變化時,須要從新分配全部權關係。這個動做就稱爲 Rebalance。至因而熱切換仍是冷切換,則由業務方決定。

  消費者在調用 subscribe() 監聽消息時,能夠傳入一個 ConsumerRebalanceListener 實例來監聽事件。其中須要關注的事件有:

  • onPartitionsRevoked(): 這是再平衡開始以前的事件。注意此時消費者應中止消費,而且 commit 已完成但還沒有 commit 的 offset 值
  • onPartitionsAssigned(): 這是再平衡結束,也就是從新分配分區結束以後的時間。大部分狀況下消費者也不須要特別處理什麼,不過能夠在這裏進行一些消費過程的重啓動做

Commit 和 Offset

  前文提到,一個 message 可以與 kafka 中的一個 partition 中的一個 offset 值一一對應。對於消費者而言,partition-offset 對也能夠用於標識當前 comsumer 已經獲取到的消息的進度,也能夠用於消費者在 kafka 中進行歷史消息的尋址。

  當對某個 message 消費完成後,消費者會將 offset 值提交到 kafka 中,從而讓 Kafka 識別並保存某個 comsumer group 的消費進度。下一次 consumer 再請求事件時,默認會從該 offset 日後繼續獲取。Consummer 向 Kafka 更新 offset 的這一動做就稱爲 「提交」 commit

  若是 consumer 發生崩潰,或者有新的 consumer 加入 group,就會觸發 rebalance。完成 rebalancing 以後,每一個 consumer 有可能會被分配到不一樣的分區。爲了可以繼續以前的工做,consumer 須要讀取每個分區最後一次提交的 offset,而後從指定的 offset 繼續處理。這個操做,通常在 SDK 中就完成了。可是在上述切換過程當中,因爲分佈式系統的分佈式、異步特性,咱們不可避免的仍是可能遇到一些不一致的狀況,具體表現爲消息的重複處理和漏處理。因此咱們在任什麼時候候都不能簡單依賴 Kafka 自己提供的消息隊列機制,而是在各自的業務系統中也須要進行必定的防護式編程,避免錯誤處理出現。

  通常而言,SDK 有下面幾種 commit 方式:

  • 自動提交: enable.auto.committrue 時,API 定時、異步地進行 commit。所以,若是在觸發了再均衡的時候還有部分數據未 commit,那麼在再均衡以後在其餘的消費者中就有可能發生重複消費
  • 主動提交: enable.auto.commitfalse 時,業務方須要主動調用相關 API 進行 commit。
  • (主動的)異步提交: 其實就是主動提交的異步版,簡單而言就是開一個後臺異步 commit 的過程。
  • 提交特定的 offset: 這種模式就是顯式地 commit 具體 partition 的某個 offset 值。

參考資料


本文章採用 知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議 進行許可。

原做者: amc,歡迎轉載,但須要按照以上許可協議註明出處。

本文連接: https://segmentfault.com/a/1190000038592433

原文標題:入門 Kafka 你所須要瞭解的基本概念和開發模式

發佈日期:2020-12-01

原文連接:https://cloud.tencent.com/developer/article/1755160。也是本人的博客

相關文章
相關標籤/搜索