轉自:http://www.cnblogs.com/fxjwind/p/4972244.htmlhtml
Kafka 做爲 high throughput 的消息中間件,以其性能,簡單和穩定性,成爲當前實時流處理框架中的主流的基礎組件。node
固然在使用 Kafka 中也碰到很多問題,尤爲是 failover 的問題,經常給你們帶來很多困擾和麻煩。
因此在梳理完 kafka 源碼的基礎上,儘可能用通俗易懂的方式,把 Kafka 發生 failover 時的機制解釋清楚,讓你們在使用和運維中,作到心中有數。算法
若是對 kafka 不瞭解的,能夠先參考https://kafka.apache.org/08/design.html,有個大體的概念。apache
這裏討論 kafka 的 failover 的前提是在0.8版本後, kafka 提供了 replica 機制。
對於0.7版本不存在 failover 的說法,由於任意一個 broker dead 都會致使上面的數據不可讀,從而致使服務中斷。網絡
下面簡單的介紹一下 0.8中加入的 replica 機制和相應的組件,session
基本思想大同小異,以下圖 (Ref.2):併發
圖中有4個 kafka brokers,而且Topic1有四個 partition(用藍色表示)分佈在4個 brokers 上,爲 leader replica;
且每一個 partition 都有兩個 follower replicas(用橘色表示),分佈在和 leader replica 不一樣的 brokers。
這個分配算法很簡單,有興趣的能夠參考kafka的design。框架
爲了支持replica機制,主要增長的兩個組件是,Replica Manager和Controller, 以下圖:less
每一個 broker server 都會建立一個 Replica Manager,全部的數據的讀寫都須要通過它 ,
0.7版本,kafka 會直接從 LogManager 中讀數據,但在增長 replica 機制後,只有 leader replica 能夠響應數據的讀寫請求 。
因此,Replica Manager 須要管理全部 partition 的 replica 狀態,並響應讀寫請求,以及其餘和 replica 相關的操做。運維
你們能夠看到,每一個 partition 都有一個 leader replica,和若干的 follower replica,那麼誰來決定誰是leader?
你說有 zookeeper,但用 zk 爲每一個 partition 作 elect,效率過低,並且 zk 會不堪重負;
因此如今的通用作法是,只用 zk 選一個 master 節點,而後由這個 master 節點來作其餘的全部仲裁工做。
kafka 的作法就是在 brokers 中選出一個做爲 controller,來作爲 master 節點,從而仲裁全部的 partition 的 leader 選舉。
下面咱們會從以下幾個方面來解釋 failover 機制,
先從 client 的角度看看當 kafka 發生 failover 時,數據一致性問題。
而後從 Kafka 的各個重要組件,Zookeeper,Broker, Controller 發生 failover 會形成什麼樣的影響?
最後給出一些判斷 kafka 狀態的 tips。
除了要打開 replica 機制,還取決於 produce 的 request.required.acks 的設置,
因此,通常的狀況下,thoughput 優先,設成1,在極端狀況下,是有可能丟失數據的;
若是能夠接受較長的寫延遲,能夠選擇將 acks 設爲 –1。
首先不管是 high-level 或 low-level consumer,咱們要知道他是怎麼從 kafka 讀數據的?
kafka 的 log patition 存在文件中,並以 offset 做爲索引,因此 consumer 須要對於每一個 partition 記錄上次讀到的 offset (high-level和low-level的區別在因而 kafka 幫你記,仍是你本身記);
因此若是 consumer dead,重啓後只須要繼續從上次的 offset 開始讀,那就不會有不一致的問題。
但若是是 Kafka broker dead,併發生 partition leader 切換,如何保證在新的 leader 上這個 offset 仍然有效?
Kafka 用一種機制,即 committed offset,來保證這種一致性,以下圖(Ref.2)
log 除了有 log end offset 來表示 log 的末端,還有一個 committed offset, 表示有效的 offset;
committed offset 只有在全部 replica 都同步完該 offset 後,纔會被置爲該offset;
因此圖中 committed 置爲2, 由於 broker3 上的 replica 尚未完成 offset 3 的同步;
因此這時,offset 3 的 message 對 consumer 是不可見的,consumer最多隻能讀到 offset 2。
若是此時,leader dead,不管哪一個 follower 從新選舉成 leader,都不會影響數據的一致性,由於consumer可見的offset最多爲2,而這個offset在全部的replica上都是一致的。
因此在通常正常狀況下,當 kafka 發生 failover 的時候,consumer 是不會讀到不一致數據的。特例的狀況就是,當前 leader 是惟一有效的 replica,其餘replica都處在徹底不一樣步狀態,這樣發生 leader 切換,必定是會丟數據的,並會發生 offset 不一致。
Kafka 首先對於 zookeeper 是強依賴,因此 zookeeper 發生異常時,會對數據形成如何的影響?
若是 zookeeper dead,broker 是沒法啓動的,報以下的異常:
這種異常,有多是 zookeeper dead,也有多是網絡不通,總之就是連不上 zookeeper。
這種 case,kafka徹底不工做,直到能夠連上 zookeeper 爲止。
其實上面這種狀況比較簡單,比較麻煩的是 zookeeper hang,能夠說 kafka 的80%以上問題都是因爲這個緣由
zookeeper hang 的緣由有不少,主要是 zk 負載太重,zk 所在主機 cpu,memeory 或網絡資源不夠等
zookeeper hang 帶來的主要問題就是 session timeout,這樣會觸發以下的問題,
a. Controller Fail,Controller 發生從新選舉和切換,具體過程參考下文。
b. Broker Fail,致使partition的leader發生切換或partition offline,具體過程參考下文。
c. Broker 被 hang 住 。
這是一種比較特殊的 case,出現時在 server.log 會出現以下的log,
server.log:
「INFO I wrote this conflicted ephemeral node [{"jmx_port":9999,"timestamp":"1444709 63049","host":"10.151.4.136","version":1,"port":9092}] at /brokers/ids/1 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)」
這個問題自己是因爲 zookeeper 的一個 bug,參考:https://issues.apache.org/jira/browse/ZOOKEEPER-1740
問題在於「The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.」
即 zk 的 session 過時和 ephemeral node 刪除並非一個原子操做;
出現的case以下:
因此這裏作的處理是,在前面發現 NodeExists 時,while true 等待,一直等到 zk 從 hang 中恢復刪除該節點,而後建立新節點成功,纔算完;
這樣作的結果是這個broker也會被一直卡在這兒,等待該節點被成功建立。
Broker 的 Failover,能夠分爲兩個過程,一個是 broker failure, 一個是 broker startup。
在談failover以前,咱們先看一個更簡單的過程,就是新加一個全新的 broker:
首先明確,新加的 broker 對現存全部的 topic 和 partition,不會有任何影響;
由於一個 topic 的 partition 的全部 replica 的 assignment 狀況,在建立時就決定了,並不會自動發生變化,除非你手動的去作 reassignment。
因此新加一個 broker,所須要作的只是你們同步一下元數據,你們都知道來了一個新的 broker,當你建立新的 topic 或 partition 的時候,它會被用上。
首先明確,這裏的 broker failure,並不必定是 broker server 真正的 dead了, 只是指該 broker 所對應的 zk ephemeral node ,好比/brokers/ids/1,發生 session timeout;
固然發生這個的緣由,除了server dead,還有不少,好比網絡不通;可是咱們不關心,只要出現 sessioin timeout,咱們就認爲這個 broker 不工做了;
會出現以下的log,
controller.log:
「INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)」
「INFO [Controller 1]: Broker failure callback for 4 (kafka.controller.KafkaController)」
當一個 broker failure 會影響什麼,其實對於多 replicas 場景,通常對最終客戶沒啥影響。
只會影響哪些 leader replica 在該 broker 的 partitions; 須要從新作 leader election,若是沒法選出一個新的 leader,會致使 partition offline。
由於若是隻是 follow replica failure,不會影響 partition 的狀態,仍是能夠服務的,只是可用 replica 少了一個;須要注意的是,kafka 是不會自動補齊失敗的replica的,即壞一個少一個;
可是對於 leader replica failure,就須要從新再 elect leader,前面已經討論過,新選取出的 leader 是能夠保證 offset 一致性的;
Note: 其實這裏的一致性是有前提的,即除了 fail 的 leader,在 ISR(in-sync replicas) 裏面還存在其餘的 replica;顧名思義,ISR,就是能 catch up with leader 的 replica。
雖然 partition 在建立的時候,會分配一個 AR(assigned replicas),可是在運行的過程當中,可能會有一些 replica 因爲各類緣由沒法跟上 leader,這樣的 replica 會被從 ISR 中去除。
因此 ISR <= AR;
若是,ISR 中 沒有其餘的 replica,而且容許 unclean election,那麼能夠從 AR 中選取一個 leader,但這樣必定是丟數據的,沒法保證 offset 的一致性。
這裏的 startup,就是指 failover 中的 startup,會出現以下的log,
controller.log:
「INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)」
「INFO [Controller 1]: New broker startup callback for 3 (kafka.controller.KafkaController)」
過程也不復雜,先將該 broker 上的全部的 replica 設爲 online,而後觸發 offline partition 或 new partition 的 state 轉變爲 online;
因此 broker startup,只會影響 offline partition 或 new partition,讓他們有可能成爲 online。
那麼對於普通的已經 online partition,影響只是多一個可用的 replica,那仍是在它完成catch up,被加入 ISR 後的事。
Note: Partition 的 leader 在 broker failover 後,不會立刻自動切換回來,這樣會產生的問題是,broker間負載不均衡,由於全部的讀寫都須要經過 leader。
爲了解決這個問題,在server的配置中有個配置,auto.leader.rebalance.enable,將其設爲true;
這樣 Controller 會啓動一個 scheduler 線程,按期去爲每一個 broker 作 rebalance,即發現若是該 broker 上的 imbalance ratio 達到必定比例,就會將其中的某些 partition 的 leader,進行從新 elect 到原先的 broker 上。
前面說明過,某個 broker server 會被選出做爲 Controller,這個選舉的過程就是依賴於 zookeeper 的 ephemeral node,誰能夠先在"/controller"目錄建立節點,誰就是 controller;
因此反之,咱們也是 watch 這個目錄來判斷 Controller 是否發生 failover 或 變化。Controller 發生 failover 時,會出現以下 log:
controller.log:
「INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)」
Controller 主要是做爲 master 來仲裁 partition 的 leader 的,並維護 partition 和 replicas 的狀態機,以及相應的 zk 的 watcher 註冊;
Controller 的 failover 過程以下:
能夠看到,單純 Controller 發生 failover,是不會影響正常數據讀寫的,只是 partition 的 leader 沒法被從新選舉,若是此時有 partition 的 leader fail,會致使 partition offline;
可是 Controller 的 dead,每每是伴隨着 broker 的 dead,因此在 Controller 發生 failover 的過程當中,每每會出現 partition offline, 致使數據暫時不可用。
Kafka 提供一些工具來方便的查看信息,參考:Kafka Tools
a, 驗證topic 是否work?
最簡單的方式,就是用 producer 和 consumer console 來測試
Producer console,以下能夠往 localhost 的 topic test,插入兩條 message,
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
Consumer console,以下就能夠把剛寫入的 message 讀出,
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
若是整個過程沒有報錯,ok,說明你的topic是能夠工做的
b, 再看看topic是否健康?
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
這樣會打印出 topic test 的 detail 信息,如圖,
從這個圖能夠說明幾個問題:
首先,topic 有幾個 partitions,而且 replicas factor 是多少,即有幾個 replica?
圖中分別有32個 partitions,而且每一個 partition 有兩個 replica。
再者,每一個 partition 的 replicas 都被分配到哪些 brokers 上,而且該 partition 的 leader 是誰?
好比,圖中的 partition0,replicas 被分配到 brokers 4和1上面,其中 leader replica 在 broker 1 上。
最後,是否健康?
從如下幾個方面依次代表健康程度,
c,最後就是看kafka的日誌,kafka/logs
主要是看 controller.log 和 server.log,分別記錄 controller 和 broker server 的日誌。
而後根據前面我給的每種異常的日誌,你能夠看出來究竟是出現什麼問題。
1. https://kafka.apache.org/08/design.html
2. Neha Narkhede,Hands-free Kafka Replication: A lesson in operational simplicity
3. Kafka Tools