client端常見的3鍾消息交付語義:api
在0.11.0.0版本以前,Kafka producer默認提供的是at least once語義。設想一下這個場景:如當producer向broker發送新消息後,分區leader副本所在的broker成功地將該消息寫入本地磁盤,而後發送響應給producer,此時假設網絡出現了故障致使響應沒有發送成功,那麼未接收到響應的producer會認爲消息請求失敗而從新發送,若網絡恢復以後,那麼同一條消息被寫入日誌兩次,在極端的條件下,同一消息可能會被髮送屢次。網絡
在Kafka0.11.0.0版本以後推出了冪等性producer和對事務的支持,完美解決了消息重複發送的問題。ide
冪等性producer是0.11.0.0版本用於實現EOS(Exactly-Once semantics)的第一個利器。若一個操做執行屢次的結果與只運行一次的結果是相同的,那麼咱們稱該操做爲冪等操做。ui
0.11.0.0版本引入的冪等性producer表示它的發送操做是冪等的,同一條消息被producer發送屢次,但在broker端這條消息只會被寫入日誌一次,若是要啓用冪等性producer以及獲取其提供的EOS語義,用戶須要顯示設置producer端參數enable.idempotence爲truethis
// 此時ack默認被設置爲-1(all)
props.put("enable.idempotence",true);
發送到broker端的每批消息都會被賦予一個序列號(sequence number)用於消息去重。kafka會把它們保存到底層日誌,這樣即便分區leader副本掛掉,新選出來的leader broker也能執行消息去重的工做。spa
kafka還會爲每一個producer實例分配一個producer id(PID),消息要被髮送到每一個分區都有對應的序列號值,它們老是從0開始單調增長,對於PID、分區、序列號三者的關係,能夠設想爲一個map,key就是(PID,分區),value就是序列號,即每對(PID,分區)都有一個特定的序列號(seqID),若是發送消息的seqID小於等於broker端保存的seqID,那麼broker會拒絕接收這一條消息。設計
在單會話冪等性中介紹,kafka經過引入pid和seq來實現單會話冪等性,但正是引入了pid,當應用重啓時,新的producer並無old producer的狀態數據。可能重複保存。日誌
當前設計只能保證單個producer的EOS語義,沒法實現多個producer實例一塊兒提供EOS語義。code
代碼分析orm
在Sender.run(long now)方法中,maybeWaitForProducerId()方法會生成一個producerID
void run(long now) { if (transactionManager != null) { try { if (transactionManager.shouldResetProducerStateAfterResolvingSequences()) // Check if the previous run expired batches which requires a reset of the producer state. transactionManager.resetProducerId(); if (!transactionManager.isTransactional()) { // this is an idempotent producer, so make sure we have a producer id maybeWaitForProducerId(); } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) { .... } long pollTimeout = sendProducerData(now); client.poll(pollTimeout, now); }
對事務的支持是kafka實現EOS的第二個利器,引入事務使得clients端程序可以將一組消息放入一個原子性單元統一處理。
kafka事務屬性是指一系列的生產者生產消息和消費者提交偏移量的操做在一個事務,或者說是是一個原子操做),同時成功或者失敗。
kafka爲實現事務要求應用程序必須提供一個惟一的id表徵事務,這個id被稱爲事務id,他必須在應用正序全部的會話上是惟一的。transactionID和Pid是不一樣的,前者是用戶顯式提供的,後者是producer自行分配的。
producer提供了initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction 五個事務方法。
/** * 初始化事務。須要注意的有: * 一、前提 * 須要保證transation.id屬性被配置。 * 二、這個方法執行邏輯是: * (1)Ensures any transactions initiated by previous instances of the producer with the same * transactional.id are completed. If the previous instance had failed with a transaction in * progress, it will be aborted. If the last transaction had begun completion, * but not yet finished, this method awaits its completion. * (2)Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * */ public void initTransactions(); /** * 開啓事務 */ public void beginTransaction() throws ProducerFencedException ; /** * 爲消費者提供的在事務內提交偏移量的操做 */ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ; /** * 提交事務 */ public void commitTransaction() throws ProducerFencedException; /** * 放棄事務,相似回滾事務的操做 */ public void abortTransaction() throws ProducerFencedException ;
在一個原子操做中,根據包含的操做類型,能夠分爲三種狀況,前兩種狀況是事務引入的場景,最後一種狀況沒有使用價值。
使用kafka的事務api時的一些注意事項:
建立一個事務,在這個事務操做中,只有生成消息操做。代碼以下:
/** * 在一個事務只有生產消息操做 */ public void onlyProduceInTransaction() {
// 建立生成者,代碼以下,須要:
// 配置transactional.id屬性
// 配置enable.idempotence屬性
Producer producer = buildProducer(); // 1.初始化事務 producer.initTransactions(); // 2.開啓事務 producer.beginTransaction(); try { // 3.kafka寫操做集合 // 3.1 do業務邏輯 // 3.2 發送消息 producer.send(new ProducerRecord<String, String>("test", "transaction-data-1")); producer.send(new ProducerRecord<String, String>("test", "transaction-data-2")); // 3.3 do其餘業務邏輯,還能夠發送其餘topic的消息。 // 4.事務提交 producer.commitTransaction(); } catch (Exception e) { // 5.放棄事務 producer.abortTransaction(); } }
/** * 在一個事務內,即有生產消息又有消費消息 */ public void consumeTransferProduce() { // 1.構建上產者 Producer producer = buildProducer(); // 2.初始化事務(生成productId),對於一個生產者,只能執行一次初始化事務操做 producer.initTransactions(); // 3.構建消費者和訂閱主題
// 建立消費者代碼,須要:
// 將配置中的自動提交屬性(auto.commit)進行關閉
// 並且在代碼裏面也不能使用手動提交commitSync( )或者commitAsync( )
// 設置isolation.level
Consumer consumer = buildConsumer(); consumer.subscribe(Arrays.asList("test")); while (true) { // 4.開啓事務 producer.beginTransaction(); // 5.1 接受消息 ConsumerRecords<String, String> records = consumer.poll(500); try { // 5.2 do業務邏輯; System.out.println("customer Message---"); Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap(); for (ConsumerRecord<String, String> record : records) { // 5.2.1 讀取消息,並處理消息。print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); // 5.2.2 記錄提交的偏移量 commits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); // 6.生產新的消息。好比外賣訂單狀態的消息,若是訂單成功,則須要發送跟商家結轉消息或者派送員的提成消息 producer.send(new ProducerRecord<String, String>("test", "data2")); } // 7.提交偏移量 producer.sendOffsetsToTransaction(commits, "group0323"); // 8.事務提交 producer.commitTransaction(); } catch (Exception e) { // 7.放棄事務 producer.abortTransaction(); } } }