Apche Kafka 的生與死 – failover 機制詳解

轉自:http://www.cnblogs.com/fxjwind/p/4972244.htmlhtml

Kafka 做爲 high throughput 的消息中間件,以其性能,簡單和穩定性,成爲當前實時流處理框架中的主流的基礎組件。node

固然在使用 Kafka 中也碰到很多問題,尤爲是 failover 的問題,經常給你們帶來很多困擾和麻煩。 
因此在梳理完 kafka 源碼的基礎上,儘可能用通俗易懂的方式,把 Kafka 發生 failover 時的機制解釋清楚,讓你們在使用和運維中,作到心中有數。算法

若是對 kafka 不瞭解的,能夠先參考https://kafka.apache.org/08/design.html,有個大體的概念。apache

 

0 背景

這裏討論 kafka 的 failover 的前提是在0.8版本後, kafka 提供了 replica 機制。 
對於0.7版本不存在 failover 的說法,由於任意一個 broker dead 都會致使上面的數據不可讀,從而致使服務中斷。網絡

下面簡單的介紹一下 0.8中加入的 replica 機制和相應的組件,session

Replica 機制

基本思想大同小異,以下圖 (Ref.2):併發

image_thumb3

 

圖中有4個 kafka brokers,而且Topic1有四個 partition(用藍色表示)分佈在4個 brokers 上,爲 leader replica; 
且每一個 partition 都有兩個 follower replicas(用橘色表示),分佈在和 leader replica 不一樣的 brokers。 
這個分配算法很簡單,有興趣的能夠參考kafka的design。框架

 

Replica 組件

爲了支持replica機制,主要增長的兩個組件是,Replica Manager和Controller, 以下圖:less

image_thumb1

 

Replica Manager

每一個 broker server 都會建立一個 Replica Manager,全部的數據的讀寫都須要通過它 , 
0.7版本,kafka 會直接從 LogManager 中讀數據,但在增長 replica 機制後,只有 leader replica 能夠響應數據的讀寫請求 。 
因此,Replica Manager 須要管理全部 partition 的 replica 狀態,並響應讀寫請求,以及其餘和 replica 相關的操做。運維

 

Controller

你們能夠看到,每一個 partition 都有一個 leader replica,和若干的 follower replica,那麼誰來決定誰是leader? 
你說有 zookeeper,但用 zk 爲每一個 partition 作 elect,效率過低,並且 zk 會不堪重負; 
因此如今的通用作法是,只用 zk 選一個 master 節點,而後由這個 master 節點來作其餘的全部仲裁工做。 
kafka 的作法就是在 brokers 中選出一個做爲 controller,來作爲 master 節點,從而仲裁全部的 partition 的 leader 選舉。

 

下面咱們會從以下幾個方面來解釋 failover 機制, 
先從 client 的角度看看當 kafka 發生 failover 時,數據一致性問題。 
而後從 Kafka 的各個重要組件,Zookeeper,Broker, Controller 發生 failover 會形成什麼樣的影響? 
最後給出一些判斷 kafka 狀態的 tips。

 

1 從 Client 的角度

從 producer 的角度, 發的數據是否會丟?

除了要打開 replica 機制,還取決於 produce 的 request.required.acks 的設置,

  • acks = 0,發就發了,不須要 ack,不管成功與否 ;
  • acks = 1,當寫 leader replica 成功後就返回,其餘的 replica 都是經過fetcher去異步更新的,固然這樣會有數據丟失的風險,若是leader的數據沒有來得及同步,leader掛了,那麼會丟失數據;
  • acks = –1, 要等待全部的replicas都成功後,才能返回;這種純同步寫的延遲會比較高。

因此,通常的狀況下,thoughput 優先,設成1,在極端狀況下,是有可能丟失數據的; 
若是能夠接受較長的寫延遲,能夠選擇將 acks 設爲 –1。

 

從 consumer 的角度, 是否會讀到不一致的數據?

首先不管是 high-level 或 low-level consumer,咱們要知道他是怎麼從 kafka 讀數據的?

image_thumb12

kafka 的 log patition 存在文件中,並以 offset 做爲索引,因此 consumer 須要對於每一個 partition 記錄上次讀到的 offset (high-level和low-level的區別在因而 kafka 幫你記,仍是你本身記);

因此若是 consumer dead,重啓後只須要繼續從上次的 offset 開始讀,那就不會有不一致的問題。

但若是是 Kafka broker dead,併發生 partition leader 切換,如何保證在新的 leader 上這個 offset 仍然有效?  
Kafka 用一種機制,即 committed offset,來保證這種一致性,以下圖(Ref.2)

image_thumb13

log 除了有 log end offset 來表示 log 的末端,還有一個 committed offset, 表示有效的 offset; 
committed offset 只有在全部 replica 都同步完該 offset 後,纔會被置爲該offset; 
因此圖中 committed 置爲2, 由於 broker3 上的 replica 尚未完成 offset 3 的同步; 
因此這時,offset 3 的 message 對 consumer 是不可見的,consumer最多隻能讀到 offset 2。 
若是此時,leader dead,不管哪一個 follower 從新選舉成 leader,都不會影響數據的一致性,由於consumer可見的offset最多爲2,而這個offset在全部的replica上都是一致的。

因此在通常正常狀況下,當 kafka 發生 failover 的時候,consumer 是不會讀到不一致數據的。特例的狀況就是,當前 leader 是惟一有效的 replica,其餘replica都處在徹底不一樣步狀態,這樣發生 leader 切換,必定是會丟數據的,並會發生 offset 不一致。

 

2 Zookeeper Failover

Kafka 首先對於 zookeeper 是強依賴,因此 zookeeper 發生異常時,會對數據形成如何的影響?

Zookeeper Dead

若是 zookeeper dead,broker 是沒法啓動的,報以下的異常:

image_thumb16

這種異常,有多是 zookeeper dead,也有多是網絡不通,總之就是連不上 zookeeper。 
這種 case,kafka徹底不工做,直到能夠連上 zookeeper 爲止。

 

Zookeeper Hang

其實上面這種狀況比較簡單,比較麻煩的是 zookeeper hang,能夠說 kafka 的80%以上問題都是因爲這個緣由 
zookeeper hang 的緣由有不少,主要是 zk 負載太重,zk 所在主機 cpu,memeory 或網絡資源不夠等

zookeeper hang 帶來的主要問題就是 session timeout,這樣會觸發以下的問題,

a. Controller Fail,Controller 發生從新選舉和切換,具體過程參考下文。

b. Broker Fail,致使partition的leader發生切換或partition offline,具體過程參考下文。

c. Broker 被 hang 住 。 
這是一種比較特殊的 case,出現時在 server.log 會出現以下的log,

server.log: 
「INFO I wrote this conflicted ephemeral node [{"jmx_port":9999,"timestamp":"1444709  63049","host":"10.151.4.136","version":1,"port":9092}] at /brokers/ids/1 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)」

這個問題自己是因爲 zookeeper 的一個 bug,參考:https://issues.apache.org/jira/browse/ZOOKEEPER-1740

問題在於「The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.」 
即 zk 的 session 過時和 ephemeral node 刪除並非一個原子操做; 
出現的case以下:

  • 在極端case下,zk 觸發了 session timeout,但還沒來得及完成 /brokers/ids/1 節點的刪除,就被 hang 住了,好比是去作很耗時的 fsync 操做 。
  • 可是 broker 1 收到 session timeout 事件後,會嘗試從新去 zk 上建立 /brokers/ids/1 節點,可這時舊的節點仍然存在,因此會獲得 NodeExists,其實這個是不合理的,由於既然 session timeout,這個節點就應該不存在。
  • 一般的作法,既然已經存在,我就無論了,該幹啥幹啥去;問題是一會 zk 從 fsync hang 中恢復了,他會記得還有一個節點沒有刪除,這時會去把 /brokers/ids/1 節點刪除。
  • 結果就是對於client,雖然沒有再次收到 session 過時的事件,可是 /brokers/ids/1 節點卻不存在了。

因此這裏作的處理是,在前面發現 NodeExists 時,while true 等待,一直等到 zk 從 hang 中恢復刪除該節點,而後建立新節點成功,纔算完; 
這樣作的結果是這個broker也會被一直卡在這兒,等待該節點被成功建立。

 

3 Broker Failover

Broker 的 Failover,能夠分爲兩個過程,一個是 broker failure, 一個是 broker startup。

新加 broker

在談failover以前,咱們先看一個更簡單的過程,就是新加一個全新的 broker: 
首先明確,新加的 broker 對現存全部的 topic 和 partition,不會有任何影響; 
由於一個 topic 的 partition 的全部 replica 的 assignment 狀況,在建立時就決定了,並不會自動發生變化,除非你手動的去作 reassignment。 
因此新加一個 broker,所須要作的只是你們同步一下元數據,你們都知道來了一個新的 broker,當你建立新的 topic 或 partition 的時候,它會被用上。

 

Broker Failure

首先明確,這裏的 broker failure,並不必定是 broker server 真正的 dead了, 只是指該 broker 所對應的 zk ephemeral node ,好比/brokers/ids/1,發生 session timeout; 
固然發生這個的緣由,除了server dead,還有不少,好比網絡不通;可是咱們不關心,只要出現 sessioin timeout,咱們就認爲這個 broker 不工做了; 
會出現以下的log,

controller.log: 
「INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)」 
「INFO [Controller 1]: Broker failure callback for 4 (kafka.controller.KafkaController)」

當一個 broker failure 會影響什麼,其實對於多 replicas 場景,通常對最終客戶沒啥影響。 
只會影響哪些 leader replica 在該 broker 的 partitions; 須要從新作 leader election,若是沒法選出一個新的 leader,會致使 partition offline。 
由於若是隻是 follow replica failure,不會影響 partition 的狀態,仍是能夠服務的,只是可用 replica 少了一個;須要注意的是,kafka 是不會自動補齊失敗的replica的,即壞一個少一個; 
可是對於 leader replica failure,就須要從新再 elect leader,前面已經討論過,新選取出的 leader 是能夠保證 offset 一致性的;

Note: 其實這裏的一致性是有前提的,即除了 fail 的 leader,在 ISR(in-sync replicas) 裏面還存在其餘的 replica;顧名思義,ISR,就是能 catch up with leader 的 replica。 
雖然 partition 在建立的時候,會分配一個 AR(assigned replicas),可是在運行的過程當中,可能會有一些 replica 因爲各類緣由沒法跟上 leader,這樣的 replica 會被從 ISR 中去除。 
因此 ISR <= AR; 
若是,ISR 中 沒有其餘的 replica,而且容許 unclean election,那麼能夠從 AR 中選取一個 leader,但這樣必定是丟數據的,沒法保證 offset 的一致性。

 

 

Broker Startup

這裏的 startup,就是指 failover 中的 startup,會出現以下的log,

controller.log: 
「INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)」 
「INFO [Controller 1]: New broker startup callback for 
3 (kafka.controller.KafkaController)」

過程也不復雜,先將該 broker 上的全部的 replica 設爲 online,而後觸發 offline partition 或 new partition 的 state 轉變爲 online; 
因此 broker startup,只會影響 offline partition 或 new partition,讓他們有可能成爲 online。 
那麼對於普通的已經 online partition,影響只是多一個可用的 replica,那仍是在它完成catch up,被加入 ISR 後的事。

Note: Partition 的 leader 在 broker failover 後,不會立刻自動切換回來,這樣會產生的問題是,broker間負載不均衡,由於全部的讀寫都須要經過 leader。 
爲了解決這個問題,在server的配置中有個配置,auto.leader.rebalance.enable,將其設爲true; 
這樣 Controller 會啓動一個 scheduler 線程,按期去爲每一個 broker 作 rebalance,即發現若是該 broker 上的 imbalance ratio 達到必定比例,就會將其中的某些 partition 的 leader,進行從新 elect 到原先的 broker 上。

 

4 Controller Failover

前面說明過,某個 broker server 會被選出做爲 Controller,這個選舉的過程就是依賴於 zookeeper 的 ephemeral node,誰能夠先在"/controller"目錄建立節點,誰就是 controller; 
因此反之,咱們也是 watch 這個目錄來判斷 Controller 是否發生 failover 或 變化。Controller 發生 failover 時,會出現以下 log:

controller.log: 
「INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)」

Controller 主要是做爲 master 來仲裁 partition 的 leader 的,並維護 partition 和 replicas 的狀態機,以及相應的 zk 的 watcher 註冊;

Controller 的 failover 過程以下:

  • 試圖去在「/controller」 目錄搶佔建立 ephemeral node;
  • 若是已經有其餘的 broker 先建立成功,那麼說明新的 controller 已經誕生,更新當前的元數據便可;
  • 若是本身建立成功,說明我已經成爲新的 controller,下面就要開始作初始化工做,
  • 初始化主要就是建立和初始化 partition 和 replicas 的狀態機,並對 partitions 和 brokers 的目錄的變化設置 watcher。

能夠看到,單純 Controller 發生 failover,是不會影響正常數據讀寫的,只是 partition 的 leader 沒法被從新選舉,若是此時有 partition 的 leader fail,會致使 partition offline; 
可是 Controller 的 dead,每每是伴隨着 broker 的 dead,因此在 Controller 發生 failover 的過程當中,每每會出現 partition offline, 致使數據暫時不可用。

 

5 Tips

Kafka 提供一些工具來方便的查看信息,參考:Kafka Tools

a, 驗證topic 是否work?

最簡單的方式,就是用 producer 和 consumer console 來測試

Producer console,以下能夠往 localhost 的 topic test,插入兩條 message,

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

Consumer console,以下就能夠把剛寫入的 message 讀出,

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

若是整個過程沒有報錯,ok,說明你的topic是能夠工做的

 

b, 再看看topic是否健康?

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

這樣會打印出 topic test 的 detail 信息,如圖,

image_thumb2

從這個圖能夠說明幾個問題:

首先,topic 有幾個 partitions,而且 replicas factor 是多少,即有幾個 replica? 
圖中分別有32個 partitions,而且每一個 partition 有兩個 replica。

再者,每一個 partition 的 replicas 都被分配到哪些 brokers 上,而且該 partition 的 leader 是誰? 
好比,圖中的 partition0,replicas 被分配到 brokers 4和1上面,其中 leader replica 在 broker 1 上。

最後,是否健康? 
從如下幾個方面依次代表健康程度,

  • Isr 爲空,說明這個 partition 已經 offline 沒法提供服務了,這種 case 在咱們的圖中沒有出現;
  • Isr 有數據,可是 Isr < Replicas,這種狀況下對於用戶是沒有感知的,可是說明有部分 replicas 已經出問題了,至少是暫時沒法和 leader 同步;好比,圖中的 partition0,Isr 只有1,說明 replica 4 已經 offline
  • Isr = Replicas,可是 leader 不是 Replicas 中的第一個 replica,這個說明 leader 是發生太重新選取的,這樣可能會致使 brokers 負載不均衡;好比,圖中的 partition9,leader是2,而不是3,說明雖然當前它的全部 replica 都是正常的,但以前發生太重新選舉。

 

c,最後就是看kafka的日誌,kafka/logs

主要是看 controller.log 和 server.log,分別記錄 controller 和 broker server 的日誌。 
而後根據前面我給的每種異常的日誌,你能夠看出來究竟是出現什麼問題。

 

Reference

1. https://kafka.apache.org/08/design.html

2. Neha NarkhedeHands-free Kafka Replication: A lesson in operational simplicity

3. Kafka Tools

相關文章
相關標籤/搜索