Storm1.1.1 對 0.10.x 版 Kafka之commit offsets

因爲 0.10.x 版 Kafka 與 0.8.x 版有很大的變化,這種變化對下游 Storm 有很是大的影響,0.10.x 版的 Kafka 不但增長了權限管理的功能,並且還將 simple 和 high consumer 的 offsets 進行統一管理,也就意味着在 0.8.x 中 Storm 須要去負責管理 offsets,而在 0.10.x 中,Storm 不須要關心 consumer 的 offsets 的問題,這對 KafkaSpout 的設計有很大的影響,本文就是對 Storm 對 0.10.x 版 Kafka 支持的實現部分的解析。java

0.10.x 版 KafkaSpout 的實現

社區對新版 Kafka 的支持,整體分爲兩種狀況:git

  1. 一種是選擇自動 commit 機制;
  2. 另外一種是非自動 commit,就是將 commit 的權利交與 Storm 來控制。

下面分別對這兩種狀況進行分析。github

Kafka Consumer 的一些配置會對 Storm 的性能很大影響,下面的三個參數的設置對其性能的影響最大(默認值是根據MICROBENCHMARKING APACHE STORM 1.0 PERFORMANCE測試獲得):apache

  • fetch.min.bytes:默認值 1;
  • fetch.max.wait.ms:默認值 500(ms);
  • Kafka Consumer instance poll timeout, 它能夠在經過 KafkaSpoutConfig 的方法 setPollTimeoutMs 來配置,默認值是 200ms;

自動 commit 模式

自動 commit 模式就是 commit 的時機由 Consumer 來控制,本質上是異步 commit,當定時達到時,就進行 commit。而 Storm 端並無進行任何記錄,也就是這部分的容錯徹底由 Consumer 端來控制,而 Consumer 並不會關心數據的處理成功與否,只關心數據是否 commit,若是未 commit,就會從新發送數據,那麼就有可能致使下面這個後果:異步

形成那些已經 commit、但 Storm 端處理失敗的數據丟失

丟失的緣由函數

一些數據發送到 Spout 以後,剛好 commit 的定時到達,進行了 commit,可是這中間有某條或者幾條數據處理失敗,這就是說,這幾條處理失敗的數據已經進行 commit 了,Kafka 端也就不會從新進行發送。性能

可能出現的這種後果也肯定了自動 commit 模式不能知足咱們的需求,爲了保證數據不丟,須要數據在 Storm 中 ack 以後才能被 commit,所以,commit 仍是應該由 Storm 端來進行控制,才能保證數據被正確處理。測試

非自動 commit 模式

當選用非自動的 commit 機制(實際上就是使用 Consumer 的同步 commit 機制)時,須要手動去設置 commit 的參數,有如下兩項須要設置:fetch

  • offset.commit.period.ms:設置 spout 多久向 Kafka commit一次,在 KafkaSpoutConfig 的 setOffsetCommitPeriodMs 中配置;
  • max.uncommitted.offsets:控制在下一次拉取數據以前最多能夠有多少數據在等待 commit,在 KafkaSpoutConfig 的 setMaxUncommittedOffsets 中配置;

spout 的處理過程

關於 Kafka 的幾個 offset 的概念,能夠參考 offset的一些相關概念spa

KafkaSpout 的處理過程主要是在 nextTuple() 方法,其處理過程以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void nextTuple() {
  if (initialized) {
    if (commit()) {// Step1 非自動 commit,而且定時達到
      commitOffsetsForAckedTuples(); // 對全部已經 ack 的 msgs 進行 commit
    }
 
    if (poll()) {//Step2 拉取的數據都已經發送,而且未 commit 的消息數小於設置的最大 uncommit 數
      setWaitingToEmit(pollKafkaBroker());
      //將拉取的全部 record 都放到 waitingToEmit 集合中,可能會重複拉取數據(因爲一些 msg 須要重試,經過修改 Last Committed Offset 的值來實現的)
    }
 
    if (waitingToEmit()) {//Step3 waitingToEmit 中還有數據
      emit(); //發送數據,但會跳過已經 ack 或者已經發送的消息
    }
  } else {
    LOG.debug( "Spout not initialized. Not sending tuples until initialization completes");
  }
}

上面主要分爲三步:

  1. 若是是非自動 commit,而且 commit 定時達到,那麼就將全部已經 ack 的數據(這些數據的 offset 必須是連續的,不連續的數據不會進行 commit)進行 commit;
  2. 若是拉取的數據都已經發送,而且未 commit 的消息數(記錄在 numUncommittedOffsets 中)小於設置的最大 uncommit 數,那麼就根據更新後的 offset (將 offset 重置到須要重試的 msg 的最小 offset,這樣該 offset 後面的 msg 仍是會被從新拉取)拉取數據,並將拉取到的數據存儲到 waitingToEmit 集合中;
  3. 若是 waitingToEmit 集合中還有數據,就發送數據,但在發送數據的過程當中,會進行判斷,只發送沒有 ack 的數據。

KafkaSpout 如何進行容錯

舉個示例,以下圖所示

consumer offset

  1. 圖1表示一個 nextTuple() 循環結束以後,offset 爲14那條數據處理失敗,而offset 爲15-18的數據處理成功;
  2. 圖2表示在下次循環 Step 1 結束以後、Step 2 開始以前,Consumer 會將 the last committed offset 重置到 offset 爲14的位置。

也就是說從 offset 爲14開始,後面的數據會從新發送。

有人可能會問,那樣的話會不會形成數據重複發送?

Storm 是如何解決這個問題的呢?答案就是 Storm 會用一個 map 記錄已經 ack 的數據(acked),Storm 在進行 commit 的時候也是根據這個 map 的數據進行 commit 的,不過 commit 數據的 offset 必須是連續的,如上圖所示,只能將 offset 爲11-13的數據 commit,而15-18的數據因爲 offset 爲14的數據未處理成功而不能 commit。offset 爲11-13的數據在 commit 成功後會從 map 中移除,而 offset 爲15-18的數據依然在 map 中,Storm 在將從 Kafka 拉取的數據加入到 waitingToEmit 集合時後,進行 emit 數據時,會先檢測該數據是否存在 acked 中,若是存在的話,就證實該條數據已經處理過了,不會在進行發送。

這裏有幾點須要注意的:

  1. 對已經 ack 的 msg 進行 commit 時,所 commit 的 msg 的 offset 必須是連續的(該 msg 存儲在一個 TreeMap 中,按 offset 排序),斷續的數據會暫時接着保存在集合中,不會進行 commit,若是出現斷續,那就證實中間有數據處理失敗,須要從新處理;
  2. storm 處理 failed 的 msg,會保存到一個專門的集合中,在每次拉取數據時(是拉取數據,不是發送數據,發送數據時會檢測該數據是否已經成功處理),會遍歷該集合中包含的全部 TopicPartiion,獲取該 partition 的 Last Committed Offset;

這樣設計有一個反作用就是:若是有一個 msg 一直不成功,就會致使 KafkaSpout 由於這一條數據的影響而不斷地重複拉取這批數據,形成整個拓撲卡在這裏。

Kafka Rebalance 的影響

Kafka Rebalance 能夠參考Consumer Rebalance.

KafkaSpout 實現了一個內部類用來監控 Group Rebalance 的狀況,實現了兩個回調函數,一旦發現 group 的狀態變爲 preparingRabalance 以後

  1. onPartitionsRevoked 這個方法會在 Consumer 中止拉取數據以後、group 進行 rebalance 操做以前調用,做用是對已經 ack 的 msg 進行 commit;
  2. onPartitionsAssigned 這個方法 group 已經進行 reassignment 以後,開始拉取數據以前調用,做用是清理內存中不屬於這個線程的 msg、獲取 partition 的 last committed offset。

潛在的風險點

這部分仍是有可能致使數據重複發送的,設想下面一種狀況:

若是以前因爲一個條消息處理失敗(Partition 1),形成部分數據沒有 commit 成功,在進行 rebalance 後,剛好 Partition 1 被分配到其餘 spout 線程時,那麼當前的 spout 就會關於 Partition 1 的相關數據刪除掉,致使部分已經 commit 成功的數據(記錄在 acked 中)被刪除,而另外的 spout 就會從新拉取這部分數據進行處理,那麼就會致使這部分已經成功處理的數據重複處理

相關文章
相關標籤/搜索