【Kafka】《Kafka權威指南》——寫數據

無論是把 Kafka 做爲消息隊列、消息、總線仍是數據存儲平臺來使用 ,老是須要有一個能夠往 Kafka 寫入數據的生產者和一個能夠從 Kafka讀取數據的消費者,或者一個兼具兩種角 色的應用程序。java

例如,在一個信用卡事務處理系統裏,有一個客戶端應用程序,它多是一個在線商店, 每當有支付行爲發生時,它負責把事務發送到 Kafka上。另外一個應用程序根據規則引擎檢 查這個事務,決定是批准仍是拒絕。 批准或拒絕的響應消息被寫回 Kafka,而後發送給發起事務的在線商店。第三個應用程序從 Kafka上讀取事務和審覈狀態,把它們保存到數據 庫, 隨後分析師能夠對這些結果進行分析,或許還能借此改進規則引擎 。算法

開發者們可使用 Kafka 內置的客戶端 API開發 Kafka應用程序。數據庫

在這一章,咱們將從 Kafra生產者的設計和組件講起,學習如何使用 Kafka生產者。咱們將展現如何建立 KafkaProducer和 ProducerRecords對象、如何將記錄發送給 Kafka,以及如何處理從 Kafka 返回的錯誤,而後介紹用幹控制生產者行爲的重要配置選項,最後深刻 探討如何使用不一樣的分區方法和序列化器,以及如何自定義序列化器和分區器 。apache

在下一章,咱們將會介紹 Kafra的悄費者客戶端,以及如何從 Kafka讀取消息。bootstrap

生產者概覽

一個應用程序在不少狀況下須要往 Kafka 寫入消息 : 記錄用戶的活動(用於審計和分析 )、 記錄度量指標、保存日誌、消息、記錄智能家電的信息、與其餘應用程序進行異步通訊、 緩衝即將寫入到數據庫的數據,等等。數組

多樣的使用場景意味着多樣的需求:是否每一個消息都很重要?是否容許丟失 一 小部分消息?偶爾出現重複消息是否能夠接受?是否有嚴格的延遲和吞吐量要求?安全

在以前提到的信用卡事務處理系統裏,消息丟失或消息重複是不容許的,能夠接受的延遲最大爲 500ms,對吞吐量要求較高,咱們但願每秒鐘能夠處理一百萬個消息。服務器

保存網站的點擊信息是另 一種使用場景。在這個場景裏,容許丟失少許的消息或出現少許 的消息重複,延遲能夠高一些,只要不影響用戶體驗就行。換句話說,只要用戶點擊連接 後能夠立刻加載頁面,那麼咱們並不介意消息要在幾秒鐘以後才能到達 Kafka 服務器。 吞 吐量則取決於網站用戶使用網站的頻度。網絡

不一樣的使用場景對生產者 API 的使用和配置會有直接的影響。多線程

儘管生產者 API 使用起來很簡單 ,但消息的發送過程仍是有點複雜的。下圖展現 了向Kafka 發送消息的主要步驟。

Kafka 生產者組件圖

咱們從建立 一個 ProducerRecord 對象開始, ProducerRecord 對象須要包含目標主題和要發送的內容。咱們還能夠指定鍵或分區。在發送 ProducerRecord對象時,生產者要先把鍵和 值對象序列化成字節數組,這樣它們纔可以在網絡上傳輸 。

接下來,數據被傳給分區器。若是以前在 ProducerRecord對象裏指定了分區,那麼分區器就不會再作任何事情,直接把指定的分區返回。若是沒有指定分區 ,那麼分區器會根據 ProducerRecord對象的鍵來選擇一個分區 。選好分區之後 ,生產者就知道該往哪一個主題和分區發送這條記錄了。緊接着,這條記錄被添加到一個記錄批次裏,這個批次裏的全部消息會被髮送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。

服務器在收到這些消息時會返回一個響應。若是消息成功寫入 Kafka,就返回 一 個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量。若是寫入 失敗, 就會返回 一個錯誤 。生產者在收到錯誤以後會嘗試從新發送消息,幾回以後若是仍是失敗,就返回錯誤信息。

建立Kafka生產者

要往 Kafka寫入消息,首先要建立一個生產者對象,井設置一些屬性。

下面的代碼片斷展現瞭如何建立一個新的生產者,這裏只指定了必要的屬性,其餘使用默認設置。

private Properties kafkaProps = new Properties(); 

kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
 
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
 
producer = new KafkaProducer<String, String>(kafkaProps);

Kafka生產者有 3個必選的屬性

bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式爲 host:port。清單裏不須要包含全部的broker地址,生產者會從給定的 broker裏查找到其餘 broker的信息。不過建議至少要提供兩個 broker的信息, 一旦其中一個宕機,生產者仍然可以鏈接到集羣上。

key.serializer

broker但願接收到的消息的鍵和值都是字節數組。生產者接口容許使用參數化類型,所以能夠把 Java對象做爲鍵和值發送給 broker。這樣的代碼具備良好的可讀性,不過生產者須要知道如何把這些 Java對象轉換成字節數組。 key.serializer必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer接口的類,生產者會使用這個類把鍵對象序列化成字節數組。 Kafka 客戶端默認提供了ByteArraySerializer(這個只作不多的事情)、 StringSerializer和 IntegerSerializer,所以,若是你只使用常見的幾種 Java對象類型,那麼就不必實現本身的序列化器 。要注意, key.serializer是必須設置的,就算你打算只發送值內容。

value.serializer

與 key.serializer同樣, value.serializer指定的類會將值序列化。若是鍵和值都是字符串,可使用與 key.serializer 同樣的序列化器。若是鍵是整數類型而值是字符扇 , 那麼須要使用不一樣的序列化器。

發送消息主要有3種方式:

一、發送並忘記( fire-and-forget):咱們把消息發送給服務器,但井不關心它是否正常到達。大多數狀況下,消息會正常到達,由於 Kafka是高可用的,並且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些消息。

二、同步發送:咱們使用send()方怯發送消息, 它會返回一個Future對象,調用get()方法進行等待, 就能夠知道悄息是否發送成功。

三、異步發送:咱們調用 send() 方怯,並指定一個回調函數, 服務器在返回響應時調用該函數。

在下面的幾個例子中 , 咱們會介紹如何使用上述幾種方式來發送消息,以及如何處理可能 發生的異常狀況。

本章的全部例子都使用單線程,但其實生產者是可使用多線程來發送消息的。剛開始的 時候可使用單個消費者和單個線程。若是須要更高的吞吐量,能夠在生產者數量不變的 前提下增長線程數量。若是這樣作還不夠 , 能夠增長生產者數量。

發送消息到Kafka

最簡單的同步發送消息方式以下所示 :

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
  producer.send(record);
} catch(Exception e) {
  e.printStack();
}

生產者的 send() 方住將 ProducerRecord對象做爲參數,它須要目標主題的名字和要發送的鍵和值對象,它們都是字符串。鍵和值對象的類型必須與序列化器和生產者對象相匹配。

咱們使用生產者的 send() 方愈加送 ProducerRecord對象。從生產者的架構圖裏能夠看到,消息先是被放進緩衝區,而後使用單獨的線程發送到服務器端。 send() 方法會返回一個包含 RecordMetadata 的 Future對象,不過由於咱們會忽略返回值,因此沒法知道消息是否發送成功。若是不關心發送結果,那麼可使用這種發送方式。好比,記錄 Twitter 消息日誌,或記錄不過重要的應用程序日誌。

咱們能夠忽略發送消息時可能發生的錯誤或在服務器端可能發生的錯誤,但在發送消息以前,生產者仍是有可能發生其餘的異常。這些異常有多是 SerializationException (說明序列化消息失敗)、 BufferExhaustedException 或 TimeoutException (說明緩衝區已滿),又或者是 InterruptException (說明發送線程被中斷)。

同步發送消息

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
    producer.send(record).get();
} catch(Exception e) {
    e.printStack();
}

在這裏, producer.send() 方住先返回一個 Future對象,而後調用 Future對象的 get() 方法等待 Kafka 響應。若是服務器返回錯誤, get()方怯會拋出異常。若是沒有發生錯誤,咱們會獲得一個 RecordMetadata對象,能夠用它獲取消息的偏移量。若是在發送數據以前或者在發送過程當中發生了任何錯誤 ,好比 broker返回 了一個不容許重發消息的異常或者已經超過了重發的次數 ,那麼就會拋出異常。咱們只是簡單地把異常信息打印出來。

如何處理從Kafka生產者返回的錯誤

KafkaProducer通常會發生兩類錯誤。其中一類是可重試錯誤 ,這類錯誤能夠經過重發消息來解決。好比對於鏈接錯誤,能夠經過再次創建鏈接來解決,「無主(noleader)」 錯誤則可 以經過從新爲分區選舉首領來解決。 KafkaProducer能夠被配置成自動重試,若是在屢次重試後仍無能解決問題,應用程序會收到一個重試異常。另外一類錯誤無出經過重試解決 ,好比「消息太大」異常。對於這類錯誤, KafkaProducer不會進行任何重試,直接拋出異常。

異步發送消息

假設消息在應用程序和 Kafka集羣之間一個來回須要 10ms。若是在發送完每一個消息後都等待迴應,那麼發送 100個消息須要 1秒。但若是隻發送消息而不等待響應,那麼發送100個消息所須要的時間會少不少。大多數時候,咱們並不須要等待響應——儘管 Kafka 會把目標主題、分區信息和消息的偏移量發送回來,但對於發送端的應用程序來講不是必需的。不過在遇到消息發送失敗時,咱們須要拋出異常、記錄錯誤日誌,或者把消息寫入 「錯誤消息」文件以便往後分析。

爲了在異步發送消息的同時可以對異常狀況進行處理,生產者提供了回調支持 。下面是使用異步發送消息、回調的一個例子。

生產者的配置

到目前爲止 , 咱們只介紹了生產者的幾個必要配置參數——bootstrap.servers API 以及序列化器。

生產者還有不少可配置的參數,在 Kafka文檔裏都有說明,它們大部分都有合理的默認值 , 因此沒有必要去修改它們 。不過有幾個參數在內存使用、性能和可靠性方面對生產者影響比較大,接下來咱們會一一說明。

1. acks

acks 參數指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。

這個參數對消息丟失的可能性有重要影響。 該參數有以下選項。 • 若是 acks=0, 生產者在成功寫入悄息以前不會等待任何來自服務器的響應。也就是說, 若是當中出現了問題 , 致使服務器沒有收到消息,那麼生產者就無從得知,消息也就丟 失了。不過,由於生產者不須要等待服務器的響應,因此它能夠以網絡可以支持的最大 速度發送消息,從而達到很高的吞吐量。

• 若是 acks=1,只要集羣的首領節點收到消息,生產者就會收到 一個來自服務器的成功 響應。若是消息無撞到達首領節點(好比首領節點崩憤,新的首領尚未被選舉出來), 生產者會收到一個錯誤響應,爲了不數據丟失,生產者會重發消息。不過,若是一個 沒有收到消息的節點成爲新首領,消息仍是會丟失。這個時候的吞吐量取決於使用的是 同步發送仍是異步發送。若是讓發送客戶端等待服務器的響應(經過調用 Future對象 的 get()方法),顯然會增長延遲(在網絡上傳輸一個來回的延遲)。若是客戶端使用異步回調,延遲問題就能夠獲得緩解,不過吞吐量仍是會受發送中消息數量的限制(好比,生 產者在收到服務器響應以前能夠發送多少個消息)。

• 若是 acks=all,只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。這種模式是最安全的,它能夠保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集羣仍然能夠運行(第 5 章將討論更多的細節)。不過,它的延遲比 acks=1時更高,由於咱們要等待不僅一個服務器節點接收消息。

2. buffer.memory

該參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。若是 應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足。這個時候, send()方法調用要麼被阻塞,要麼拋出異常,取決於如何設置 block.on.buffe.full 參數 (在0.9.0.0版本里被替換成了max.block.ms,表示在拋出異常以前能夠阻塞一段時間)。

3. compression.type

默認狀況下,消息發送時不會被壓縮。該參數能夠設置爲 snappy、 gzip 或 lz4,它指定了消息被髮送給 broker以前使用哪種壓縮算法進行壓縮。 snappy 壓縮算怯由 Google巳發明, 它佔用較少 的 CPU,卻能提供較好的性能和至關可觀的壓縮比,若是比較關注性能和網絡帶寬,可使用這種算法。 gzip壓縮算法通常會佔用較多的 CPU,但會提供更高的壓縮比,因此若是網絡帶寬比較有限,可使用這種算法。使用壓縮能夠下降網絡傳輸開銷和存儲開銷,而這每每是向 Kafka發送消息的瓶頸所在。

4. retries

生產者從服務器收到的錯誤有多是臨時性的錯誤(好比分區找不到首領)。在這種狀況下, retries參數的值決定了生產者能夠重發消息的次數,若是達到這個次數,生產者會放棄重試並返回錯誤。默認狀況下,生產者會在每次重試之間等待 1OOms,不過能夠經過 retries.backoff.ms 參數來改變這個時間間隔。建議在設置重試次數和重試時間間隔以前, 先測試一下恢復一個崩潰節點須要多少時間(好比全部分區選舉出首領須要多長時間), 讓總的重試時間比 Kafka集羣從崩潰中恢復的時間長,不然生產者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦怯經過重試來解決(好比「悄息太大」錯誤)。通常情 況下,由於生產者會自動進行重試,因此就不必在代碼邏輯裏處理那些可重試的錯誤。 你只須要處理那些不可重試的錯誤或重試次數超出上限的狀況。

5. batch.size

當有多個消息須要被髮送到同一個分區時,生產者會把它們放在放一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算(而不是消息個數)。當批次被填滿,批次裏的全部消息會被髮送出去。不過生產者井不必定都會等到批次被填滿才發送,半捕 的批次,甚至只包含一個消息的批次也有可能被髮送。因此就算把批次大小設置得很大, 也不會形成延遲,只是會佔用更多的內存而已。但若是設置得過小,由於生產者須要更頻繁地發送消息,會增長一些額外的開銷。

6. linger.ms

該參數指定了生產者在發送批次以前等待更多消息加入批次的時間。 KafkaProducer 會在批次填滿或 linger.ms達到上限時把批次發送出去。默認狀況下,只要有可用的線程, 生產者就會把消息發送出去,就算批次裏只有一個消息。把 linger.ms設置成比0大的數, 讓生產者在發送批次以前等待一下子,使更多的消息加入到這個批次 。雖然這樣會增長延遲,但也會提高吞吐量(由於一次性發送更多的消息,每一個消息的開銷就變小了)。

7. client.id

該參數能夠是任意的字符串,服務器會用它來識別消息的來源,還能夠用在日誌和配額指標裏。

8. max.in.flight.requests.per.connection

該參數指定了生產者在收到服務器晌應以前能夠發送多少個消息。它的值越高,就會佔用越多的內存,不過也會提高吞吐量。 把它設爲 1 能夠保證消息是按照發送的順序寫入服務器的,即便發生了重試。

9. timeout.ms、 request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms指定了生產者在發送數據時等待服務器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元數據(好比目標分區的首領是誰)時等待服務器返回響應的時間。若是等待響應超時,那麼生產者要麼重試發送數據,要麼返回 一個錯誤 (拋出異常或執行回調)。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配一一若是在指定時間內沒有收到同步副本的確認,那麼 broker就會返回 一個錯誤 。

10. max.block.ms

該參數指定了在調用 send() 方法或使用 parttitionFor() 方能獲取元數據時生產者的阻塞 時間。當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方屈就會阻塞。在阻塞時間達到 max.block.ms時,生產者會拋出超時異常。

11 . max.request.size

該參數用於控制生產者發送的請求大小。它能夠指能發送的單個消息的最大值,也能夠指單個請求裏全部消息總的大小。例如,假設這個值爲 1MB,那麼能夠發送的單個最大消息爲 1MB,或者生產者能夠在單個請求裏發送一個批次,該批次包含了 1000個消息,每一個消息大小爲 1KB 。另外, broker對可接收的消息最大值也有本身的限制( message.max.bytes),因此兩邊的配置最好能夠匹配,避免生產者發送的消息被 broker拒絕 。

12. receive.buffer.bytes 和 send.buffer.bytes

這兩個參數分別指定了 TCP socket接收和發送數據包的緩衝區大小 。 若是它們被設爲 -1 , 就使用操做系統的默認值。若是生產者或消費者與 broker處於不一樣的數據中心,那麼能夠適當增大這些值,由於跨數據中心的網絡通常都有比較高的延遲和比較低的帶寬。

順序保證

Kafka能夠保證同一個分區裏的消息是有序的。也就是說,若是生產者按照必定的順序發送消息, broker就會按照這個順序把它們寫入分區,消費者也會按照一樣的順序讀取它們。在某些狀況下 , 順序是很是重要的。若是把retries 設爲非零整數,同時把 max.in.flight.requests.per.connection 設爲比 1大的數,那麼,若是第一個批次消息寫入失敗,而第二個批次寫入成功, broker會重試寫入第一個批次。若是此時第一個批次也寫入成功,那 麼兩個批次的順序就反過來了。

通常來講,若是某些場景要求消息是有序的,那麼消息是否寫入成功也是 很關鍵的,因此不建議把順序是很是重要的。若是把retries 設爲 0。能夠把 max.in.flight.requests.per.connection設爲 1,這樣在生產者嘗試發送第一批消息時,就不會有其餘的消息發送給 broker。不過這樣會嚴重影響生產者的吞吐量 ,因此只有在 對消息的順序有嚴格要求的狀況下才能這麼作。

相關文章
相關標籤/搜索