KafkaConsumer(消費者)每次調用 poll()方法,它老是返回由生產者寫入 Kafka但尚未被消費者讀取過的記錄, 咱們因 此能夠追蹤到哪些記錄是被羣組裏的哪一個消費者讀取的。以前已經討論過, Kafka 不會像其餘 JMS 隊列那樣須要獲得消費者的確認,這是 Kafka 的一個獨特之處。相反,消 費者可使用 Kafka來追蹤消息在分區裏的位置(偏移量)。數據庫
咱們把更新分區當前位置的操做叫做提交。安全
那麼消費者是如何提交偏移量的呢?消費者往一個 叫做 _consumer_offset 的特殊主題發送 消息,消息裏包含每一個分區的偏移量。 若是消費者一直處於運行狀態,那麼偏移量就沒有 什麼用處。不過,若是悄費者發生崩潰或者有新 的消費者加入羣組,就會觸發再均衡,完 成再均衡以後,每一個消費者可能分配到新 的分區,而不是以前處理的那個。爲了可以繼續 以前的工做,消費者須要讀取每一個分區最後一次提交 的偏移量,而後從偏移量指定的地方 繼續處理。服務器
若是提交的偏移量小於客戶端處理 的最後一個消息的偏移量 ,那麼處於兩個偏移量之間的 消息就會被重複處理,如圖 4-6所示。異步
若是提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的 消息將會丟失,如圖 4-7 所示。oop
因此,處理偏移量的方式對客戶端會有很大的影響。 KafkaConsumer API提供了不少種方式來提交偏移量。網站
最簡單的提交方式是讓悄費者自動提交偏移量。若是enable.auto.commit被設爲 true,那麼每過5s,消費者會自動把從 poll() 方法接收到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認值是 5s。與消費者裏的其餘東西 同樣,自動提交也是在輪詢(poll() )裏進行的。消費者每次在進行輪詢時會檢查是否該提交偏移量了,若是是,那 麼就會提交從上一次輪詢返回的偏移量。線程
不過,在使用這種簡便的方式以前,須要知道它將會帶來怎樣的結果。 假設咱們仍然使用默認的 5s提交時間間隔,在最近一次提交以後的 3s發生了再均衡,再 均衡以後,消費者從最後一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落後 了 3s,因此在這 3s 內到達的消息會被重複處理。能夠經過修改提交時間間隔來更頻繁地提交偏移量,減少可能出現重複消息的時間窗,不過這種狀況是無也徹底避免的 。blog
在使用自動提交時 ,每次調用輪詢方怯都會把上一次調用返 回的偏移量提交上去,它並不 知道具體哪些消息已經被處理了,因此在再次調用以前最好確保全部當前調用返回 的消息 都已經處理完畢(在調用 close() 方法以前也會進行自動提交)。 通常狀況下不會有什麼問 題,不過在處理異常或提早退出輪詢時要格外當心 。隊列
自動提交雖然方便 , 不過並無爲開發者留有餘地來避免重複處理消息。事件
大部分開發者經過控制偏移量提交時間來消除丟失消息的可能性,井在發生再均衡時減小 重複消息的數量。消費者 API提供了另外一種提交偏移量的方式 , 開發者能夠在必要的時候 提交當前偏移盤,而不是基於時間間隔。
取消自動提交,把 auto.commit.offset 設爲 false,讓應用程序決定什麼時候提交 偏 移量。使用 commitSync() 提交偏移量最簡單也最可靠。這個 API會提交由 poll() 方法返回 的最新偏移量,提交成 功後立刻返回,若是提交失敗就拋出異常。
要記住, commitSync() 將會提交由 poll() 返回的最新偏移量 , 因此在處理完全部記錄後要 確保調用了 commitSync(),不然仍是會有丟失消息的風險。若是發生了再均衡,從最近一 批消息到發生再均衡之間的全部消息都將被重複處理。
下面是咱們在處理完最近一批消息後使用 commitSync() 方法提交偏移量的例子。
同步提交有一個不足之處,在 broker對提交請求做出迴應以前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。咱們能夠經過下降提交頻率來提高吞吐量,但若是發生了再均衡, 會增長重複消息的數量。
這個時候可使用異步提交 API。咱們只管發送提交請求,無需等待 broker的響應。
在成功提交或碰到無怯恢復的錯誤以前, commitSync() 會一直重試(應用程序也一直阻塞),可是 commitAsync() 不會,這也是 commitAsync() 很差的 一個地方。它之因此不進行重試,是由於在它收到 服務器響應的時候,可能有一個更大的偏移量已經提交成功。假設咱們發出一個請求用於提交偏移量 2000,這個時候發生了短暫的通訊問題 ,服務器收不到請求,天然也不會 做出任何響應。與此同時,咱們處理了另一批消息,併成功提交了偏移量 3000。若是 commitAsync() 從新嘗試提交偏移量 2000,它有可能在偏移量 3000以後提交成功。這個時 候若是發生再均衡,就會出現重複消息。
咱們之因此提到這個問題的複雜性和提交順序的重要性,是由於 commitAsync()也支持回 調,在 broker 做出響應時會執行回調。回調常常被用於記錄提交錯誤或生成度量指標, 不 過若是你要用它來進行重試, 必定要注意提交的順序。
咱們可使用一個單調遞增的序列號來維護異步提交的順序。在每次提交偏 移量以後或在回調裏提交偏移量時遞增序列號。在進行重試前,先檢查回調 的序列號和即將提交的偏移量是否相等,若是相等,說明沒有新的提交,那麼能夠安全地進行重試。若是序列號比較大,說明有一個新的提交已經發送出去了,應該中止重試。
通常狀況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,由於若是提交失敗 是 由於臨時問題致使的,那麼後續的提交總會有成功的。但若是這是發生在關閉消費者或 再均衡前的最後一次提交,就要確保可以提交成功。
所以,在消費者關閉前通常會組合使用 commitAsync()和 commitSync()。它們的工做原理以下(後面講到再均衡監聽器時,咱們會討論如何在發生再均衡前提交偏移量):
提交偏移量的頻率與處理消息批次的頻率是同樣的。但若是想要更頻繁地提交出怎麼辦?若是 poll() 方法返回一大批數據,爲了不因再均衡引發的重複處理整批消息,想要在批次中間提交偏移量該怎麼辦?這種狀況沒法經過調用 commitSync()或 commitAsync() 來實現,由於它們只會提交最後一個偏移量,而此時該批次裏的消息尚未處理完。
幸運的是,消費者 API 容許在調用 commitSync()和 commitAsync()方法時傳進去但願提交 的分區和偏移量的 map。假設你處理了半個批次的消息, 最後一個來自主題「customers」 分區 3 的消息的偏移量是 5000, 你能夠調用 commitSync() 方法來提交它。不過,由於消費者可能不僅讀取一個分區, 你須要跟蹤全部分區的偏移量,因此在這個層面上控制偏移 量 的提交會讓代碼變複雜。
下面是提交特定偏移量的例子 :
在提交偏移量一節中提到過,消費者在退出和進行分區再均衡以前,會作一些清理工做。
你會在消費者失去對一個分區的全部權以前提交最後一個已處理記錄的偏移量。若是消費 者準備了 一 個緩衝區用於處理偶發的事件,那麼在失去分區全部權以前, 須要處理在緩衝 區累積下來的記錄。你可能還須要關閉文件句柄、數據庫鏈接等。
在爲消費者分配新分區或移除舊分區時,能夠經過消費者 API執行 一 些應用程序代碼,在調用 subscribe()方法時傳進去一個ConsumerRebalancelistener實例就能夠了。 ConsumerRebalancelistener有兩個須要實現的方法。
(1) public void onPartitionsRevoked(Collection<TopicPartition> partitions)方法會在 再均衡開始以前和消費者中止讀取消息以後被調用。若是在這裏提交偏移量,下一個接 管分區 的消費者就知道該從哪裏開始讀取了。
(2) public void onPartitionsAssigned(Collection<TopicPartition> partitions)方法會在 從新分配分區以後和消費者開始讀取消息以前被調用。
下面的例子將演示如何在失去分區全部權以前經過 onPartitionsRevoked()方法來提交偏移量。在下一節,咱們會演示另外一個同時使用了 onPartitionsAssigned()方法的例子。
到目前爲止,咱們知道了如何使用 poll() 方法從各個分區的最新偏移量處開始處理消息。 不過,有時候咱們也須要從特定的偏移量處開始讀取消息。
若是你想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息, 可使 用 seekToBeginning(Collection<TopicPartition> tp) 和 seekToEnd(Collection<TopicPartition> tp) 這兩個方法。
不過, Kafka也爲咱們提供了用 於查找特定偏移量的 API。 它有不少用途,好比向 後回退 幾個消息或者向前跳過幾個消息(對時間比較敏感的應用程序在處理滯後的狀況下但願能 夠向前跳過若干個消息)。在使用 Kafka 之外的系統來存儲偏移量時,它將給咱們 帶來更 大的驚喜。
試想一下這樣的場景:應用程序從 Kafka讀取事件(多是網站的用戶點擊事件流 ),對 它們進行處理(多是使用自動程序清理點擊操做井添加會話信息),而後把結果保 存到 數據庫、 NoSQL 存儲引擎或 Hadoop。假設咱們真的不想丟失任何數據,也不想在數據庫 裏屢次保存相同的結果。
這種狀況下,消費者的代碼多是這樣的 :
在這個例子裏,每處理一條記錄就提交一次偏移量。儘管如此, 在記錄被保存到數據庫以後以及偏移量被提交以前 ,應用程序仍然有可能發生崩潰,致使重複處理數據,數據庫裏就會出現重複記錄。
若是保存記錄和偏移量能夠在一個原子操做裏完成,就能夠避免出現上述狀況。記錄和偏 移量要麼 都被成功提交,要麼都不提交。若是記錄是保存在數據庫裏而偏移量是提交到 Kafka 上,那麼就沒法實現原子操做。
不過 ,若是在同一個事務裏把記錄和偏移量都寫到數據庫裏會怎樣呢?那麼咱們就會知道 記錄和偏移量要麼都成功提交,要麼都沒有,而後從新處理記錄。
如今的問題是:若是偏移量是保存在數據庫裏而不是 Kafka裏,那麼消費者在獲得新分區 時怎麼知道該從哪裏開始讀取?這個時候可使用 seek() 方法。在消費者啓動或分配到新 分區時 ,可使用 seek()方法查找保存在數據庫裏的偏移量。
下面的例子大體說明了如何使用這個 API。 使用 ConsumerRebalancelistener和 seek() 方 戰確保咱們是從數據庫裏保存的偏移量所指定的位置開始處理消息的。
經過把偏移量和記錄保存到同 一個外部系統來實現單次語義能夠有不少種方式,不過它們 都須要結合使用 ConsumerRebalancelistener和 seek() 方法來確保可以及時保存偏移量, 井保證消費者老是可以從正確的位置開始讀取消息。
在以前討論輪詢時就說過,不須要擔憂消費者會在一個無限循環裏輪詢消息,咱們會告訴消費者如何優雅地退出循環。
若是肯定要退出循環,須要經過另外一個線程調用 consumer.wakeup()方法。若是循環運行 在主線程裏,能夠在 ShutdownHook裏調用該方法。要記住, consumer.wakeup() 是消費者 惟一一個能夠從其餘線程裏安全調用的方法。調用 consumer.wakeup()能夠退出 poll(), 並拋出 WakeupException異常,或者若是調用 cconsumer.wakeup() 時線程沒有等待輪詢, 那 麼異常將在下一輪調用 poll()時拋出。咱們不須要處理 WakeupException,由於它只是用於跳出循環的一種方式。不過, 在退出線程以前調用 consumer.close()是頗有必要的, 它 會提交任何尚未提交的東西 , 並向羣組協調器(broker)發送消息,告知本身要離開羣組,接下來 就會觸發再均衡 ,而不須要等待會話超時。
下面是運行在主線程上的消費者退出線程的代碼 。