Kafka在0.8之前的版本中,並不提供High Availablity機制,一旦一個或多個Broker宕機,則宕機期間其上全部Partition都沒法繼續提供服務。若該Broker永遠不能再恢 復,亦或磁盤故障,則其上數據將丟失。而Kafka的設計目標之一便是提供數據持久化,同時對於分佈式系統來講,尤爲當集羣規模上升到必定程度後,一臺或 者多臺機器宕機的可能性大大提升,對Failover要求很是高。所以,Kafka從0.8開始提供High Availability機制。本文從Data Replication和Leader Election兩方面介紹了Kafka的HA機制。node
在Kafka在0.8之前的版本中,是沒有Replication的,一旦某一個Broker宕機,則其上全部的Partition數據都不可被消 費,這與Kafka數據持久性及Delivery Guarantee的設計目標相悖。同時Producer都不能再將數據存於這些Partition中。web
若是Producer使用同步模式則Producer會在嘗試從新發送message.send.max.retries
(默認值爲3)次後拋出Exception,用戶能夠選擇中止發送後續數據也可選擇繼續選擇發送。而前者會形成數據的阻塞,後者會形成本應發往該Broker的數據的丟失。算法
若是Producer使用異步模式,則Producer會嘗試從新發送message.send.max.retries
(默認值爲3)次後記錄該異常並繼續發送後續數據,這會形成數據丟失而且用戶只能經過日誌發現該問題。同時,Kafka的Producer並未對異步模式提供callback接口。apache
因而可知,在沒有Replication的狀況下,一旦某機器宕機或者某個Broker中止工做則會形成整個系統的可用性下降。隨着集羣規模的增長,整個集羣中出現該類異常的概率大大增長,所以對於生產系統而言Replication機制的引入很是重要。
網絡
注意:本文所述Leader Election主要指Replica之間的Leader Election。session
引入Replication以後,同一個Partition可能會有多個Replica,而這時須要在這些Replication之間選出一個 Leader,Producer和Consumer只與這個Leader交互,其它Replica做爲Follower從Leader中複製數據。數據結構
由於須要保證同一個Partition的多個Replica之間的數據一致性(其中一個宕機後其它Replica必需要能繼續服務而且即不能形成數 據重複也不能形成數據丟失)。若是沒有一個Leader,全部Replica均可同時讀/寫數據,那就須要保證多個Replica之間互相(N×N條通 路)同步數據,數據的一致性和有序性很是難保證,大大增長了Replication實現的複雜性,同時也增長了出現異常的概率。而引入Leader後,只 有Leader負責數據讀寫,Follower只向Leader順序Fetch數據(N條通路),系統更加簡單且高效。app
爲了更好的作負載均衡,Kafka儘可能將全部的Partition均勻分配到整個集羣上。一個典型的部署方式是一個Topic的Partition 數量大於Broker的數量。同時爲了提升Kafka的容錯能力,也須要將同一個Partition的Replica儘可能分散到不一樣的機器。實際上,若是 全部的Replica都在同一個Broker上,那一旦該Broker宕機,該Partition的全部Replica都沒法工做,也就達不到HA的效 果。同時,若是某個Broker宕機了,須要保證它上面的負載能夠被均勻的分配到其它倖存的全部Broker上。負載均衡
Kafka分配Replica的算法以下:異步
將全部Broker(假設共n個Broker)和待分配的Partition排序
將第i個Partition分配到第(i mod n)個Broker上
將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
Kafka的Data Replication須要解決以下問題:
怎樣Propagate消息
在向Producer發送ACK前須要保證有多少個Replica已經收到該消息
怎樣處理某個Replica不工做的狀況
怎樣處理Failed Replica恢復回來的狀況
Producer在發佈消息到某個Partition時,先經過ZooKeeper找到該Partition的Leader,而後不管該Topic 的Replication Factor爲多少(也即該Partition有多少個Replica),Producer只將該消息發送到該Partition的Leader。 Leader會將該消息寫入其本地Log。每一個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送 ACK。一旦Leader收到了ISR中的全部Replica的ACK,該消息就被認爲已經commit了,Leader將增長HW而且向 Producer發送ACK。
爲了提升性能,每一個Follower在接收到數據後就立馬向Leader發送ACK,而非等到數據寫入Log中。所以,對於已經commit的消 息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能徹底保證異常發生後該條消息必定能被 Consumer消費。但考慮到這種場景很是少見,能夠認爲這種方式在性能和數據持久化上作了一個比較好的平衡。在未來的版本中,Kafka會考慮提供更 高的持久性。
Consumer讀消息也是從Leader讀取,只有被commit過的消息(offset低於HW的消息)纔會暴露給Consumer。
Kafka Replication的數據流以下圖所示:
和大部分分佈式系統同樣,Kafka處理失敗須要明肯定義一個Broker是否「活着」。對於Kafka而言,Kafka存活包含兩個條件,一是它 必須維護與ZooKeeper的session(這個經過ZooKeeper的Heartbeat機制來實現)。二是Follower必須可以及時將 Leader的消息複製過來,不能「落後太多」。
Leader會跟蹤與其保持同步的Replica列表,該列表稱爲ISR(即in-sync Replica)。若是一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裏所描述的「落後太多」指Follower複製的 消息落後於Leader後的條數超過預約值(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.max.messages
配置,其默認值是4000)或者Follower超過必定時間(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.time.max.ms
來配置,其默認值是10000)未向Leader發送fetch請求。
Kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。事實上,徹底同步複製要求全部能工做的Follower都複製完,這條消息纔會 被認爲commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka很是重要的一個特性)。而異步複製方式下,Follower異步的從 Leader複製數據,數據只要被Leader寫入log就被認爲已經commit,這種狀況下若是Follower都複製完都落後於Leader,而如 果Leader忽然宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。Follower能夠批量的從 Leader複製數據,這樣極大的提升複製性能(批量寫磁盤),極大減小了Follower與Leader的差距。
須要說明的是,Kafka只解決fail/recover,不處理「Byzantine」(「拜占庭」)問題。一條消息只有被ISR裏的全部 Follower都從Leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何Follower複製就宕機 了,而形成數據丟失(Consumer沒法消費這些數據)。而對於Producer而言,它能夠選擇是否等待消息commit,這能夠經過request.required.acks
來設置。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的消息就不會丟失。
上文說明了Kafka是如何作Replication的,另一個很重要的問題是當Leader宕機了,怎樣在Follower中選舉出新的 Leader。由於Follower可能落後許多或者crash了,因此必須確保選擇「最新」的Follower做爲新的Leader。一個基本的原則就 是,若是Leader不在了,新的Leader必須擁有原來的Leader commit過的全部消息。這就須要做一個折衷,若是Leader在標明一條消息被commit前等待更多的Follower確認,那在它宕機以後就有更 多的Follower能夠做爲新的Leader,但這也會形成吞吐率的降低。
一種很是經常使用的選舉leader的方式是「Majority Vote」(「少數服從多數」),但Kafka並未採用這種方式。這種模式下,若是咱們有2f+1個Replica(包含Leader和 Follower),那在commit以前必須保證有f+1個Replica複製完消息,爲了保證正確選出新的Leader,fail的Replica不 能超過f個。由於在剩下的任意f+1個Replica裏,至少有一個Replica包含有最新的全部消息。這種方式有個很大的優點,系統的latency 只取決於最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,爲了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。若是要容忍1個follower掛掉,必需要有3個以上的 Replica,若是要容忍2個Follower掛掉,必需要有5個以上的Replica。也就是說,在生產環境下爲了保證較高的容錯程度,必需要有大量 的Replica,而大量的Replica又會在大數據量下致使性能的急劇降低。這就是這種算法更多用在ZooKeeper這種共享集羣配置的系統中而不多在須要存儲大量數據的系統中使用的緣由。例如HDFS的HA Feature是基於majority-vote-based journal,可是它的數據存儲並無使用這種方式。
實際上,Leader Election算法很是多,好比ZooKeeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的Leader Election算法更像微軟的PacificA算法。
Kafka在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR裏的全部Replica都跟上了leader,只有ISR裏的成員纔有被選爲Leader的可能。在這種模式下,對於 f+1個Replica,一個Partition能在保證不丟失已經commit的消息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種 模式是很是有利的。事實上,爲了容忍f個Replica的失敗,Majority Vote和ISR在commit前須要等待的Replica數量是同樣的,可是ISR須要的總的Replica的個數幾乎是Majority Vote的一半。
雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優點,可是Kafka做者認爲Kafka能夠經過Producer選擇是否被commit阻塞來改善這一問題,而且節省下來的Replica和磁盤使得ISR模式仍然值得。
上文提到,在ISR中至少有一個follower時,Kafka能夠確保已經commit的數據不丟失,但若是某個Partition的全部Replica都宕機了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:
等待ISR中的任一個Replica「活」過來,而且選它做爲Leader
選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader
這就須要在可用性和一致性當中做出一個簡單的折衷。若是必定要等待ISR中的Replica「活」過來,那不可用的時間就可能會相對較長。並且若是 ISR中的全部Replica都沒法「活」過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個「活」過來的Replica做爲 Leader,而這個Replica不是ISR中的Replica,那即便它並不保證已經包含了全部已commit的消息,它也會成爲Leader而做爲 consumer的數據源(前文有說明,全部讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在之後的版本 中,Kafka支持用戶經過配置選擇這兩種方式中的一種,從而根據不一樣的使用場景選擇高可用性仍是強一致性。
最簡單最直觀的方案是,全部Follower都在ZooKeeper上設置一個Watch,一旦Leader宕機,其對應的ephemeral znode會自動刪除,此時全部Follower都嘗試建立該節點,而建立成功者(ZooKeeper保證只有一個能建立成功)便是新的Leader,其 它Replica即爲Follower。
可是該方法會有3個問題:
split-brain 這是由ZooKeeper的特性引發的,雖然ZooKeeper能保證全部Watch按順序觸發,但並不能保證同一時刻全部Replica「看」到的狀態是同樣的,這就可能形成不一樣Replica的響應不一致
herd effect 若是宕機的那個Broker上的Partition比較多,會形成多個Watch被觸發,形成集羣內大量的調整
ZooKeeper負載太重 每一個Replica都要爲此在ZooKeeper上註冊一個Watch,當集羣規模增長到幾千個Partition時ZooKeeper負載會太重。
Kafka 0.8.*的Leader Election方案解決了上述問題,它在全部broker中選出一個controller,全部Partition的Leader選舉都由 controller決定。controller會將Leader的改變直接經過RPC的方式(比ZooKeeper Queue的方式更高效)通知需爲爲此做爲響應的Broker。同時controller也負責增刪Topic以及Replica的從新分配。
首先聲明本節所示ZooKeeper結構中,實線框表明路徑名是固定的,而虛線框表明路徑名與業務相關
admin (該目錄下znode只有在有相關操做時纔會存在,操做結束時會將其刪除)
/admin/preferred_replica_election數據結構
{ "fields":[ { "name":"version", "type":"int", "doc":"version id" }, { "name":"partitions", "type":{ "type":"array", "items":{ "fields":[ { "name":"topic", "type":"string", "doc":"topic of the partition for which preferred replica election should be triggered" }, { "name":"partition", "type":"int", "doc":"the partition for which preferred replica election should be triggered" } ], } "doc":"an array of partitions for which preferred replica election should be triggered" } } ] } Example: { "version": 1, "partitions": [ { "topic": "topic1", "partition": 8 }, { "topic": "topic2", "partition": 16 } ] }
/admin/reassign_partitions
用於將一些Partition分配到不一樣的broker集合上。 對於每一個待從新分配的Partition,Kafka會在該znode上存儲其全部的Replica和相應的Broker id。該znode由管理進程建立而且一旦從新分配成功它將會被自動移除。其數據結構以下:
{ "fields":[ { "name":"version", "type":"int", "doc":"version id" }, { "name":"partitions", "type":{ "type":"array", "items":{ "fields":[ { "name":"topic", "type":"string", "doc":"topic of the partition to be reassigned" }, { "name":"partition", "type":"int", "doc":"the partition to be reassigned" }, { "name":"replicas", "type":"array", "items":"int", "doc":"a list of replica ids" } ], } "doc":"an array of partitions to be reassigned to new replicas" } } ] }
Example: { "version": 1, "partitions": [ { "topic": "topic3", "partition": 1, "replicas": [1, 2, 3] } ] }
/admin/delete_topics數據結構:
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "topics", "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"} } ] } Example: { "version": 1, "topics": ["topic4", "topic5"] }
brokers
broker(即/brokers/ids/[brokerId]
)存儲「活着」的broker信息。數據結構以下:
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "host", "type": "string", "doc": "ip address or host name of the broker"}, {"name": "port", "type": "int", "doc": "port of the broker"}, {"name": "jmx_port", "type": "int", "doc": "port for jmx"} ] } Example: { "jmx_port":-1, "host":"node1", "version":1, "port":9092 }
topic註冊信息(/brokers/topics/[topic]
),存儲該topic的全部partition的 全部replica所在的broker id,第一個replica即爲preferred replica,對一個給定的partition,它在同一個broker上最多隻有一個replica,所以broker id可做爲replica id。數據結構以下:
Schema: { "fields" : [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "partitions", "type": {"type": "map", "values": {"type": "array", "items": "int", "doc": "a list of replica ids"}, "doc": "a map from partition id to replica list"}, } ] } Example: { "version":1, "partitions": {"12":[6], "8":[2], "4":[6], "11":[5], "9":[3], "5":[7], "10":[4], "6":[8], "1":[3], "0":[2], "2":[4], "7":[1], "3":[5]} }
partition state(/brokers/topics/[topic]/partitions/[partitionId]/state
) 結構以下:
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "isr", "type": {"type": "array", "items": "int", "doc": "an array of the id of replicas in isr"} }, {"name": "leader", "type": "int", "doc": "id of the leader replica"}, {"name": "controller_epoch", "type": "int", "doc": "epoch of the controller that last updated the leader and isr info"}, {"name": "leader_epoch", "type": "int", "doc": "epoch of the leader"} ] } Example: { "controller_epoch":29, "leader":2, "version":1, "leader_epoch":48, "isr":[2] }
controller /controller -> int (broker id of the controller)
存儲當前controller的信息
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "brokerid", "type": "int", "doc": "broker id of the controller"} ] } Example: { "version":1, "brokerid":8 }
/controller_epoch -> int (epoch)
直接以整數形式存儲controller epoch,而非像其它znode同樣以JSON字符串形式存儲。
Controller在ZooKeeper註冊Watch,一旦有Broker宕機(這是用宕機表明任何讓系統認爲其die的情景,包括但不 限於機器斷電,網絡不可用,GC致使的Stop The World,進程crash等),其在ZooKeeper對應的znode會自動被刪除,ZooKeeper會fire Controller註冊的watch,Controller讀取最新的倖存的Broker。
Controller決定set_p,該集合包含了宕機的全部Broker上的全部Partition。
對set_p中的每個Partition
3.1 從/brokers/topics/[topic]/partitions/[partition]/state
讀取該Partition當前的ISR
3.2 決定該Partition的新Leader。若是當前ISR中有至少一個Replica還倖存,則選擇其中一個做爲新Leader,新的ISR則包含當前 ISR中全部倖存的Replica。不然選擇該Partition中任意一個倖存的Replica做爲新的Leader以及ISR(該場景下可能會有潛在 的數據丟失)。若是該Partition的全部Replica都宕機了,則將新的Leader設置爲-1。
3.3 將新的Leader,ISR和新的leader_epoch
及controller_epoch
寫入/brokers/topics/[topic]/partitions/[partition]/state
。注意,該操做只有其version在3.1至3.3的過程當中無變化時纔會執行,不然跳轉到3.1
直接經過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller能夠在一個RPC操做中發送多個命令從而提升效率。
broker failover順序圖以下所示。