詳解Kafka Producer

上一篇文章咱們主要介紹了什麼是 Kafka,Kafka 的基本概念是什麼,Kafka 單機和集羣版的搭建,以及對基本的配置文件進行了大體的介紹,還對 Kafka 的幾個主要角色進行了描述,咱們知道,無論是把 Kafka 用做消息隊列、消息總線仍是數據存儲平臺來使用,最終是繞不過消息這個詞的,這也是 Kafka 最最核心的內容,Kafka 的消息從哪裏來?到哪裏去?都幹什麼了?彆着急,一步一步來,先說說 Kafka 的消息從哪來。html

生產者概述

在 Kafka 中,咱們把產生消息的那一方稱爲生產者,好比咱們常常回去淘寶購物,你打開淘寶的那一刻,你的登錄信息,登錄次數都會做爲消息傳輸到 Kafka 後臺,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會做爲一個個消息傳遞給 Kafka 後臺,而後淘寶會根據你的愛好作智能推薦,導致你的錢包歷來都禁不住誘惑,那麼這些生產者產生的消息是怎麼傳到 Kafka 應用程序的呢?發送過程是怎麼樣的呢?java

儘管消息的產生很是簡單,可是消息的發送過程仍是比較複雜的,如圖git

image.png

咱們從建立一個ProducerRecord 對象開始,ProducerRecord 是 Kafka 中的一個核心類,它表明了一組 Kafka 須要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),可選的分區號(Partition Number)以及可選的鍵值對構成。程序員

在發送 ProducerRecord 時,咱們須要將鍵值對對象由序列化器轉換爲字節數組,這樣它們纔可以在網絡上傳輸。而後消息到達了分區器。github

若是發送過程當中指定了有效的分區號,那麼在發送記錄時將使用該分區。若是發送過程當中未指定分區,則將使用key 的 hash 函數映射指定一個分區。若是發送的過程當中既沒有分區號也沒有,則將以循環的方式分配一個分區。選好分區後,生產者就知道向哪一個主題和分區發送數據了。算法

ProducerRecord 還有關聯的時間戳,若是用戶沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間做爲時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。apache

  • 若是將主題配置爲使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
  • 若是將主題配置爲使用LogAppendTime,則生產者記錄中的時間戳在將消息添加到其日誌中時,將由 broker 重寫。

而後,這條消息被存放在一個記錄批次裏,這個批次裏的全部消息會被髮送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。bootstrap

Kafka Broker 在收到消息時會返回一個響應,若是寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量,上面兩種的時間戳類型也會返回給用戶。若是寫入失敗,會返回一個錯誤。生產者在收到錯誤以後會嘗試從新發送消息,幾回以後若是仍是失敗的話,就返回錯誤消息。數組

建立 Kafka 生產者

要往 Kafka 寫入消息,首先須要建立一個生產者對象,並設置一些屬性。Kafka 生產者有3個必選的屬性服務器

  • bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式爲 host:port。清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找到其餘的 broker 信息。不過建議至少要提供兩個 broker 信息,一旦其中一個宕機,生產者仍然可以鏈接到集羣上。

  • key.serializer

broker 須要接收到序列化以後的 key/value 值,因此生產者發送的消息須要通過序列化以後才傳遞給 Kafka Broker。生產者須要知道採用何種方式把 Java 對象轉換爲字節數組。key.serializer 必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵對象序列化爲字節數組。這裏拓展一下 Serializer 類

Serializer 是一個接口,它表示類將會採用何種方式序列化,它的做用是把對象轉換爲字節,實現了 Serializer 接口的類主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認使用的序列化器,其餘的序列化器還有不少,你能夠經過 這裏 查看其餘序列化器。要注意的一點:key.serializer 是必需要設置的,即便你打算只發送值的內容

  • value.serializer

與 key.serializer 同樣,value.serializer 指定的類會將值序列化。

下面代碼演示瞭如何建立一個 Kafka 生產者,這裏只指定了必要的屬性,其餘使用默認的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

來解釋一下這段代碼

  • 首先建立了一個 Properties 對象
  • 使用 StringSerializer 序列化器序列化 key / value 鍵值對
  • 在這裏咱們建立了一個新的生產者對象,併爲鍵值設置了恰當的類型,而後把 Properties 對象傳遞給他。

實例化生產者對象後,接下來就能夠開始發送消息了,發送消息主要由下面幾種方式

直接發送,不考慮結果

使用這種發送方式,不會關心消息是否到達,會丟失一些消息,由於 Kafka 是高可用的,生產者會自動嘗試重發,這種發送方式和 UDP 運輸層協議很類似。

同步發送

同步發送仍然使用 send() 方法發送消息,它會返回一個 Future 對象,調用 get() 方法進行等待,就能夠知道消息時候否發送成功。

異步發送

異步發送指的是咱們調用 send() 方法,並制定一個回調函數,服務器在返回響應時調用該函數。

下一節咱們會從新討論這三種實現。

向 Kafka 發送消息

簡單消息發送

Kafka 最簡單的消息發送以下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

代碼中生產者(producer)的 send() 方法須要把 ProducerRecord 的對象做爲參數進行發送,ProducerRecord 有不少構造函數,這個咱們下面討論,這裏調用的是

public ProducerRecord(String topic, K key, V value) {}

這個構造函數,須要傳遞的是 topic主題,key 和 value。

把對應的參數傳遞完成後,生產者調用 send() 方法發送消息(ProducerRecord對象)。咱們能夠從生產者的架構圖中看出,消息是先被寫入分區中的緩衝區中,而後分批次發送給 Kafka Broker。

image.png

發送成功後,send() 方法會返回一個 Future(java.util.concurrent) 對象,Future 對象的類型是 RecordMetadata 類型,咱們上面這段代碼沒有考慮返回值,因此沒有生成對應的 Future 對象,因此沒有辦法知道消息是否發送成功。若是不是很重要的信息或者對結果不會產生影響的信息,可使用這種方式進行發送。

咱們能夠忽略發送消息時可能發生的錯誤或者在服務器端可能發生的錯誤,但在消息發送以前,生產者還可能發生其餘的異常。這些異常有多是 SerializationException(序列化失敗)BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送線程被中斷)

同步發送消息

第二種消息發送機制以下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

這種發送消息的方式較上面的發送方式有了改進,首先調用 send() 方法,而後再調用 get() 方法等待 Kafka 響應。若是服務器返回錯誤,get() 方法會拋出異常,若是沒有發生錯誤,咱們會獲得 RecordMetadata 對象,能夠用它來查看消息記錄。

生產者(KafkaProducer)在發送的過程當中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤能夠經過重發消息來解決。好比鏈接的錯誤,能夠經過再次創建鏈接來解決;無錯誤則能夠經過從新爲分區選舉首領來解決。KafkaProducer 被配置爲自動重試,若是屢次重試後仍沒法解決問題,則會拋出重試異常。另外一類錯誤是沒法經過重試來解決的,好比消息過大對於這類錯誤,KafkaProducer 不會進行重試,直接拋出異常。

異步發送消息

同步發送消息都有個問題,那就是同一時間只能有一個消息在發送,這會形成許多消息沒法直接發送,形成消息滯後,沒法發揮效益最大化。

好比消息在應用程序和 Kafka 集羣之間一個來回須要 10ms。若是發送完每一個消息後都等待響應的話,那麼發送100個消息須要 1 秒,可是若是是異步方式的話,發送 100 條消息所須要的時間就會少不少不少。大多數時候,雖然Kafka 會返回 RecordMetadata 消息,可是咱們並不須要等待響應。

爲了在異步發送消息的同時可以對異常狀況進行處理,生產者提供了回掉支持。下面是回調的一個例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先實現回調須要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個接口只有一個 onCompletion方法。若是 kafka 返回一個錯誤,onCompletion 方法會拋出一個非空(non null)異常,這裏咱們只是簡單的把它打印出來,若是是生產環境須要更詳細的處理,而後在 send() 方法發送的時候傳遞一個 Callback 回調的對象。

生產者分區機制

Kafka 對於數據的讀寫是以分區爲粒度的,分區能夠分佈在多個主機(Broker)中,這樣每一個節點可以實現獨立的數據寫入和讀取,而且可以經過增長新的節點來增長 Kafka 集羣的吞吐量,經過分區部署在多個 Broker 來實現負載均衡的效果。

上面咱們介紹了生產者的發送方式有三種:無論結果如何直接發送發送並返回結果發送並回調。因爲消息是存在主題(topic)的分區(partition)中的,因此當 Producer 生產者發送產生一條消息發給 topic 的時候,你如何判斷這條消息會存在哪一個分區中呢?

這其實就設計到 Kafka 的分區機制了。

分區策略

Kafka 的分區策略指的就是將生產者發送到哪一個分區的算法。Kafka 爲咱們提供了默認的分區策略,同時它也支持你自定義分區策略。

若是要自定義分區策略的話,你須要顯示配置生產者端的參數 Partitioner.class,咱們能夠看一下這個類它位於 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 類有三個方法,分別來解釋一下

  • partition(): 這個類有幾個參數: topic,表示須要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區中序列化事後的key,byte數組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區中序列化後的值數組;cluster表示當前集羣的原數據。Kafka 給你這麼多信息,就是但願讓你可以充分地利用這些信息對消息進行分區,計算出它要被髮送到哪一個分區中。
  • close() : 繼承了 Closeable 接口可以實現 close() 方法,在分區關閉時調用。
  • onNewBatch(): 表示通知分區程序用來建立新的批次

其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

順序輪訓

順序分配,消息是均勻的分配給每一個 partition,即每一個分區存儲一次消息。就像下面這樣

image.png

上圖表示的就是輪訓策略,輪訓策略是 Kafka Producer 提供的默認策略,若是你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機輪訓

隨機輪訓簡而言之就是隨機的向 partition 中保存消息,以下圖所示

image.png

實現隨機分配的代碼只須要兩行,以下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區數,而後隨機地返回一個小於它的正整數。

本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,因此若是追求數據的均勻分佈,仍是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改成輪詢了。

按照 key 進行消息保存

這個策略也叫作 key-ordering 策略,Kafka 中每條消息都會有本身的key,一旦消息被定義了 Key,那麼你就能夠保證同一個 Key 的全部消息都進入到相同的分區裏面,因爲每一個分區下的消息處理都是有順序的,故這個策略被稱爲按消息鍵保序策略,以下圖所示

image.png

實現這個策略的 partition 方法一樣簡單,只須要下面兩行代碼便可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面這幾種分區策略都是比較基礎的策略,除此以外,你還能夠自定義分區策略。

生產者壓縮機制

壓縮一詞簡單來說就是一種互換思想,它是一種經典的用 CPU 時間去換磁盤空間或者 I/O 傳輸量的思想,但願以較小的 CPU 開銷帶來更少的磁盤佔用或更少的網絡 I/O 傳輸。若是你還不瞭解的話我但願你先讀完這篇文章 程序員須要瞭解的硬核知識之壓縮算法,而後你就明白壓縮是怎麼回事了。

Kafka 壓縮是什麼

Kafka 的消息分爲兩層:消息集合 和 消息。一個消息集合中包含若干條日誌項,而日誌項纔是真正封裝消息的地方。Kafka 底層的消息日誌由一系列消息集合日誌項組成。Kafka 一般不會直接操做具體的一條條消息,它老是在消息集合這個層面上進行寫入操做。

在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,爲何啓用壓縮?說白了就是消息太大,須要變小一點 來使消息發的更快一些。

Kafka Producer 中使用 compression.type 來開啓壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代碼代表該 Producer 的壓縮算法使用的是 GZIP

有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息後併發送給服務器後,由 Consumer 消費者進行解壓縮,由於採用的何種壓縮算法是隨着 key、value 一塊兒發送過去的,因此消費者知道採用何種壓縮算法。

Kafka 重要參數配置

在上一篇文章 帶你漲姿式的認識一下kafka中,咱們主要介紹了一下 kafka 集羣搭建的參數,本篇文章咱們來介紹一下 Kafka 生產者重要的配置,生產者有不少可配置的參數,在文檔裏(http://kafka.apache.org/docum...)都有說明,咱們介紹幾個在內存使用、性能和可靠性方面對生產者影響比較大的參數進行說明

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 參數指定了要有多少個分區副本接收消息,生產者才認爲消息是寫入成功的。此參數對消息丟失的影響較大

  • 若是 acks = 0,就表示生產者也不知道本身產生的消息是否被服務器接收了,它才知道它寫成功了。若是發送的途中產生了錯誤,生產者也不知道,它也比較懵逼,由於沒有返回任何消息。這就相似於 UDP 的運輸層協議,只管發,服務器接受不接受它也不關心。
  • 若是 acks = 1,只要集羣的 Leader 接收到消息,就會給生產者返回一條消息,告訴它寫入成功。若是發送途中形成了網絡異常或者 Leader 還沒選舉出來等其餘狀況致使消息寫入失敗,生產者會受到錯誤消息,這時候生產者每每會再次重發數據。由於消息的發送也分爲 同步異步,Kafka 爲了保證消息的高效傳輸會決定是同步發送仍是異步發送。若是讓客戶端等待服務器的響應(經過調用 Future 中的 get() 方法),顯然會增長延遲,若是客戶端使用回調,就會解決這個問題。
  • 若是 acks = all,這種狀況下是隻有當全部參與複製的節點都收到消息時,生產者纔會接收到一個來自服務器的消息。不過,它的延遲比 acks =1 時更高,由於咱們要等待不僅一個服務器節點接收消息。

buffer.memory

此參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。若是應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足。這個時候,send() 方法調用要麼被阻塞,要麼拋出異常,具體取決於 block.on.buffer.null 參數的設置。

compression.type

此參數來表示生產者啓用何種壓縮算法,默認狀況下,消息發送時不會被壓縮。該參數能夠設置爲 snappy、gzip 和 lz4,它指定了消息發送給 broker 以前使用哪種壓縮算法進行壓縮。下面是各壓縮算法的對比

image.png

image.png

retries

生產者從服務器收到的錯誤有多是臨時性的錯誤(好比分區找不到首領),在這種狀況下,reteis 參數的值決定了生產者能夠重發的消息次數,若是達到這個次數,生產者會放棄重試並返回錯誤。默認狀況下,生產者在每次重試之間等待 100ms,這個等待參數能夠經過 retry.backoff.ms 進行修改。

batch.size

當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算。當批次被填滿,批次裏的全部消息會被髮送出去。不過生產者井不必定都會等到批次被填滿才發送,任意條數的消息均可能被髮送。

client.id

此參數能夠是任意的字符串,服務器會用它來識別消息的來源,通常配置在日誌裏

max.in.flight.requests.per.connection

此參數指定了生產者在收到服務器響應以前能夠發送多少消息,它的值越高,就會佔用越多的內存,不過也會提升吞吐量。把它設爲1 能夠保證消息是按照發送的順序寫入服務器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在發送數據時等待服務器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(好比目標分區的首領是誰)時等待服務器返回響應的時間。若是等待時間超時,生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配----若是在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

max.block.ms

此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它能夠指能發送的單個消息的最大值,也能夠指單個請求裏全部消息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,爲了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩衝區的大小。若是它們被設置爲 -1,就使用操做系統的默認值。若是生產者或消費者與 broker 處於不一樣的數據中心,那麼能夠適當增大這些值。

文章參考:

《Kafka 權威指南》

極客時間 -《Kafka 核心技術與實戰》

Kafka官方文檔

Kafka 源碼

Squeezing the firehose: getting the most from Kafka compression

https://github.com/facebook/zstd

相關文章
相關標籤/搜索