Kafka科普系列 | Kafka中的事務是什麼樣子的?

事務,對於你們來講可能並不陌生,好比數據庫事務、分佈式事務,那麼Kafka中的事務是什麼樣子的呢?數據庫

在說Kafka的事務以前,先要說一下Kafka中冪等的實現。冪等和事務是Kafka 0.11.0.0版本引入的兩個特性,以此來實現EOS(exactly once semantics,精確一次處理語義)。緩存

冪等,簡單地說就是對接口的屢次調用所產生的結果和調用一次是一致的。生產者在進行重試的時候有可能會重複寫入消息,而使用Kafka的冪等性功能以後就能夠避免這種狀況。bash

開啓冪等性功能的方式很簡單,只須要顯式地將生產者客戶端參數enable.idempotence設置爲true便可(這個參數的默認值爲false)。微信

Kafka是如何具體實現冪等的呢?Kafka爲此引入了producer id(如下簡稱PID)和序列號(sequence number)這兩個概念。每一個新的生產者實例在初始化的時候都會被分配一個PID,這個PID對用戶而言是徹底透明的。session

對於每一個PID,消息發送到的每個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將對應的序列號的值加1。分佈式

broker端會在內存中爲每一對維護一個序列號。對於收到的每一條消息,只有當它的序列號的值(SN_new)比broker端中維護的對應的序列號的值(SN_old)大1(即SN_new = SN_old + 1)時,broker纔會接收它。ide

若是SN_new< SN_old + 1,那麼說明消息被重複寫入,broker能夠直接將其丟棄。若是SN_new> SN_old + 1,那麼說明中間有數據還沒有寫入,出現了亂序,暗示可能有消息丟失,這個異常是一個嚴重的異常。spa

引入序列號來實現冪等也只是針對每一對而言的,也就是說,Kafka的冪等只能保證單個生產者會話(session)中單分區的冪等。冪等性不能跨多個分區運做,而事務能夠彌補這個缺陷。.net

事務能夠保證對多個分區寫入操做的原子性。操做的原子性是指多個操做要麼所有成功,要麼所有失敗,不存在部分紅功、部分失敗的可能。設計

爲了使用事務,應用程序必須提供惟一的transactionalId,這個transactionalId經過客戶端參數transactional.id來顯式設置。事務要求生產者開啓冪等特性,所以經過將transactional.id參數設置爲非空從而開啓事務特性的同時須要將enable.idempotence設置爲true(若是未顯式設置,則KafkaProducer默認會將它的值設置爲true),若是用戶顯式地將enable.idempotence設置爲false,則會報出ConfigException的異常。

transactionalId與PID一一對應,二者之間所不一樣的是transactionalId由用戶顯式設置,而PID是由Kafka內部分配的。

另外,爲了保證新的生產者啓動後具備相同transactionalId的舊生產者可以當即失效,每一個生產者經過transactionalId獲取PID的同時,還會獲取一個單調遞增的producer epoch。若是使用同一個transactionalId開啓兩個生產者,那麼前一個開啓的生產者會報錯。

從生產者的角度分析,經過事務,Kafka能夠保證跨生產者會話的消息冪等發送,以及跨生產者會話的事務恢復。

前者表示具備相同transactionalId的新生產者實例被建立且工做的時候,舊的且擁有相同transactionalId的生產者實例將再也不工做。

後者指當某個生產者實例宕機後,新的生產者實例能夠保證任何未完成的舊事務要麼被提交(Commit),要麼被停止(Abort),如此可使新的生產者實例從一個正常的狀態開始工做。

KafkaProducer提供了5個與事務相關的方法,詳細以下:

void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId)
        throws ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;
複製代碼

initTransactions()方法用來初始化事務;beginTransaction()方法用來開啓事務;sendOffsetsToTransaction()方法爲消費者提供在事務內的位移提交的操做;commitTransaction()方法用來提交事務;abortTransaction()方法用來停止事務,相似於事務回滾。

在消費端有一個參數isolation.level,與事務有着莫大的關聯,這個參數的默認值爲「read_uncommitted」,意思是說消費端應用能夠看到(消費到)未提交的事務,固然對於已提交的事務也是可見的。

這個參數還能夠設置爲「read_committed」,表示消費端應用不能夠看到還沒有提交的事務內的消息。

舉個例子,若是生產者開啓事務並向某個分區值發送3條消息msg一、msg2和msg3,在執行commitTransaction()或abortTransaction()方法前,設置爲「read_committed」的消費端應用是消費不到這些消息的,不過在KafkaConsumer內部會緩存這些消息,直到生產者執行commitTransaction()方法以後它才能將這些消息推送給消費端應用。反之,若是生產者執行了abortTransaction()方法,那麼KafkaConsumer會將這些緩存的消息丟棄而不推送給消費端應用。

在這裏插入圖片描述

日誌文件中除了普通的消息,還有一種消息專門用來標誌一個事務的結束,它就是控制消息(ControlBatch)。控制消息一共有兩種類型:COMMIT和ABORT,分別用來表徵事務已經成功提交或已經被成功停止。

RecordBatch中attributes字段的第6位用來標識當前消息是不是控制消息。若是是控制消息,那麼這一位會置爲1,不然會置爲0,如上圖所示。

attributes字段中的第5位用來標識當前消息是否處於事務中,若是是事務中的消息,那麼這一位置爲1,不然置爲0。因爲控制消息也處於事務中,因此attributes字段的第5位和第6位都被置爲1。

在這裏插入圖片描述
KafkaConsumer能夠經過這個控制消息來判斷對應的事務是被提交了仍是被停止了,而後結合參數isolation.level配置的隔離級別來決定是否將相應的消息返回給消費端應用,如上圖所示。注意ControlBatch對消費端應用不可見。

咱們在上一篇Kafka科普系列中還講過LSO——《Kafka科普系列 | 什麼是LSO》,它與Kafka的事務有着密切的聯繫,看着下圖,你回憶起來了嘛。

在這裏插入圖片描述


歡迎支持筆者小冊:《圖解Kafka之實戰指南》和《圖解Kafka之核心原理


歡迎支持筆者新做:《深刻理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公衆號:朱小廝的博客。

相關文章
相關標籤/搜索