最近和一些同窗交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及做用等問題都能詳細做答。可是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那麼,今天筆者就爲你們來剖析一下Kafka的冪等性原理及實現。node
Producer在生產發送消息時,不免會重複發送消息。Producer進行retry時會產生重試機制,發生消息重複發送。而引入冪等性後,重複發送只會生成一條有效的消息。Kafka做爲分佈式消息系統,它的使用場景常見與分佈式系統中,好比消息推送系統、業務平臺系統(如物流平臺、銀行結算平臺等)。以銀行結算平臺來講,業務方做爲上游把數據上報到銀行結算平臺,若是一份數據被計算、處理屢次,那麼產生的影響會很嚴重。面試
在使用Kafka時,須要確保Exactly-Once語義。分佈式系統中,一些不可控因素有不少,好比網絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現網絡異常、FullGC、OOM等問題時致使Ack超時,Producer會進行重複發送。可能出現的狀況以下:數據庫
Kafka爲了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什麼呢?apache
Kafka在引入冪等性以前,Producer向Broker發送消息,而後Broker將消息追加到消息流中後給Producer返回Ack信號值。實現流程以下:緩存
上圖的實現流程是一種理想狀態下的消息發送狀況,可是實際狀況中,會出現各類不肯定的因素,好比在Producer在發送給Broker的時候出現網絡異常。好比如下這種異常狀況的出現:網絡
上圖這種狀況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,可是在返回Ack信號給Producer時失敗了(好比網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)從新發送給Broker,Broker接收到消息後,再次將該消息追加到消息流中,而後成功返回Ack信號給Producer。這樣下來,消息流中就被重複追加了兩條相同的(x2,y2)的消息。架構
面對這樣的問題,Kafka引入了冪等性。那麼冪等性是如何解決這類重複發送消息的問題的呢?下面咱們能夠先來看看流程圖:分佈式
一樣,這是一種理想狀態下的發送流程。實際狀況下,會有不少不肯定的因素,好比Broker在發送Ack信號給Producer時出現網絡異常,致使發送失敗。異常狀況以下圖所示:ide
當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常致使Producer接收Ack信號失敗。對於Producer來講,會觸發重試機制,將消息(x2,y2)再次發送,可是,因爲引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而以前Broker緩存過以前發送的相同的消息,那麼在消息流中的消息就只有一條(x2,y2),不會出現重複發送的狀況。debug
客戶端在生成Producer時,會實例化以下代碼:
// 實例化一個Producer對象 Producer<String, String> producer = new KafkaProducer<>(props);
在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現代碼以下:
private void maybeWaitForPid() { if (transactionState == null) return; while (!transactionState.hasPid()) { try { Node node = awaitLeastLoadedNodeReady(requestTimeout); if (node != null) { ClientResponse response = sendAndAwaitInitPidRequest(node); if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) { InitPidResponse initPidResponse = (InitPidResponse) response.responseBody(); transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch()); } else { log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node); } } else { log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again."); } } catch (Exception e) { log.warn("Received an exception while trying to get a pid. Will back off and retry.", e); } log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs); time.sleep(retryBackoffMs); metadata.requestUpdate(); } }
與冪等性有關的另一個特性就是事務。Kafka中的事務與數據庫的事務相似,Kafka中的事務屬性是指一系列的Producer生產消息和消費消息提交Offsets的操做在一個事務中,即原子性操做。對應的結果是同時成功或者同時失敗。
這裏須要與數據庫中事務進行區別,操做數據庫中的事務指一系列的增刪查改,對Kafka來講,操做事務是指一系列的生產和消費等原子性操做。
在事務屬性引入以前,先引入Producer的冪等性,它的做用爲:
產生的場景有:
好比,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets爲100(假設最近一次Commit的Offsets爲50),那麼執行觸發Balance時,其餘Consumer就會重複消費消息(消費的Offsets介於50~100之間的消息)。
Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口以下:
// 初始化事務,須要注意確保transation.id屬性被分配 void initTransactions(); // 開啓事務 void beginTransaction() throws ProducerFencedException; // 爲Consumer提供的在事務內Commit Offsets的操做 void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 提交事務 void commitTransaction() throws ProducerFencedException; // 放棄事務,相似於回滾事務的操做 void abortTransaction() throws ProducerFencedException;
在Kafka事務中,一個原子性操做,根據操做類型能夠分爲3種狀況。狀況以下:
Kafka的冪等性和事務是比較重要的特性,特別是在數據丟失和數據重複的問題上很是重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數據庫的事務特性相似,有數據庫使用的經驗對理解Kafka的事務也比較容易接受。