Kafka消費者的使用和原理

繼上週的《Kafka生產者的使用和原理》,這周咱們學習下消費者,仍然仍是先從一個消費者的Hello World學起:apache

public class Consumer {

    public static void main(String[] args) {
        // 1. 配置參數
        Properties properties = new Properties();
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "group.demo");
        // 2. 根據參數建立KafkaConsumer實例(消費者)
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 3. 訂閱主題
        consumer.subscribe(Collections.singletonList("topic-demo"));
        try {
            // 4. 輪循消費
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            // 5. 關閉消費者
            consumer.close();
        }
    }

}

前兩步和生產者相似,配置參數而後根據參數建立實例,區別在於消費者使用的是反序列化器,以及多了一個必填參數group.id,用於指定消費者所屬的消費組。關於消費組的概念在《圖解Kafka中的基本概念》中介紹過了,消費組使得消費者的消費能力可橫向擴展,此次再介紹一個新的概念「再均衡」,其意思是將分區的所屬權進行從新分配,發生於消費者中有新的消費者加入或者有消費者宕機的時候。咱們先了解再均衡的概念,至於如何再均衡不在此深究。bootstrap

咱們繼續看上面的代碼,第3步,subscribe訂閱指望消費的主題,而後進入第4步,輪循調用poll方法從Kafka服務器拉取消息。給poll方法中傳遞了一個Duration對象,指定poll方法的超時時長,即當緩存區中沒有可消費數據時的阻塞時長,避免輪循過於頻繁。poll方法返回的是一個ConsumerRecords對象,其內部對多個分區的ConsumerRecored進行了封裝,其結構以下:緩存

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {

    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
    // ...

}

而ConsumerRecord則相似ProducerRecord,封裝了消息的相關屬性:安全

public class ConsumerRecord<K, V> {
    private final String topic;  // 主題
    private final int partition;  // 分區號
    private final long offset;  // 偏移量
    private final long timestamp;  // 時間戳
    private final TimestampType timestampType;  // 時間戳類型
    private final int serializedKeySize;  // key序列化後的大小
    private final int serializedValueSize;  // value序列化後的大小
    private final Headers headers;  // 消息頭部
    private final K key;  // 鍵
    private final V value;  // 值
    private final Optional<Integer> leaderEpoch;  // leader的週期號

相比ProdercerRecord的屬性更多,其中重點講下偏移量,偏移量是分區中一條消息的惟一標識。消費者在每次調用poll方法時,則是根據偏移量去分區拉取相應的消息。而當一臺消費者宕機時,會發生再均衡,將其負責的分區交給其餘消費者處理,這時能夠根據偏移量去繼續從宕機前消費的位置開始。
Kafka消費者的使用和原理服務器

而爲了應對消費者宕機狀況,偏移量被設計成不存儲在消費者的內存中,而是被持久化到一個Kafka的內部主題__consumer_offsets中,在Kafka中,將偏移量存儲的操做稱做提交。而消息者在每次消費消息時都將會將偏移量進行提交,提交的偏移量爲下次消費的位置,例如本次消費的偏移量爲x,則提交的Kafka消費者的使用和原理是x+1。網絡

在代碼中咱們並無看到顯示的提交代碼,那麼Kafka的默認提交方式是什麼?默認狀況下,消費者會按期以auto_commit_interval_ms(5秒)的頻率進行一次自動提交,而提交的動做發生於poll方法裏,在進行拉取操做前會先檢查是否能夠進行偏移量提交,若是能夠,則會提交即將拉取的偏移量。多線程

下面咱們看下這樣一個場景,上次提交的偏移量爲2,而當前消費者已經處理了二、三、4號消息,正準備提交5,但卻宕機了。當發生再均衡時,其餘消費者將繼續從已提交的2開始消費,因而發生了重複消費的現象。
Kafka消費者的使用和原理異步

咱們能夠經過減少自動提交的時間間隔來減少重複消費的窗口大小,但這樣仍然沒法避免重複消費的發生。ide

按照線性程序的思惟,因爲自動提交是延遲提交,即在處理完消息以後進行提交,因此應該不會出現消息丟失的現象,也就是已提交的偏移量會大於正在處理的偏移量。但放在多線程環境中,消息丟失的現象是可能發生的。例如線程A負責調用poll方法拉取消息並放入一個隊列中,由線程B負責處理消息。若是線程A已經提交了偏移量5,而線程B還未處理完二、三、4號消息,這時候發生宕機,則將丟失消息。
Kafka消費者的使用和原理學習

從上述場景的描述,咱們能夠知道自動提交是存在風險的。因此Kafka除了自動提交,還提供了手動提交的方式,能夠細分爲同步提交和異步提交,分別對應了KafkaConsumer中的commitSync和commitAsync方法。咱們先嚐試使用同步提交修改程序:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    consumer.commitSync();;
}

在處理完一批消息後,都會提交偏移量,這樣能減少重複消費的窗口大小,可是因爲是同步提交,因此程序會阻塞等待提交成功後再繼續處理下一條消息,這樣會限制程序的吞吐量。那咱們改成使用異步提交:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    consumer.commitAsync();;
}

異步提交時,程序將不會阻塞,但異步提交在提交失敗時也不會進行重試,因此提交是否成功是沒法保證的。所以咱們能夠組合使用兩種提交方式。在輪循中使用異步提交,而當關閉消費者時,再經過同步提交來保證提交成功。

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        consumer.commitAsync();
    }
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

上述介紹的兩種無參的提交方式都是提交的poll返回的一個批次的數據。若將來得及提交,也會形成重複消費,若是還想更進一步減小重複消費,能夠在for循環中爲commitAsync和commitSync傳入分區和偏移量,進行更細粒度的提交,例如每1000條消息咱們提交一次:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());

        // 偏移量加1
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1));
        if (count % 1000 == 0) {
            consumer.commitAsync(currentOffsets, null);
        }
        count++;
    }
}

關於提交就介紹到這裏。在使用消費者的代理中,咱們能夠看到poll方法是其中最爲核心的方法,可以拉取到咱們須要消費的消息。因此接下來,咱們一塊兒深刻到消費者API的幕後,看看在poll方法中,都發生了什麼,其實現以下:

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

在咱們使用設置超時時間的poll方法中,會調用重載方法,第二個參數includeMetadataInTimeout用於標識是否把元數據的獲取算在超時時間內,這裏傳值爲true,也就是算入超時時間內。下面再看重載的poll方法的實現:

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    // 1. 獲取鎖並確保消費者沒有關閉
    acquireAndEnsureOpen();
    try {
        // 2.記錄poll開始
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        // 3.檢查是否有訂閱主題
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        do {
            // 4.安全的喚醒消費者
            client.maybeTriggerWakeup();

            // 5.更新偏移量(若是須要的話)
            if (includeMetadataInTimeout) {
                updateAssignmentMetadataIfNeeded(timer, false);
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                    log.warn("Still waiting for metadata");
                }
            }

            // 6.拉取消息
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // 7.若是拉取到了消息或者有未處理的請求,因爲用戶還須要處理未處理的消息
                // 因此會再次發起拉取消息的請求(異步),提升效率
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                // 8.調用消費者攔截器處理
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        // 9.釋放鎖
        release();
        // 10.記錄poll結束
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}

咱們對上面的代碼逐步分析,首先是第1步
acquireAndEnsureOpen方法,獲取鎖並確保消費者沒有關閉,其實現以下:

private void acquireAndEnsureOpen() {
    acquire();
    if (this.closed) {
        release();
        throw new IllegalStateException("This consumer has already been closed.");
    }
}

其中acquire方法用於獲取鎖,爲何這裏會要上鎖。這是由於KafkaConsumer是線程不安全的,因此須要上鎖,確保只有一個線程使用KafkaConsumer拉取消息,其實現以下:

private static final long NO_CURRENT_THREAD = -1L;
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refcount = new AtomicInteger(0);

private void acquire() {
    long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
}

用一個原子變量currentThread做爲鎖,經過cas操做獲取鎖,若是cas失敗,即獲取鎖失敗,表示發生了競爭,有多個線程在使用KafkaConsumer,則會拋出ConcurrentModificationException異常,若是cas成功,還會將refcount加一,用於重入。
再看第二、3步,記錄poll的開始以及檢查是否有訂閱主題。而後進入do-while循環,若是沒有拉取到消息,將在不超時的狀況下一直輪循。

第4步,安全的喚醒消費者,並非喚醒,而是檢查是否有喚醒的風險,若是程序在執行不可中斷的方法或是收到中斷請求,會拋出異常,這裏我還不是很明白,先放一下。
第5步,更新偏移量,就是咱們在前文說的在進行拉取操做前會先檢查是否能夠進行偏移量提交。

第6步,pollForFetches方法拉取消息,其實現以下:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    long pollTimeout = coordinator == null ? timer.remainingMs() :
    Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // 1.若是消息已經有了,則當即返回
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // 2.準備拉取請求
    fetcher.sendFetches();

    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    Timer pollTimer = time.timer(pollTimeout);
    // 3.發送拉取請求
    client.poll(pollTimer, () -> {
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());
    // 3.返回消息
    return fetcher.fetchedRecords();
}

若是fetcher已經有消息了則當即返回,這裏和下面將要講的第7步對應。若是沒有消息則使用Fetcher準備拉取請求而後再經過ConsumerNetworkClient發送請求,最後返回消息。

爲啥消息會已經有了呢,咱們回到poll的第7步,若是拉取到了消息或者有未處理的請求,因爲用戶還須要處理未處理的消息,這時候可使用異步的方式發起下一次的拉取消息的請求,將數據提早拉取,減小網絡IO的等待時間,提升程序的效率。

第8步,調用消費者攔截器處理,就像KafkaProducer中有ProducerInterceptor,在KafkaConsumer中也有ConsumerInterceptor,用於處理返回的消息,處理完後,再返回給用戶。

第九、10步,釋放鎖和記錄poll結束,對應了第一、2步。
對KafkaConsumer的poll方法就分析到這裏。最後用一個思惟導圖回顧下文中較爲重要的知識點:
Kafka消費者的使用和原理

相關文章
相關標籤/搜索