真的,Kafka 入門一篇文章就夠了

image.png

初識 Kafka

什麼是 Kafka

Kafka 是由 Linkedin 公司開發的,它是一個分佈式的,支持多分區、多副本,基於 Zookeeper 的分佈式消息流平臺,它同時也是一款開源的基於發佈訂閱模式的消息引擎系統html

Kafka 的基本術語

消息:Kafka 中的數據單元被稱爲消息,也被稱爲記錄,能夠把它看做數據庫表中某一行的記錄。前端

批次:爲了提升效率, 消息會分批次寫入 Kafka,批次就代指的是一組消息。java

主題:消息的種類稱爲 主題(Topic),能夠說一個主題表明了一類消息。至關因而對消息進行分類。主題就像是數據庫中的表。程序員

分區:主題能夠被分爲若干個分區(partition),同一個主題中的分區能夠不在一個機器上,有可能會部署在多個機器上,由此來實現 kafka 的伸縮性,單一主題中的分區有序,可是沒法保證主題中全部的分區有序web

image.png

生產者: 向主題發佈消息的客戶端應用程序稱爲生產者(Producer),生產者用於持續不斷的向某個主題發送消息。正則表達式

消費者:訂閱主題消息的客戶端程序稱爲消費者(Consumer),消費者用於處理生產者產生的消息。算法

消費者羣組:生產者與消費者的關係就如同餐廳中的廚師和顧客之間的關係同樣,一個廚師對應多個顧客,也就是一個生產者對應多個消費者,消費者羣組(Consumer Group)指的就是由一個或多個消費者組成的羣體。數據庫

image.png

偏移量:偏移量(Consumer Offset)是一種元數據,它是一個不斷遞增的整數值,用來記錄消費者發生重平衡時的位置,以便用來恢復數據。apache

broker: 一個獨立的 Kafka 服務器就被稱爲 broker,broker 接收來自生產者的消息,爲消息設置偏移量,並提交消息到磁盤保存。bootstrap

broker 集羣:broker 是集羣 的組成部分,broker 集羣由一個或多個 broker 組成,每一個集羣都有一個 broker 同時充當了集羣控制器的角色(自動從集羣的活躍成員中選舉出來)。

副本:Kafka 中消息的備份又叫作 副本(Replica),副本的數量是能夠配置的,Kafka 定義了兩類副本:領導者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對外提供服務,後者只是被動跟隨。

重平衡:Rebalance。消費者組內某個消費者實例掛掉後,其餘消費者實例自動從新分配訂閱主題分區的過程。Rebalance 是 Kafka 消費者端實現高可用的重要手段。

Kafka 的特性(設計原則)

  • 高吞吐、低延遲:kakfa 最大的特色就是收發消息很是快,kafka 每秒能夠處理幾十萬條消息,它的最低延遲只有幾毫秒。
  • 高伸縮性: 每一個主題(topic) 包含多個分區(partition),主題中的分區能夠分佈在不一樣的主機(broker)中。
  • 持久性、可靠性: Kafka 可以容許數據的持久化存儲,消息被持久化到磁盤,並支持數據備份防止數據丟失,Kafka 底層的數據存儲是基於 Zookeeper 存儲的,Zookeeper 咱們知道它的數據可以持久存儲。
  • 容錯性: 容許集羣中的節點失敗,某個節點宕機,Kafka 集羣可以正常工做
  • 高併發: 支持數千個客戶端同時讀寫

Kafka 的使用場景

  • 活動跟蹤:Kafka 能夠用來跟蹤用戶行爲,好比咱們常常回去淘寶購物,你打開淘寶的那一刻,你的登錄信息,登錄次數都會做爲消息傳輸到 Kafka ,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會做爲一個個消息傳遞給 Kafka ,這樣就能夠生成報告,能夠作智能推薦,購買喜愛等。
  • 傳遞消息:Kafka 另一個基本用途是傳遞消息,應用程序向用戶發送通知就是經過傳遞消息來實現的,這些應用組件能夠生成消息,而不須要關心消息的格式,也不須要關心消息是如何發送的。
  • 度量指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
  • 日誌記錄:Kafka 的基本概念來源於提交日誌,好比咱們能夠把數據庫的更新發送到 Kafka 上,用來記錄數據庫的更新時間,經過kafka以統一接口服務的方式開放給各類consumer,例如hadoop、Hbase、Solr等。
  • 流式處理:流式處理是有一個可以提供多種應用程序的領域。
  • 限流削峯:Kafka 多用於互聯網領域某一時刻請求特別多的狀況下,能夠把請求寫入Kafka 中,避免直接請求後端程序致使服務崩潰。

Kafka 的消息隊列

Kafka 的消息隊列通常分爲兩種模式:點對點模式和發佈訂閱模式

Kafka 是支持消費者羣組的,也就是說 Kafka 中會有一個或者多個消費者,若是一個生產者生產的消息由一個消費者進行消費的話,那麼這種模式就是點對點模式

image.png

若是一個生產者或者多個生產者產生的消息可以被多個消費者同時消費的狀況,這樣的消息隊列成爲發佈訂閱模式的消息隊列

image.png

Kafka 系統架構

image.png

如上圖所示,一個典型的 Kafka 集羣中包含若干Producer(能夠是web前端產生的Page View,或者是服務器日誌,系統CPU、Memory等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干Consumer Group,以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發佈到broker,Consumer使用pull模式從broker訂閱並消費消息。

核心 API

Kafka 有四個核心API,它們分別是

  • Producer API,它容許應用程序向一個或多個 topics 上發送消息記錄
  • Consumer API,容許應用程序訂閱一個或多個 topics 並處理爲其生成的記錄流
  • Streams API,它容許應用程序做爲流處理器,從一個或多個主題中消費輸入流併爲其生成輸出流,有效的將輸入流轉換爲輸出流。
  • Connector API,它容許構建和運行將 Kafka 主題鏈接到現有應用程序或數據系統的可用生產者和消費者。例如,關係數據庫的鏈接器可能會捕獲對錶的全部更改

image.png

Kafka 爲什麼如此之快

Kafka 實現了零拷貝原理來快速移動數據,避免了內核之間的切換。Kafka 能夠將數據記錄分批發送,從生產者到文件系統(Kafka 主題日誌)到消費者,能夠端到端的查看這些批次的數據。

批處理可以進行更有效的數據壓縮並減小 I/O 延遲,Kafka 採起順序寫入磁盤的方式,避免了隨機磁盤尋址的浪費,更多關於磁盤尋址的瞭解,請參閱 程序員須要瞭解的硬核知識之磁盤

總結一下其實就是四個要點

  • 順序讀寫
  • 零拷貝
  • 消息壓縮
  • 分批發送

Kafka 安裝和重要配置

Kafka 安裝我在 Kafka 系列第一篇應該比較詳細了,詳情見帶你漲姿式的認識一下kafka 這篇文章。

那咱們仍是主要來講一下 Kafka 中的重要參數配置吧,這些參數對 Kafka 來講是很是重要的。

broker 端配置

  • broker.id

每一個 kafka broker 都有一個惟一的標識來表示,這個惟一的標識符便是 broker.id,它的默認值是 0。這個值在 kafka 集羣中必須是惟一的,這個值能夠任意設定,

  • port

若是使用配置樣原本啓動 kafka,它會監聽 9092 端口。修改 port 配置參數能夠把它設置成任意的端口。要注意,若是使用 1024 如下的端口,須要使用 root 權限啓動 kakfa。

  • zookeeper.connect

用於保存 broker 元數據的 Zookeeper 地址是經過 zookeeper.connect 來指定的。好比我能夠這麼指定 localhost:2181 表示這個 Zookeeper 是運行在本地 2181 端口上的。咱們也能夠經過 好比咱們能夠經過 zk1:2181,zk2:2181,zk3:2181 來指定 zookeeper.connect 的多個參數值。該配置參數是用冒號分割的一組 hostname:port/path 列表,其含義以下

hostname 是 Zookeeper 服務器的機器名或者 ip 地址。

port 是 Zookeeper 客戶端的端口號

/path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了 chroot 環境,若是不指定默認使用跟路徑。

若是你有兩套 Kafka 集羣,假設分別叫它們 kafka1 和 kafka2,那麼兩套集羣的 zookeeper.connect參數能夠這樣指定: zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2
  • log.dirs

Kafka 把全部的消息都保存到磁盤上,存放這些日誌片斷的目錄是經過 log.dirs 來制定的,它是用一組逗號來分割的本地系統路徑,log.dirs 是沒有默認值的,你必須手動指定他的默認值。其實還有一個參數是 log.dir,如你所知,這個配置是沒有 s 的,默認狀況下只用配置 log.dirs 就行了,好比你能夠經過 /home/kafka1,/home/kafka2,/home/kafka3 這樣來配置這個參數的值。

  • num.recovery.threads.per.data.dir

對於以下3種狀況,Kafka 會使用可配置的線程池來處理日誌片斷。

服務器正常啓動,用於打開每一個分區的日誌片斷;

服務器崩潰後重啓,用於檢查和截斷每一個分區的日誌片斷;

服務器正常關閉,用於關閉日誌片斷。

默認狀況下,每一個日誌目錄只使用一個線程。由於這些線程只是在服務器啓動和關閉時會用到,因此徹底能夠設置大量的線程來達到井行操做的目的。特別是對於包含大量分區的服務器來講,一旦發生崩憤,在進行恢復時使用井行操做可能會省下數小時的時間。設置此參數時須要注意,所配置的數字對應的是 log.dirs 指定的單個日誌目錄。也就是說,若是 num.recovery.threads.per.data.dir 被設爲 8,而且 log.dir 指定了 3 個路徑,那麼總共須要 24 個線程。

  • auto.create.topics.enable

默認狀況下,kafka 會使用三種方式來自動建立主題,下面是三種狀況:

當一個生產者開始往主題寫入消息時

當一個消費者開始從主題讀取消息時

當任意一個客戶端向主題發送元數據請求時

auto.create.topics.enable參數我建議最好設置成 false,即不容許自動建立 Topic。在咱們的線上環境裏面有不少名字稀奇古怪的 Topic,我想大概都是由於該參數被設置成了 true 的緣故。

主題默認配置

Kafka 爲新建立的主題提供了不少默認配置參數,下面就來一塊兒認識一下這些參數

  • num.partitions

num.partitions 參數指定了新建立的主題須要包含多少個分區。若是啓用了主題自動建立功能(該功能是默認啓用的),主題分區的個數就是該參數指定的值。該參數的默認值是 1。要注意,咱們能夠增長主題分區的個數,但不能減小分區的個數。

  • default.replication.factor

這個參數比較簡單,它表示 kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務default.replication.factor 的默認值爲1,這個參數在你啓用了主題自動建立功能後有效。

  • log.retention.ms

Kafka 一般根據時間來決定數據能夠保留多久。默認使用 log.retention.hours 參數來配置時間,默認是 168 個小時,也就是一週。除此以外,還有兩個參數 log.retention.minutes 和 log.retentiion.ms 。這三個參數做用是同樣的,都是決定消息多久之後被刪除,推薦使用 log.retention.ms。

  • log.retention.bytes

另外一種保留消息的方式是判斷消息是否過時。它的值經過參數 log.retention.bytes 來指定,做用在每個分區上。也就是說,若是有一個包含 8 個分區的主題,而且 log.retention.bytes 被設置爲 1GB,那麼這個主題最多能夠保留 8GB 數據。因此,當主題的分區個數增長時,整個主題能夠保留的數據也隨之增長。

  • log.segment.bytes

上述的日誌都是做用在日誌片斷上,而不是做用在單個消息上。當消息到達 broker 時,它們被追加到分區的當前日誌片斷上,當日志片斷大小到達 log.segment.bytes 指定上限(默認爲 1GB)時,當前日誌片斷就會被關閉,一個新的日誌片斷被打開。若是一個日誌片斷被關閉,就開始等待過時。這個參數的值越小,就越會頻繁的關閉和分配新文件,從而下降磁盤寫入的總體效率。

  • log.segment.ms

上面提到日誌片斷經關閉後需等待過時,那麼 log.segment.ms 這個參數就是指定日誌多長時間被關閉的參數和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日誌片斷會在大小或時間到達上限時被關閉,就看哪一個條件先獲得知足。

  • message.max.bytes

broker 經過設置 message.max.bytes 參數來限制單個消息的大小,默認是 1000 000, 也就是 1MB,若是生產者嘗試發送的消息超過這個大小,不只消息不會被接收,還會收到 broker 返回的錯誤消息。跟其餘與字節相關的配置參數同樣,該參數指的是壓縮後的消息大小,也就是說,只要壓縮後的消息小於 mesage.max.bytes,那麼消息的實際大小能夠大於這個值

這個值對性能有顯著的影響。值越大,那麼負責處理網絡鏈接和請求的線程就須要花越多的時間來處理這些請求。它還會增長磁盤寫入塊的大小,從而影響 IO 吞吐量。

  • retention.ms

規定了該主題消息被保存的時常,默認是7天,即該主題只能保存7天的消息,一旦設置了這個值,它會覆蓋掉 Broker 端的全局參數值。

  • retention.bytes

retention.bytes:規定了要爲該 Topic 預留多大的磁盤空間。和全局參數做用類似,這個值一般在多租戶的 Kafka 集羣中會有用武之地。當前默認值是 -1,表示能夠無限使用磁盤空間。

JVM 參數配置

JDK 版本通常推薦直接使用 JDK1.8,這個版本也是如今中國大部分程序員的首選版本。

說到 JVM 端設置,就繞不開這個話題,業界最推崇的一種設置方式就是直接將 JVM 堆大小設置爲 6GB,這樣會避免不少 Bug 出現。

JVM 端配置的另外一個重要參數就是垃圾回收器的設置,也就是平時常說的 GC 設置。若是你依然在使用 Java 7,那麼能夠根據如下法則選擇合適的垃圾回收器:

  • 若是 Broker 所在機器的 CPU 資源很是充裕,建議使用 CMS 收集器。啓用方法是指定-XX:+UseCurrentMarkSweepGC
  • 不然,使用吞吐量收集器。開啓方法是指定-XX:+UseParallelGC

固然了,若是你已經在使用 Java 8 了,那麼就用默認的 G1 收集器就行了。在沒有任何調優的狀況下,G1 表現得要比 CMS 出色,主要體如今更少的 Full GC,須要調整的參數更少等,因此使用 G1 就行了。

通常 G1 的調整隻須要這兩個參數便可

  • MaxGCPauseMillis

該參數指定每次垃圾回收默認的停頓時間。該值不是固定的,G1能夠根據須要使用更長的時間。它的默認值是 200ms,也就是說,每一輪垃圾回收大概須要200 ms 的時間。

  • InitiatingHeapOccupancyPercent

該參數指定了 G1 啓動新一輪垃圾回收以前可使用的堆內存百分比,默認值是45,這就代表G1在堆使用率到達45以前不會啓用垃圾回收。這個百分比包括新生代和老年代。

Kafka Producer

在 Kafka 中,咱們把產生消息的那一方稱爲生產者,好比咱們常常回去淘寶購物,你打開淘寶的那一刻,你的登錄信息,登錄次數都會做爲消息傳輸到 Kafka 後臺,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會做爲一個個消息傳遞給 Kafka 後臺,而後淘寶會根據你的愛好作智能推薦,導致你的錢包歷來都禁不住誘惑,那麼這些生產者產生的消息是怎麼傳到 Kafka 應用程序的呢?發送過程是怎麼樣的呢?

儘管消息的產生很是簡單,可是消息的發送過程仍是比較複雜的,如圖

咱們從建立一個ProducerRecord 對象開始,ProducerRecord 是 Kafka 中的一個核心類,它表明了一組 Kafka 須要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),可選的分區號(Partition Number)以及可選的鍵值對構成。

在發送 ProducerRecord 時,咱們須要將鍵值對對象由序列化器轉換爲字節數組,這樣它們纔可以在網絡上傳輸。而後消息到達了分區器。

若是發送過程當中指定了有效的分區號,那麼在發送記錄時將使用該分區。若是發送過程當中未指定分區,則將使用key 的 hash 函數映射指定一個分區。若是發送的過程當中既沒有分區號也沒有,則將以循環的方式分配一個分區。選好分區後,生產者就知道向哪一個主題和分區發送數據了。

ProducerRecord 還有關聯的時間戳,若是用戶沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間做爲時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。

  • 若是將主題配置爲使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
  • 若是將主題配置爲使用LogAppendTime,則生產者記錄中的時間戳在將消息添加到其日誌中時,將由 broker 重寫。

而後,這條消息被存放在一個記錄批次裏,這個批次裏的全部消息會被髮送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。

Kafka Broker 在收到消息時會返回一個響應,若是寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量,上面兩種的時間戳類型也會返回給用戶。若是寫入失敗,會返回一個錯誤。生產者在收到錯誤以後會嘗試從新發送消息,幾回以後若是仍是失敗的話,就返回錯誤消息。

建立 Kafka 生產者

要向 Kafka 寫入消息,首先須要建立一個生產者對象,並設置一些屬性。Kafka 生產者有3個必選的屬性

  • bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式爲 host:port。清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找到其餘的 broker 信息。不過建議至少要提供兩個 broker 信息,一旦其中一個宕機,生產者仍然可以鏈接到集羣上。

  • key.serializer

broker 須要接收到序列化以後的 key/value 值,因此生產者發送的消息須要通過序列化以後才傳遞給 Kafka Broker。生產者須要知道採用何種方式把 Java 對象轉換爲字節數組。key.serializer 必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵對象序列化爲字節數組。這裏拓展一下 Serializer 類

Serializer 是一個接口,它表示類將會採用何種方式序列化,它的做用是把對象轉換爲字節,實現了 Serializer 接口的類主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認使用的序列化器,其餘的序列化器還有不少,你能夠經過 這裏 查看其餘序列化器。要注意的一點:key.serializer 是必需要設置的,即便你打算只發送值的內容

  • value.serializer

與 key.serializer 同樣,value.serializer 指定的類會將值序列化。

下面代碼演示瞭如何建立一個 Kafka 生產者,這裏只指定了必要的屬性,其餘使用默認的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

來解釋一下這段代碼

  • 首先建立了一個 Properties 對象
  • 使用 StringSerializer 序列化器序列化 key / value 鍵值對
  • 在這裏咱們建立了一個新的生產者對象,併爲鍵值設置了恰當的類型,而後把 Properties 對象傳遞給他。

Kafka 消息發送

實例化生產者對象後,接下來就能夠開始發送消息了,發送消息主要由下面幾種方式

簡單消息發送

Kafka 最簡單的消息發送以下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

代碼中生產者(producer)的 send() 方法須要把 ProducerRecord 的對象做爲參數進行發送,ProducerRecord 有不少構造函數,這個咱們下面討論,這裏調用的是

public ProducerRecord(String topic, K key, V value) {}

這個構造函數,須要傳遞的是 topic主題,key 和 value。

把對應的參數傳遞完成後,生產者調用 send() 方法發送消息(ProducerRecord對象)。咱們能夠從生產者的架構圖中看出,消息是先被寫入分區中的緩衝區中,而後分批次發送給 Kafka Broker。

發送成功後,send() 方法會返回一個 Future(java.util.concurrent) 對象,Future 對象的類型是 RecordMetadata 類型,咱們上面這段代碼沒有考慮返回值,因此沒有生成對應的 Future 對象,因此沒有辦法知道消息是否發送成功。若是不是很重要的信息或者對結果不會產生影響的信息,可使用這種方式進行發送。

咱們能夠忽略發送消息時可能發生的錯誤或者在服務器端可能發生的錯誤,但在消息發送以前,生產者還可能發生其餘的異常。這些異常有多是 SerializationException(序列化失敗)BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送線程被中斷)

同步發送消息

第二種消息發送機制以下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

這種發送消息的方式較上面的發送方式有了改進,首先調用 send() 方法,而後再調用 get() 方法等待 Kafka 響應。若是服務器返回錯誤,get() 方法會拋出異常,若是沒有發生錯誤,咱們會獲得 RecordMetadata 對象,能夠用它來查看消息記錄。

生產者(KafkaProducer)在發送的過程當中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤能夠經過重發消息來解決。好比鏈接的錯誤,能夠經過再次創建鏈接來解決;無錯誤則能夠經過從新爲分區選舉首領來解決。KafkaProducer 被配置爲自動重試,若是屢次重試後仍沒法解決問題,則會拋出重試異常。另外一類錯誤是沒法經過重試來解決的,好比消息過大對於這類錯誤,KafkaProducer 不會進行重試,直接拋出異常。

異步發送消息

同步發送消息都有個問題,那就是同一時間只能有一個消息在發送,這會形成許多消息沒法直接發送,形成消息滯後,沒法發揮效益最大化。

好比消息在應用程序和 Kafka 集羣之間一個來回須要 10ms。若是發送完每一個消息後都等待響應的話,那麼發送100個消息須要 1 秒,可是若是是異步方式的話,發送 100 條消息所須要的時間就會少不少不少。大多數時候,雖然Kafka 會返回 RecordMetadata 消息,可是咱們並不須要等待響應。

爲了在異步發送消息的同時可以對異常狀況進行處理,生產者提供了回掉支持。下面是回調的一個例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先實現回調須要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個接口只有一個 onCompletion方法。若是 kafka 返回一個錯誤,onCompletion 方法會拋出一個非空(non null)異常,這裏咱們只是簡單的把它打印出來,若是是生產環境須要更詳細的處理,而後在 send() 方法發送的時候傳遞一個 Callback 回調的對象。

生產者分區機制

Kafka 對於數據的讀寫是以分區爲粒度的,分區能夠分佈在多個主機(Broker)中,這樣每一個節點可以實現獨立的數據寫入和讀取,而且可以經過增長新的節點來增長 Kafka 集羣的吞吐量,經過分區部署在多個 Broker 來實現負載均衡的效果。

上面咱們介紹了生產者的發送方式有三種:無論結果如何直接發送發送並返回結果發送並回調。因爲消息是存在主題(topic)的分區(partition)中的,因此當 Producer 生產者發送產生一條消息發給 topic 的時候,你如何判斷這條消息會存在哪一個分區中呢?

這其實就設計到 Kafka 的分區機制了。

分區策略

Kafka 的分區策略指的就是將生產者發送到哪一個分區的算法。Kafka 爲咱們提供了默認的分區策略,同時它也支持你自定義分區策略。

若是要自定義分區策略的話,你須要顯示配置生產者端的參數 Partitioner.class,咱們能夠看一下這個類它位於 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 類有三個方法,分別來解釋一下

  • partition(): 這個類有幾個參數: topic,表示須要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區中序列化事後的key,byte數組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區中序列化後的值數組;cluster表示當前集羣的原數據。Kafka 給你這麼多信息,就是但願讓你可以充分地利用這些信息對消息進行分區,計算出它要被髮送到哪一個分區中。
  • close() : 繼承了 Closeable 接口可以實現 close() 方法,在分區關閉時調用。
  • onNewBatch(): 表示通知分區程序用來建立新的批次

其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

順序輪詢

順序分配,消息是均勻的分配給每一個 partition,即每一個分區存儲一次消息。就像下面這樣

image.png

上圖表示的就是輪詢策略,輪訓策略是 Kafka Producer 提供的默認策略,若是你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機輪詢

隨機輪詢簡而言之就是隨機的向 partition 中保存消息,以下圖所示

實現隨機分配的代碼只須要兩行,以下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區數,而後隨機地返回一個小於它的正整數。

本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,因此若是追求數據的均勻分佈,仍是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改成輪詢了。

按照 key 進行消息保存

這個策略也叫作 key-ordering 策略,Kafka 中每條消息都會有本身的key,一旦消息被定義了 Key,那麼你就能夠保證同一個 Key 的全部消息都進入到相同的分區裏面,因爲每一個分區下的消息處理都是有順序的,故這個策略被稱爲按消息鍵保序策略,以下圖所示

實現這個策略的 partition 方法一樣簡單,只須要下面兩行代碼便可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面這幾種分區策略都是比較基礎的策略,除此以外,你還能夠自定義分區策略。

生產者壓縮機制

壓縮一詞簡單來說就是一種互換思想,它是一種經典的用 CPU 時間去換磁盤空間或者 I/O 傳輸量的思想,但願以較小的 CPU 開銷帶來更少的磁盤佔用或更少的網絡 I/O 傳輸。若是你還不瞭解的話我但願你先讀完這篇文章 程序員須要瞭解的硬核知識之壓縮算法,而後你就明白壓縮是怎麼回事了。

Kafka 壓縮是什麼

Kafka 的消息分爲兩層:消息集合 和 消息。一個消息集合中包含若干條日誌項,而日誌項纔是真正封裝消息的地方。Kafka 底層的消息日誌由一系列消息集合日誌項組成。Kafka 一般不會直接操做具體的一條條消息,它老是在消息集合這個層面上進行寫入操做。

在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,爲何啓用壓縮?說白了就是消息太大,須要變小一點 來使消息發的更快一些。

Kafka Producer 中使用 compression.type 來開啓壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代碼代表該 Producer 的壓縮算法使用的是 GZIP

有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息後併發送給服務器後,由 Consumer 消費者進行解壓縮,由於採用的何種壓縮算法是隨着 key、value 一塊兒發送過去的,因此消費者知道採用何種壓縮算法。

Kafka 重要參數配置

在上一篇文章 帶你漲姿式的認識一下kafka中,咱們主要介紹了一下 kafka 集羣搭建的參數,本篇文章咱們來介紹一下 Kafka 生產者重要的配置,生產者有不少可配置的參數,在文檔裏(http://kafka.apache.org/docum...)都有說明,咱們介紹幾個在內存使用、性能和可靠性方面對生產者影響比較大的參數進行說明

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 參數指定了要有多少個分區副本接收消息,生產者才認爲消息是寫入成功的。此參數對消息丟失的影響較大

  • 若是 acks = 0,就表示生產者也不知道本身產生的消息是否被服務器接收了,它才知道它寫成功了。若是發送的途中產生了錯誤,生產者也不知道,它也比較懵逼,由於沒有返回任何消息。這就相似於 UDP 的運輸層協議,只管發,服務器接受不接受它也不關心。
  • 若是 acks = 1,只要集羣的 Leader 接收到消息,就會給生產者返回一條消息,告訴它寫入成功。若是發送途中形成了網絡異常或者 Leader 還沒選舉出來等其餘狀況致使消息寫入失敗,生產者會受到錯誤消息,這時候生產者每每會再次重發數據。由於消息的發送也分爲 同步異步,Kafka 爲了保證消息的高效傳輸會決定是同步發送仍是異步發送。若是讓客戶端等待服務器的響應(經過調用 Future 中的 get() 方法),顯然會增長延遲,若是客戶端使用回調,就會解決這個問題。
  • 若是 acks = all,這種狀況下是隻有當全部參與複製的節點都收到消息時,生產者纔會接收到一個來自服務器的消息。不過,它的延遲比 acks =1 時更高,由於咱們要等待不僅一個服務器節點接收消息。

buffer.memory

此參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。若是應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足。這個時候,send() 方法調用要麼被阻塞,要麼拋出異常,具體取決於 block.on.buffer.null 參數的設置。

compression.type

此參數來表示生產者啓用何種壓縮算法,默認狀況下,消息發送時不會被壓縮。該參數能夠設置爲 snappy、gzip 和 lz4,它指定了消息發送給 broker 以前使用哪種壓縮算法進行壓縮。下面是各壓縮算法的對比

retries

生產者從服務器收到的錯誤有多是臨時性的錯誤(好比分區找不到首領),在這種狀況下,reteis 參數的值決定了生產者能夠重發的消息次數,若是達到這個次數,生產者會放棄重試並返回錯誤。默認狀況下,生產者在每次重試之間等待 100ms,這個等待參數能夠經過 retry.backoff.ms 進行修改。

batch.size

當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算。當批次被填滿,批次裏的全部消息會被髮送出去。不過生產者井不必定都會等到批次被填滿才發送,任意條數的消息均可能被髮送。

client.id

此參數能夠是任意的字符串,服務器會用它來識別消息的來源,通常配置在日誌裏

max.in.flight.requests.per.connection

此參數指定了生產者在收到服務器響應以前能夠發送多少消息,它的值越高,就會佔用越多的內存,不過也會提升吞吐量。把它設爲1 能夠保證消息是按照發送的順序寫入服務器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在發送數據時等待服務器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(好比目標分區的首領是誰)時等待服務器返回響應的時間。若是等待時間超時,生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配----若是在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

max.block.ms

此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它能夠指能發送的單個消息的最大值,也能夠指單個請求裏全部消息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,爲了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩衝區的大小。若是它們被設置爲 -1,就使用操做系統的默認值。若是生產者或消費者與 broker 處於不一樣的數據中心,那麼能夠適當增大這些值。

Kafka Consumer

應用程序使用 KafkaConsumer 從 Kafka 中訂閱主題並接收來自這些主題的消息,而後再把他們保存起來。應用程序首先須要建立一個 KafkaConsumer 對象,訂閱主題並開始接受消息,驗證消息並保存結果。一段時間後,生產者往主題寫入的速度超過了應用程序驗證數據的速度,這時候該如何處理?若是隻使用單個消費者的話,應用程序會跟不上消息生成的速度,就像多個生產者像相同的主題寫入消息同樣,這時候就須要多個消費者共同參與消費主題中的消息,對消息進行分流處理。

Kafka 消費者從屬於消費者羣組。一個羣組中的消費者訂閱的都是相同的主題,每一個消費者接收主題一部分分區的消息。下面是一個 Kafka 分區消費示意圖

image.png

上圖中的主題 T1 有四個分區,分別是分區0、分區一、分區二、分區3,咱們建立一個消費者羣組1,消費者羣組中只有一個消費者,它訂閱主題T1,接收到 T1 中的所有消息。因爲一個消費者處理四個生產者發送到分區的消息,壓力有些大,須要幫手來幫忙分擔任務,因而就演變爲下圖

image.png

這樣一來,消費者的消費能力就大大提升了,可是在某些環境下好比用戶產生消息特別多的時候,生產者產生的消息仍舊讓消費者吃不消,那就繼續增長消費者。

image.png

如上圖所示,每一個分區所產生的消息可以被每一個消費者羣組中的消費者消費,若是向消費者羣組中增長更多的消費者,那麼多餘的消費者將會閒置,以下圖所示

image.png

向羣組中增長消費者是橫向伸縮消費能力的主要方式。總而言之,咱們能夠經過增長消費組的消費者來進行水平擴展提高消費能力。這也是爲何建議建立主題時使用比較多的分區數,這樣能夠在消費負載高的狀況下增長消費者來提高性能。另外,消費者的數量不該該比分區數多,由於多出來的消費者是空閒的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次消息,能夠支持任意多的應用讀取這個消息。換句話說,每一個應用均可以讀到全量的消息。爲了使得每一個應用都能讀到全量消息,應用須要有不一樣的消費組。對於上面的例子,假如咱們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼就演變爲下圖這樣

image.png

在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來講它們屬於不一樣的應用。

總結起來就是若是應用須要讀取全量消息,那麼請爲該應用設置一個消費組;若是該應用消費能力不足,那麼能夠考慮在這個消費組裏增長消費者

消費者組和分區重平衡

消費者組是什麼

消費者組(Consumer Group)是由一個或多個消費者實例(Consumer Instance)組成的羣組,具備可擴展性和可容錯性的一種機制。消費者組內的消費者共享一個消費者組ID,這個ID 也叫作 Group ID,組內的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區的消息,多餘的消費者會閒置,派不上用場。

咱們在上面提到了兩種消費方式

  • 一個消費者羣組消費一個主題中的消息,這種消費模式又稱爲點對點的消費方式,點對點的消費方式又被稱爲消息隊列
  • 一個主題中的消息被多個消費者羣組共同消費,這種消費模式又稱爲發佈-訂閱模式

消費者重平衡

咱們從上面的消費者演變圖中能夠知道這麼一個過程:最初是一個消費者訂閱一個主題並消費其所有分區的消息,後來有一個消費者加入羣組,隨後又有更多的消費者加入羣組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區的全部權經過一個消費者轉到其餘消費者的行爲稱爲重平衡,英文名也叫作 Rebalance 。以下圖所示

image.png

重平衡很是重要,它爲消費者羣組帶來了高可用性伸縮性,咱們能夠放心的添加消費者或移除消費者,不過在正常狀況下咱們並不但願發生這樣的行爲。在重平衡期間,消費者沒法讀取消息,形成整個消費者組在重平衡的期間都不可用。另外,當分區被從新分配給另外一個消費者時,消息當前的讀取狀態會丟失,它有可能還須要去刷新緩存,在它從新恢復狀態以前會拖慢應用程序。

消費者經過向組織協調者(Kafka Broker)發送心跳來維護本身是消費者組的一員並確認其擁有的分區。對於不一樣不的消費羣體來講,其組織協調者能夠是不一樣的。只要消費者按期發送心跳,就會認爲消費者是存活的並處理其分區中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。

若是過了一段時間 Kafka 中止發送心跳了,會話(Session)就會過時,組織協調者就會認爲這個 Consumer 已經死亡,就會觸發一次重平衡。若是消費者宕機而且中止發送消息,組織協調者會等待幾秒鐘,確認它死亡了纔會觸發重平衡。在這段時間裏,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協調者它要離開羣組,組織協調者會觸發一次重平衡,儘可能下降處理停頓。

重平衡是一把雙刃劍,它爲消費者羣組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到如今社區還沒法修改。

重平衡的過程對消費者組有極大的影響。由於每次重平衡過程當中都會致使萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深刻理解 Java 虛擬機》中 p76 關於 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其餘全部的工做線程。直到它收集結束。 Stop The World 這個名字聽起來很帥,但這項工做其實是由虛擬機在後臺自動發起並完成的,在用戶不可見的狀況下把用戶正常工做的線程所有停掉,這對不少應用來講都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者實例都會中止消費,等待重平衡的完成。並且重平衡這個過程很慢......

建立消費者

上面的理論說的有點多,下面就經過代碼來說解一下消費者是如何消費的

在讀取消息以前,須要先建立一個 KafkaConsumer 對象。建立 KafkaConsumer 對象與建立 KafkaProducer 對象十分類似 --- 把須要傳遞給消費者的屬性放在 properties 對象中,後面咱們會着重討論 Kafka 的一些配置,這裏咱們先簡單的建立一下,使用3個屬性就足矣,分別是 bootstrap.serverkey.deserializervalue.deserializer

這三個屬性咱們已經用過不少次了,若是你還不是很清楚的話,能夠參考 帶你漲姿式是認識一下Kafka Producer

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬於哪一個消費者羣組。建立不屬於任何一個羣組的消費者也是能夠的

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主題訂閱

建立好消費者以後,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表做爲參數,使用起來比較簡單

consumer.subscribe(Collections.singletonList("customerTopic"));

爲了簡單咱們只訂閱了一個主題 customerTopic,參數傳入的是一個正則表達式,正則表達式能夠匹配多個主題,若是有人建立了新的主題,而且主題的名字與正則表達式相匹配,那麼會當即觸發一次重平衡,消費者就能夠讀取新的主題。

要訂閱全部與 test 相關的主題,能夠這樣作

consumer.subscribe("test.*");

輪詢

咱們知道,Kafka 是支持訂閱/發佈模式的,生產者發送數據給 Kafka Broker,那麼消費者是如何知道生產者發送了數據呢?其實生產者產生的數據消費者是不知道的,KafkaConsumer 採用輪詢的方式按期去 Kafka Broker 中進行數據的檢索,若是有數據就用來消費,若是沒有就再繼續輪詢等待,下面是輪詢等待的具體實現

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}
  • 這是一個無限循環。消費者其實是一個長期運行的應用程序,它經過輪詢的方式向 Kafka 請求數據。
  • 第三行代碼很是重要,Kafka 必須按期循環請求數據,不然就會認爲該 Consumer 已經掛了,會觸發重平衡,它的分區會移交給羣組中的其它消費者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,若是該參數被設置爲 0 ,poll() 方法會馬上返回,不然就會在指定的毫秒數內一直等待 broker 返回數據。
  • poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區的信息、記錄在分區中的偏移量,以及記錄的鍵值對。咱們通常會遍歷這個列表,逐條處理每條記錄。
  • 在退出應用程序以前使用 close() 方法關閉消費者。網絡鏈接和 socket 也會隨之關閉,並當即觸發一次重平衡,而不是等待羣組協調器發現它再也不發送心跳並認定它已經死亡。
線程安全性

在同一個羣組中,咱們沒法讓一個線程運行多個消費者,也沒法讓多個線程安全的共享一個消費者。按照規則,一個消費者使用一個線程,若是一個消費者羣組中多個消費者都想要運行的話,那麼必須讓每一個消費者在本身的線程中運行,可使用 Java 中的 ExecutorService 啓動多個消費者進行進行處理。

消費者配置

到目前爲止,咱們學習瞭如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了全部與消費者相關的配置說明。大部分參數都有合理的默認值,通常不須要修改它們,下面咱們就來介紹一下這些參數。

  • fetch.min.bytes

該屬性指定了消費者從服務器獲取記錄的最小字節數。broker 在收到消費者的數據請求時,若是可用的數據量小於 fetch.min.bytes 指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者。這樣能夠下降消費者和 broker 的工做負載,由於它們在主題使用頻率不是很高的時候就不用來回處理消息。若是沒有不少可用數據,但消費者的 CPU 使用率很高,那麼就須要把該屬性的值設得比默認值大。若是消費者的數量比較多,把該屬性的值調大能夠下降 broker 的工做負載。

  • fetch.max.wait.ms

咱們經過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時纔會把它返回給消費者。而 fetch.max.wait.ms 則用於指定 broker 的等待時間,默認是 500 毫秒。若是沒有足夠的數據流入 kafka 的話,消費者獲取的最小數據量要求就得不到知足,最終致使 500 毫秒的延遲。若是要下降潛在的延遲,就能夠把參數值設置的小一些。若是 fetch.max.wait.ms 被設置爲 100 毫秒的延遲,而 fetch.min.bytes 的值設置爲 1MB,那麼 Kafka 在收到消費者請求後,要麼返回 1MB 的數據,要麼在 100 ms 後返回全部可用的數據。就看哪一個條件首先被知足。

  • max.partition.fetch.bytes

該屬性指定了服務器從每一個分區裏返回給消費者的最大字節數。它的默認值時 1MB,也就是說,KafkaConsumer.poll() 方法從每一個分區裏返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節。若是一個主題有20個分區和5個消費者,那麼每一個消費者須要至少4 MB的可用內存來接收記錄。在爲消費者分配內存時,能夠給它們多分配一些,由於若是羣組裏有消費者發生崩潰,剩下的消費者須要處理更多的分區。max.partition.fetch.bytes 的值必須比 broker 可以接收的最大消息的字節數(經過 max.message.size 屬性配置大),不然消費者可能沒法讀取這些消息,致使消費者一直掛起重試。 在設置該屬性時,另一個考量的因素是消費者處理數據的時間。消費者須要頻繁的調用 poll() 方法來避免會話過時和發生分區再平衡,若是單次調用poll() 返回的數據太多,消費者須要更多的時間進行處理,可能沒法及時進行下一個輪詢來避免會話過時。若是出現這種狀況,能夠把 max.partition.fetch.bytes 值改小,或者延長會話過時時間。

  • session.timeout.ms

這個屬性指定了消費者在被認爲死亡以前能夠與服務器斷開鏈接的時間,默認是 3s。若是消費者沒有在 session.timeout.ms 指定的時間內發送心跳給羣組協調器,就會被認定爲死亡,協調器就會觸發重平衡。把它的分區分配給消費者羣組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向羣組協調器發送心跳的頻率,session.timeout.ms 則指定了消費者能夠多久不發送心跳。因此,這兩個屬性通常須要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,通常是 session.timeout.ms 的三分之一。若是 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設置的比默認值小,能夠更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能致使非預期的重平衡。把該屬性的值設置得大一些,能夠減小意外的重平衡,不過檢測節點崩潰須要更長的時間。

  • auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下的該如何處理。它的默認值是 latest,意思指的是,在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據。另外一個值是 earliest,意思指的是在偏移量無效的狀況下,消費者將從起始位置處開始讀取分區的記錄。

  • enable.auto.commit

咱們稍後將介紹幾種不一樣的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,爲了儘可能避免出現重複數據和數據丟失,能夠把它設置爲 false,由本身控制什麼時候提交偏移量。若是把它設置爲 true,還能夠經過 auto.commit.interval.ms 屬性來控制提交的頻率

  • partition.assignment.strategy

咱們知道,分區會分配給羣組中的消費者。PartitionAssignor 會根據給定的消費者和主題,決定哪些分區應該被分配給哪一個消費者,Kafka 有兩個默認的分配策略RangeRoundRobin

  • client.id

該屬性能夠是任意字符串,broker 用他來標識從客戶端發送過來的消息,一般被用在日誌、度量指標和配額中

  • max.poll.records

該屬性用於控制單次調用 call() 方法可以返回的記錄數量,能夠幫你控制在輪詢中須要處理的數據量。

  • receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數據時用到的 TCP 緩衝區也能夠設置大小。若是它們被設置爲 -1,就使用操做系統默認值。若是生產者或消費者與 broker 處於不一樣的數據中心內,能夠適當增大這些值,由於跨數據中心的網絡通常都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

咱們上面提到,消費者在每次調用poll() 方法進行定時輪詢的時候,會返回由生產者寫入 Kafka 可是尚未被消費者消費的記錄,所以咱們能夠追蹤到哪些記錄是被羣組裏的哪一個消費者讀取的。消費者可使用 Kafka 來追蹤消息在分區中的位置(偏移量)

消費者會向一個叫作 _consumer_offset 的特殊主題中發送消息,這個主題會保存每次所發送消息中的分區偏移量,這個主題的主要做用就是消費者觸發重平衡後記錄偏移使用的,消費者每次向這個主題發送消息,正常狀況下不觸發重平衡,這個主題是不起做用的,當觸發重平衡後,消費者中止工做,每一個消費者可能會分到對應的分區,這個主題就是讓消費者可以繼續處理消息所設置的。

若是提交的偏移量小於客戶端最後一次處理的偏移量,那麼位於兩個偏移量之間的消息就會被重複處理

image.png

若是提交的偏移量大於最後一次消費時的偏移量,那麼處於兩個偏移量中間的消息將會丟失

image.png

既然_consumer_offset 如此重要,那麼它的提交方式是怎樣的呢?下面咱們就來講一下####提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費者自動提交偏移量。若是 enable.auto.commit 被設置爲true,那麼每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者裏的其餘東西同樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,若是是,那麼就會提交從上一次輪詢中返回的偏移量。

提交當前偏移量

auto.commit.offset 設置爲 false,可讓應用程序決定什麼時候提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功後立刻返回,若是提交失敗就拋出異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,若是處理完全部記錄後要確保調用了 commitSync(),不然仍是會有丟失消息的風險,若是發生了在均衡,從最近一批消息到發生在均衡之間的全部消息都將被重複處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區別在於異步提交不會進行重試,同步提交會一致進行重試。

同步和異步組合提交

通常狀況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,由於若是提交失敗是由於臨時問題致使的,那麼後續的提交總會有成功的。可是若是在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。

所以,在消費者關閉以前通常會組合使用commitAsync和commitSync提交偏移量

提交特定的偏移量

消費者API容許調用 commitSync() 和 commitAsync() 方法時傳入但願提交的 partition 和 offset 的 map,即提交特定的偏移量。

下面爲本身作個宣傳,歡迎關注公衆號 Java建設者,號主是Java技術棧,熱愛技術,喜歡閱讀,熱衷於分享和總結,但願能把每一篇好文章分享給成長道路上的你。
關注公衆號回覆 002 領取爲你特地準備的大禮包,你必定會喜歡並收藏的。

文章參考:

Kafka史上最詳細原理總結

《Kafka 權威指南》

https://kafka.apache.org/

http://kafka.apache.org/docum...

https://www.tutorialkart.com/...

https://dzone.com/articles/wh...

《極客時間 - Kafka 核心技術與實戰》

相關文章
相關標籤/搜索