Kafka produce flush 引發的性能分析

調用kafka producer發送數據時,發現延遲級別在10-200ms不等,與正常的kafka寫入速度不匹配,因而開始找問題~html

 

一.場景:

一批數據,須要遍歷每一個數據併發送數據細節的信息到kafka,下面是我原始代碼,每一個人發送後執行一次flush操做。java

val results = Array[DataObject](...)
results.foreach(data => {
    val info = new ProducerRecord[String, String](topic, message)
    producer.send(info)
})
kafka.flush()

服務器執行延遲在10-200ms不等apache

 

二.可能緣由分析:

1.send 函數形成阻塞

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }

查看源碼的send邏輯,一種有回調函數,一種沒有回調函數,因此這裏send是異步執行,不會形成堵塞,排除緩存

2.flush 函數形成阻塞

public void flush() {
        log.trace("Flushing accumulated records in producer.");
        this.accumulator.beginFlush();
        this.sender.wakeup();

        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException var2) {
            throw new InterruptException("Flush interrupted.", var2);
        }
    }

flush 這裏accumulator會調用await相關方法,查看官方API的解釋是:bash

flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.

調用此方法可以使全部緩衝記錄當即可用於發送(即便linger.ms大於0)並在與這些記錄關聯的請求完成時發生阻塞。 ok,找到問題服務器

 

三.Flush 原理

基於flush引發的延遲,首先看一下kafka生產的過程併發

Step1:異步調用send發送日誌,根據Properties的配置對kv進行序列化異步

Step2::根據k hash 獲得分區信息,追加到對應topic下的partition,這裏先會寫入到本地緩存區函數

Step3: 本地緩存寫入後,有獨立的線程傳送向producer發送ACKthis

1.分析:

flush 是將第二步寫到緩存區的數據強制推送發送,正常狀況下清空緩存區操做經過參數配置實現:

batch.size 離線緩存達到該size時執行一次flush

linger.ms 達到該時間間隔時,執行一次flush

調用flush時,會清空緩存區內存,調用 awaitFlushCompletion 時須要等待緩存區清空,這裏會形成線程的堵塞

public void awaitFlushCompletion() throws InterruptedException {
        try {
            Iterator i$ = this.incomplete.all().iterator();

            while(i$.hasNext()) {
                RecordBatch batch = (RecordBatch)i$.next();
                batch.produceFuture.await();
            }
        } finally {
            this.flushesInProgress.decrementAndGet();
        }

    }

awaitFlushCompletion 將當前緩存區數據構造迭代器循環發送,並在finally階段調整offset。

這裏我設置發送延遲時間爲1000ms

個人實際發送時間在1000ms之內,因此每次發送調用 flush 都會形成延遲,至關於手動調用頻繁的刷新緩存區,增長的IO等待的時間,違背了批處理減小IO的規則,因此形成kafka寫入時長增長,這裏取消flush,經過參數控制 producer 生產解決問題。

第一次時間長是由於初始化kafka服務端,和最一開始添加 flush 相比,時間消耗基本能夠忽略。

相關文章
相關標籤/搜索