Kafka冪等性原理及實現剖析

1.概述

最近和一些同窗交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及做用等問題都能詳細做答。可是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那麼,今天筆者就爲你們來剖析一下Kafka的冪等性原理及實現。node

2.內容

2.1 Kafka爲啥須要冪等性?

Producer在生產發送消息時,不免會重複發送消息。Producer進行retry時會產生重試機制,發生消息重複發送。而引入冪等性後,重複發送只會生成一條有效的消息。Kafka做爲分佈式消息系統,它的使用場景常見與分佈式系統中,好比消息推送系統、業務平臺系統(如物流平臺、銀行結算平臺等)。以銀行結算平臺來講,業務方做爲上游把數據上報到銀行結算平臺,若是一份數據被計算、處理屢次,那麼產生的影響會很嚴重。面試

2.2 影響Kafka冪等性的因素有哪些?

在使用Kafka時,須要確保Exactly-Once語義。分佈式系統中,一些不可控因素有不少,好比網絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現網絡異常、FullGC、OOM等問題時致使Ack超時,Producer會進行重複發送。可能出現的狀況以下:數據庫

Kafka冪等性原理及實現剖析

2.3 Kafka的冪等性是如何實現的?

Kafka爲了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什麼呢?apache

  • ProducerID:在每一個新的Producer初始化時,會被分配一個惟一的ProducerID,這個ProducerID對客戶端使用者是不可見的。
  • SequenceNumber:對於每一個ProducerID,Producer發送數據的每一個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。

2.3.1 冪等性引入以前的問題?

Kafka在引入冪等性以前,Producer向Broker發送消息,而後Broker將消息追加到消息流中後給Producer返回Ack信號值。實現流程以下:緩存

Kafka冪等性原理及實現剖析

上圖的實現流程是一種理想狀態下的消息發送狀況,可是實際狀況中,會出現各類不肯定的因素,好比在Producer在發送給Broker的時候出現網絡異常。好比如下這種異常狀況的出現:網絡

Kafka冪等性原理及實現剖析

上圖這種狀況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,可是在返回Ack信號給Producer時失敗了(好比網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)從新發送給Broker,Broker接收到消息後,再次將該消息追加到消息流中,而後成功返回Ack信號給Producer。這樣下來,消息流中就被重複追加了兩條相同的(x2,y2)的消息。架構

2.3.2 冪等性引入以後解決了什麼問題?

面對這樣的問題,Kafka引入了冪等性。那麼冪等性是如何解決這類重複發送消息的問題的呢?下面咱們能夠先來看看流程圖:分佈式

Kafka冪等性原理及實現剖析

 一樣,這是一種理想狀態下的發送流程。實際狀況下,會有不少不肯定的因素,好比Broker在發送Ack信號給Producer時出現網絡異常,致使發送失敗。異常狀況以下圖所示:ide

Kafka冪等性原理及實現剖析

 當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常致使Producer接收Ack信號失敗。對於Producer來講,會觸發重試機制,將消息(x2,y2)再次發送,可是,因爲引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而以前Broker緩存過以前發送的相同的消息,那麼在消息流中的消息就只有一條(x2,y2),不會出現重複發送的狀況。debug

2.3.3 ProducerID是如何生成的?

客戶端在生成Producer時,會實例化以下代碼:

// 實例化一個Producer對象
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現代碼以下:

private void maybeWaitForPid() {
        if (transactionState == null)
            return;

        while (!transactionState.hasPid()) {
            try {
                Node node = awaitLeastLoadedNodeReady(requestTimeout);
                if (node != null) {
                    ClientResponse response = sendAndAwaitInitPidRequest(node);
                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    } else {
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                } else {
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            } catch (Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

3.事務

與冪等性有關的另一個特性就是事務。Kafka中的事務與數據庫的事務相似,Kafka中的事務屬性是指一系列的Producer生產消息和消費消息提交Offsets的操做在一個事務中,即原子性操做。對應的結果是同時成功或者同時失敗。

這裏須要與數據庫中事務進行區別,操做數據庫中的事務指一系列的增刪查改,對Kafka來講,操做事務是指一系列的生產和消費等原子性操做。

3.1 Kafka引入事務的用途?

在事務屬性引入以前,先引入Producer的冪等性,它的做用爲:

  • Producer屢次發送消息能夠封裝成一個原子性操做,即同時成功,或者同時失敗;
  • 消費者&生產者模式下,由於Consumer在Commit Offsets出現問題時,致使重複消費消息時,Producer重複生產消息。須要將這個模式下Consumer的Commit Offsets操做和Producer一系列生產消息的操做封裝成一個原子性操做。

產生的場景有:

好比,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets爲100(假設最近一次Commit的Offsets爲50),那麼執行觸發Balance時,其餘Consumer就會重複消費消息(消費的Offsets介於50~100之間的消息)。

3.2 事務提供了哪些可以使用的API?

Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口以下:

// 初始化事務,須要注意確保transation.id屬性被分配
void initTransactions();

// 開啓事務
void beginTransaction() throws ProducerFencedException;

// 爲Consumer提供的在事務內Commit Offsets的操做
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 提交事務
void commitTransaction() throws ProducerFencedException;

// 放棄事務,相似於回滾事務的操做
void abortTransaction() throws ProducerFencedException;

3.3 事務的實際應用場景有哪些?

在Kafka事務中,一個原子性操做,根據操做類型能夠分爲3種狀況。狀況以下:

  • 只有Producer生產消息,這種場景須要事務的介入;
  • 消費消息和生產消息並存,好比Consumer&Producer模式,這種場景是通常Kafka項目中比較常見的模式,須要事務介入;
  • 只有Consumer消費消息,這種操做在實際項目中意義不大,和手動Commit Offsets的結果同樣,並且這種場景不是事務的引入目的。

4.總結

Kafka的冪等性和事務是比較重要的特性,特別是在數據丟失和數據重複的問題上很是重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數據庫的事務特性相似,有數據庫使用的經驗對理解Kafka的事務也比較容易接受。

相關文章
相關標籤/搜索