因爲 0.10.x 版 Kafka 與 0.8.x 版有很大的變化,這種變化對下游 Storm 有很是大的影響,0.10.x 版的 Kafka 不但增長了權限管理的功能,並且還將 simple 和 high consumer 的 offsets 進行統一管理,也就意味着在 0.8.x 中 Storm 須要去負責管理 offsets,而在 0.10.x 中,Storm 不須要關心 consumer 的 offsets 的問題,這對 KafkaSpout 的設計有很大的影響,本文就是對 Storm 對 0.10.x 版 Kafka 支持的實現
部分的解析。java
社區對新版 Kafka 的支持,整體分爲兩種狀況:git
下面分別對這兩種狀況進行分析。github
Kafka Consumer 的一些配置會對 Storm 的性能很大影響,下面的三個參數的設置對其性能的影響最大(默認值是根據MICROBENCHMARKING APACHE STORM 1.0 PERFORMANCE測試獲得):apache
fetch.min.bytes
:默認值 1;fetch.max.wait.ms
:默認值 500(ms);Kafka Consumer instance poll timeout
, 它能夠在經過 KafkaSpoutConfig 的方法 setPollTimeoutMs 來配置,默認值是 200ms;自動 commit 模式就是 commit 的時機由 Consumer 來控制,本質上是異步 commit,當定時達到時,就進行 commit。而 Storm 端並無進行任何記錄,也就是這部分的容錯徹底由 Consumer 端來控制,而 Consumer 並不會關心數據的處理成功與否,只關心數據是否 commit,若是未 commit,就會從新發送數據,那麼就有可能致使下面這個後果:異步
丟失的緣由函數
一些數據發送到 Spout 以後,剛好 commit 的定時到達,進行了 commit,可是這中間有某條或者幾條數據處理失敗,這就是說,這幾條處理失敗的數據已經進行 commit 了,Kafka 端也就不會從新進行發送。性能
可能出現的這種後果也肯定了自動 commit 模式不能知足咱們的需求,爲了保證數據不丟,須要數據在 Storm 中 ack 以後才能被 commit,所以,commit 仍是應該由 Storm 端來進行控制,才能保證數據被正確處理。測試
當選用非自動的 commit 機制(實際上就是使用 Consumer 的同步 commit 機制)時,須要手動去設置 commit 的參數,有如下兩項須要設置:fetch
offset.commit.period.ms
:設置 spout 多久向 Kafka commit一次,在 KafkaSpoutConfig 的 setOffsetCommitPeriodMs 中配置;max.uncommitted.offsets
:控制在下一次拉取數據以前最多能夠有多少數據在等待 commit,在 KafkaSpoutConfig 的 setMaxUncommittedOffsets 中配置;關於 Kafka 的幾個 offset 的概念,能夠參考 offset的一些相關概念spa
KafkaSpout 的處理過程主要是在 nextTuple()
方法,其處理過程以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public void nextTuple() {
if (initialized) {
if (commit()) {// Step1 非自動 commit,而且定時達到
commitOffsetsForAckedTuples();
// 對全部已經 ack 的 msgs 進行 commit
}
if (poll()) {//Step2 拉取的數據都已經發送,而且未 commit 的消息數小於設置的最大 uncommit 數
setWaitingToEmit(pollKafkaBroker());
//將拉取的全部 record 都放到 waitingToEmit 集合中,可能會重複拉取數據(因爲一些 msg 須要重試,經過修改 Last Committed Offset 的值來實現的)
}
if (waitingToEmit()) {//Step3 waitingToEmit 中還有數據
emit();
//發送數據,但會跳過已經 ack 或者已經發送的消息
}
}
else {
LOG.debug(
"Spout not initialized. Not sending tuples until initialization completes");
}
}
|
上面主要分爲三步:
numUncommittedOffsets
中)小於設置的最大 uncommit 數,那麼就根據更新後的 offset (將 offset 重置到須要重試的 msg 的最小 offset,這樣該 offset 後面的 msg 仍是會被從新拉取)拉取數據,並將拉取到的數據存儲到 waitingToEmit
集合中;waitingToEmit
集合中還有數據,就發送數據,但在發送數據的過程當中,會進行判斷,只發送沒有 ack 的數據。舉個示例,以下圖所示
nextTuple()
循環結束以後,offset 爲14那條數據處理失敗,而offset 爲15-18的數據處理成功;也就是說從 offset 爲14開始,後面的數據會從新發送。
有人可能會問,那樣的話會不會形成數據重複發送?
Storm 是如何解決這個問題的呢?答案就是 Storm 會用一個 map 記錄已經 ack 的數據(acked
),Storm 在進行 commit 的時候也是根據這個 map 的數據進行 commit 的,不過 commit 數據的 offset 必須是連續的,如上圖所示,只能將 offset 爲11-13的數據 commit,而15-18的數據因爲 offset 爲14的數據未處理成功而不能 commit。offset 爲11-13的數據在 commit 成功後會從 map 中移除,而 offset 爲15-18的數據依然在 map 中,Storm 在將從 Kafka 拉取的數據加入到 waitingToEmit
集合時後,進行 emit 數據時,會先檢測該數據是否存在 acked
中,若是存在的話,就證實該條數據已經處理過了,不會在進行發送。
這裏有幾點須要注意的:
這樣設計有一個反作用就是:若是有一個 msg 一直不成功,就會致使 KafkaSpout 由於這一條數據的影響而不斷地重複拉取這批數據,形成整個拓撲卡在這裏。
Kafka Rebalance 能夠參考Consumer Rebalance.
KafkaSpout 實現了一個內部類用來監控 Group Rebalance 的狀況,實現了兩個回調函數,一旦發現 group 的狀態變爲 preparingRabalance
以後
onPartitionsRevoked
這個方法會在 Consumer 中止拉取數據以後、group 進行 rebalance 操做以前調用,做用是對已經 ack 的 msg 進行 commit;onPartitionsAssigned
這個方法 group 已經進行 reassignment 以後,開始拉取數據以前調用,做用是清理內存中不屬於這個線程的 msg、獲取 partition 的 last committed offset。這部分仍是有可能致使數據重複發送的,設想下面一種狀況:
若是以前因爲一個條消息處理失敗(Partition 1),形成部分數據沒有 commit 成功,在進行 rebalance 後,剛好 Partition 1 被分配到其餘 spout 線程時,那麼當前的 spout 就會關於 Partition 1 的相關數據刪除掉,致使部分已經 commit 成功的數據(記錄在 acked 中)被刪除,而另外的 spout 就會從新拉取這部分數據進行處理,那麼就會致使這部分已經成功處理的數據重複處理。