Kafka消息過長詳解

Kafka發送消息大小問題

⚠️ 本文實驗的Kafka版本爲2.11版本.java

消息概述

kafka中的消息指的就是一條ProducerRecord,裏面除了攜帶發送的數據以外,還包含:apache

  • topic 發往的Topic
  • partition 發往的分區
  • headers 頭信息
  • key 數據
  • value 數據
  • timestamp-long 時間戳

Producer生產消息過長

在生產者發送消息的時候,並非上面全部的信息都算在發送的消息大小.詳情見下面代碼.
序列化value - KafkaProducer.doSend()方法數組

上面的代碼會將value序列化成字節數組,參與序列化的有topic,headers,key. 用來驗證value是否超出長度的是ensureValidRecordSize(serializedSize);方法.併發

ensureValidRecordSize從兩個方面驗證,一個是maxRequestSize(max.request.size),另外一個是totalMemorySize(buffer.memory), 只有當value的長度同時小於時,消息才能夠正常發送.less

private void ensureValidRecordSize(int size) {
    if (size > this.maxRequestSize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the maximum request size you have configured with the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                " configuration.");
    if (size > this.totalMemorySize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                " configuration.");
}

單條消息過長或產生以下錯誤.
單條消息過長異步

這裏有個注意的點,若是隻是單純的發送消息,沒有用Callback進行監控或者用Future進行得到結果,在消息過長的狀況下,不會主動發出提示,ide

使用Future接收結果

Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>("topic", "key", "value"));
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);

Future類中get()方法, @throws ExecutionException 若是計算拋出異常,該方法將會拋出該異常.fetch

/**
 * Waits if necessary for the computation to complete, and then
 * retrieves its result.
 *
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 */
V get() throws InterruptedException, ExecutionException;

使用Callback進行監控

先看Kafka專門爲回調寫的接口.this

// 英文註釋省略,總的來講: 用於異步回調,當消息發送server已經被確認以後,就會調用該方法
// 該方法中的確定有一個參數不爲null,若是沒有異常產生,則metadata有數據,若是有異常則相反
public void onCompletion(RecordMetadata metadata, Exception exception);
kafkaProducer.send(new ProducerRecord<>("topic", "key", "value"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
});

日誌Level=DEBUG

將日誌的消息級別設置爲DEBUG,也會給標準輸出輸出該警告信息.spa

Future和Callback總結

經過上面兩種比較,不難發現Future是Java併發標準庫中,並非專門爲kafka而設計,須要顯示捕獲異常,而Callback接口是kafka提供標準回調措施,因此應儘量採用後者.

服務端接收消息限制

在生產者有一個限制消息的參數,而在服務端也有限制消息的參數,該參數就是
message.max.bytes,默認爲1000012B (大約1MB),服務端能夠接收不到1MB的數據.(在新客戶端producer,消息老是通過分批group into batch的數據,詳情見RecordBatch接口).

/**
 * A record batch is a container for records. In old versions of the record format (versions 0 and 1),
 * a batch consisted always of a single record if no compression was enabled, but could contain
 * many records otherwise. Newer versions (magic versions 2 and above) will generally contain many records
 * regardless of compression.
 * 在舊版本不開啓消息壓縮的狀況下,一個batch只包含一條數據
 * 在新版本中老是會包含多條消息,不會去考慮消息是否壓縮
 */
public interface RecordBatch extends Iterable<Record>{
    ...
}

設置Broker端接收消息大小

修改broker端的能夠接收的消息大小,須要在broker端server.properties文件中添加message.max.bytes=100000. 數值能夠修改爲本身想要的,單位是byte.

生產端消息大於broker會發生什麼

若是生產者設置的消息發送大小爲1MB,而broker端設置的消息大小爲512KB會發生什麼?
答案就是broker會拒絕該消息,生產者會返回一個RecordTooLargeException. 該消息是不會被消費者消費.提示的信息爲: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

消費者消息的限制

消費者也會進行消息限制,這裏介紹有關三個限制消費的參數

  • fetch.max.bytes 服務端消息合集(多條)能返回的大小
  • fetch.min.bytes 服務端最小返回消息的大小
  • fetch.max.wait.ms 最多等待時間

若是fetch.max.wait.ms設置的時間到達,即便能夠返回的消息總大小沒有知足fetch.min.bytes設置的值,也會進行返回.

fetch.max.bytes設置太小

若是fetch.max.bytes設置太小會發生什麼? 會是不知足條件的數據一條都不返回嗎? 咱們能夠根據文檔來查看一下.

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress.

英文的大意就是: fetch.max.bytes 表示服務端能返回消息的總大小. 消息是經過分批次返回給消費者. 若是在分區中的第一個消息批次大於這個值,那麼該消息批次依然會返回給消費者,保證流程運行.

能夠得出結論: 消費端的參數只會影響消息讀取的大小.

實踐fetch.max.bytes設置太小

properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024);
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1);
...
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));
    System.out.println(records.count());
}

啓動消費者,添加上面三個參數. 指定消息批次最小最大返回的大小以及容許抓取最長的等待時間. 最後將返回的消息總數輸出到標準輸出.

實驗結果: 由於每次發送的消息都要大於1024B,因此消費者每一個批次只能返回一條數據. 最終會輸出1...

相關文章
相關標籤/搜索