Kafka學習之路 (三)Kafka的高可用

1、高可用的由來

1.1 爲什麼須要Replication

在Kafka在0.8之前的版本中,是沒有Replication的,一旦某一個Broker宕機,則其上全部的Partition數據都不可被消費,這與Kafka數據持久性及Delivery Guarantee的設計目標相悖。同時Producer都不能再將數據存於這些Partition中。html

若是Producer使用同步模式則Producer會在嘗試從新發送message.send.max.retries(默認值爲3)次後拋出Exception,用戶能夠選擇中止發送後續數據也可選擇繼續選擇發送。而前者會形成數據的阻塞,後者會形成本應發往該Broker的數據的丟失。node

若是Producer使用異步模式,則Producer會嘗試從新發送message.send.max.retries(默認值爲3)次後記錄該異常並繼續發送後續數據,這會形成數據丟失而且用戶只能經過日誌發現該問題。同時,Kafka的Producer並未對異步模式提供callback接口。算法

因而可知,在沒有Replication的狀況下,一旦某機器宕機或者某個Broker中止工做則會形成整個系統的可用性下降。隨着集羣規模的增長,整個集羣中出現該類異常的概率大大增長,所以對於生產系統而言Replication機制的引入很是重要。apache

1.2 Leader Election

引入Replication以後,同一個Partition可能會有多個Replica,而這時須要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader交互,其它Replica做爲Follower從Leader中複製數據。網絡

由於須要保證同一個Partition的多個Replica之間的數據一致性(其中一個宕機後其它Replica必需要能繼續服務而且即不能形成數據重複也不能形成數據丟失)。若是沒有一個Leader,全部Replica均可同時讀/寫數據,那就須要保證多個Replica之間互相(N×N條通路)同步數據,數據的一致性和有序性很是難保證,大大增長了Replication實現的複雜性,同時也增長了出現異常的概率。而引入Leader後,只有Leader負責數據讀寫,Follower只向Leader順序Fetch數據(N條通路),系統更加簡單且高效。session

2、Kafka HA設計解析

2.1 如何將全部Replica均勻分佈到整個集羣

爲了更好的作負載均衡,Kafka儘可能將全部的Partition均勻分配到整個集羣上。一個典型的部署方式是一個Topic的Partition數量大於Broker的數量。同時爲了提升Kafka的容錯能力,也須要將同一個Partition的Replica儘可能分散到不一樣的機器。實際上,若是全部的Replica都在同一個Broker上,那一旦該Broker宕機,該Partition的全部Replica都沒法工做,也就達不到HA的效果。同時,若是某個Broker宕機了,須要保證它上面的負載能夠被均勻的分配到其它倖存的全部Broker上。多線程

Kafka分配Replica的算法以下:app

一、 將全部Broker(假設共n個Broker)和待分配的Partition排序
二、 將第i個Partition分配到第(i mod n)個Broker上
三、 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上負載均衡

2.2 Data Replication(副本策略)

Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。異步

2.2.1 消息傳遞同步策略

Producer在發佈消息到某個Partition時,先經過ZooKeeper找到該Partition的Leader,而後不管該Topic的Replication Factor爲多少,Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每一個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送ACK。一旦Leader收到了ISR中的全部Replica的ACK,該消息就被認爲已經commit了,Leader將增長HW而且向Producer發送ACK。

爲了提升性能,每一個Follower在接收到數據後就立馬向Leader發送ACK,而非等到數據寫入Log中。所以,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能徹底保證異常發生後該條消息必定能被Consumer消費。

Consumer讀消息也是從Leader讀取,只有被commit過的消息纔會暴露給Consumer。

Kafka Replication的數據流以下圖所示:
Kafka學習之路 (三)Kafka的高可用

2.2.2 ACK前須要保證有多少個備份

對於Kafka而言,定義一個Broker是否「活着」包含兩個條件:

一是它必須維護與ZooKeeper的session(這個經過ZooKeeper的Heartbeat機制來實現)。
二是Follower必須可以及時將Leader的消息複製過來,不能「落後太大」。

Leader會跟蹤與其保持同步的Replica列表,該列表稱爲ISR(即in-sync Replica)。若是一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裏所描述的「落後太多」指Follower複製的消息落後於Leader後的條數超過預約值(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.max.messages配置,其默認值是4000)或者Follower超過必定時間(該值可在$KAFKA_HOME/config/server.properties中經過replica.lag.time.max.ms來配置,其默認值是10000)未向Leader發送fetch請求。

Kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。事實上,徹底同步複製要求全部能工做的Follower都複製完,這條消息纔會被認爲commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka很是重要的一個特性)。而異步複製方式下,Follower異步的從Leader複製數據,數據只要被Leader寫入log就被認爲已經commit,這種狀況下若是Follower都複製完都落後於Leader,而若是Leader忽然宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。Follower能夠批量的從Leader複製數據,這樣極大的提升複製性能(批量寫磁盤),極大減小了Follower與Leader的差距。

須要說明的是,Kafka只解決fail/recover,不處理「Byzantine」(「拜占庭」)問題。一條消息只有被ISR裏的全部Follower都從Leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何Follower複製就宕機了,而形成數據丟失(Consumer沒法消費這些數據)。而對於Producer而言,它能夠選擇是否等待消息commit,這能夠經過request.required.acks來設置。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的消息就不會丟失。

2.2.3 Leader Election算法

Leader選舉本質上是一個分佈式鎖,有兩種方式實現基於ZooKeeper的分佈式鎖:

節點名稱惟一性:多個客戶端建立一個節點,只有成功建立節點的客戶端才能得到鎖
臨時順序節點:全部客戶端在某個目錄下建立本身的臨時順序節點,只有序號最小的纔得到鎖

一種很是經常使用的選舉leader的方式是「Majority Vote」(「少數服從多數」),但Kafka並未採用這種方式。這種模式下,若是咱們有2f+1個Replica(包含Leader和Follower),那在commit以前必須保證有f+1個Replica複製完消息,爲了保證正確選出新的Leader,fail的Replica不能超過f個。由於在剩下的任意f+1個Replica裏,至少有一個Replica包含有最新的全部消息。這種方式有個很大的優點,系統的latency只取決於最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,爲了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。若是要容忍1個follower掛掉,必需要有3個以上的Replica,若是要容忍2個Follower掛掉,必需要有5個以上的Replica。也就是說,在生產環境下爲了保證較高的容錯程度,必需要有大量的Replica,而大量的Replica又會在大數據量下致使性能的急劇降低。這就是這種算法更多用在ZooKeeper這種共享集羣配置的系統中而不多在須要存儲大量數據的系統中使用的緣由。例如HDFS的HA Feature是基於majority-vote-based journal,可是它的數據存儲並無使用這種方式。

Kafka在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR裏的全部Replica都跟上了leader,只有ISR裏的成員纔有被選爲Leader的可能。在這種模式下,對於f+1個Replica,一個Partition能在保證不丟失已經commit的消息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種模式是很是有利的。事實上,爲了容忍f個Replica的失敗,Majority Vote和ISR在commit前須要等待的Replica數量是同樣的,可是ISR須要的總的Replica的個數幾乎是Majority Vote的一半。

雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優點,可是Kafka做者認爲Kafka能夠經過Producer選擇是否被commit阻塞來改善這一問題,而且節省下來的Replica和磁盤使得ISR模式仍然值得。

2.2.4 如何處理全部Replica都不工做

在ISR中至少有一個follower時,Kafka能夠確保已經commit的數據不丟失,但若是某個Partition的全部Replica都宕機了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:

一、 等待ISR中的任一個Replica「活」過來,而且選它做爲Leader
二、 選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader

這就須要在可用性和一致性當中做出一個簡單的折衷。若是必定要等待ISR中的Replica「活」過來,那不可用的時間就可能會相對較長。並且若是ISR中的全部Replica都沒法「活」過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個「活」過來的Replica做爲Leader,而這個Replica不是ISR中的Replica,那即便它並不保證已經包含了全部已commit的消息,它也會成爲Leader而做爲consumer的數據源(前文有說明,全部讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在之後的版本中,Kafka支持用戶經過配置選擇這兩種方式中的一種,從而根據不一樣的使用場景選擇高可用性仍是強一致性。

2.2.5 選舉Leader

最簡單最直觀的方案是,全部Follower都在ZooKeeper上設置一個Watch,一旦Leader宕機,其對應的ephemeral znode會自動刪除,此時全部Follower都嘗試建立該節點,而建立成功者(ZooKeeper保證只有一個能建立成功)便是新的Leader,其它Replica即爲Follower。

可是該方法會有3個問題:

一、 split-brain 這是由ZooKeeper的特性引發的,雖然ZooKeeper能保證全部Watch按順序觸發,但並不能保證同一時刻全部Replica「看」到的狀態是同樣的,這就可能形成不一樣Replica的響應不一致
二、 herd effect 若是宕機的那個Broker上的Partition比較多,會形成多個Watch被觸發,形成集羣內大量的調整
三、 ZooKeeper負載太重 每一個Replica都要爲此在ZooKeeper上註冊一個Watch,當集羣規模增長到幾千個Partition時ZooKeeper負載會太重。

Kafka 0.8.*的Leader Election方案解決了上述問題,它在全部broker中選出一個controller,全部Partition的Leader選舉都由controller決定。controller會將Leader的改變直接經過RPC的方式(比ZooKeeper Queue的方式更高效)通知需爲爲此做爲響應的Broker。同時controller也負責增刪Topic以及Replica的從新分配。

3、HA相關ZooKeeper結構

Kafka學習之路 (三)Kafka的高可用

3.1 admin

該目錄下znode只有在有相關操做時纔會存在,操做結束時會將其刪除

/admin/reassign_partitions用於將一些Partition分配到不一樣的broker集合上。對於每一個待從新分配的Partition,Kafka會在該znode上存儲其全部的Replica和相應的Broker id。該znode由管理進程建立而且一旦從新分配成功它將會被自動移除。

3.2 broker

即/brokers/ids/[brokerId])存儲「活着」的broker信息。

topic註冊信息(/brokers/topics/[topic]),存儲該topic的全部partition的全部replica所在的broker id,第一個replica即爲preferred replica,對一個給定的partition,它在同一個broker上最多隻有一個replica,所以broker id可做爲replica id。

3.3 controller

/controller -> int (broker id of the controller)存儲當前controller的信息。
/controller_epoch -> int (epoch)直接以整數形式存儲controller epoch,而非像其它znode同樣以JSON字符串形式存儲。

4、producer發佈消息

4.1 寫入方式

producer 採用 push 模式將消息發佈到 broker,每條消息都被 append 到 patition 中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 kafka 吞吐率)。

4.2 消息路由

producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪個 partition。其路由機制爲:

一、 指定了 patition,則直接使用;
二、 未指定 patition 但指定 key,經過對 key 的 value 進行hash 選出一個 patition
三、 patition 和 key 都未指定,使用輪詢選出一個 patition。

4.3 寫入流程

producer 寫入消息序列圖以下所示:
Kafka學習之路 (三)Kafka的高可用
流程說明:

一、 producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
二、 producer 將消息發送給該 leader
三、 leader 將消息寫入本地 log
四、 followers 從 leader pull 消息,寫入本地 log 後 leader 發送 ACK
五、 leader 收到全部 ISR 中的 replica 的 ACK 後,增長 HW(high watermark,最後 commit 的 offset) 並向 producer 發送 ACK

5、broker保存消息

5.1 存儲方式

物理上把 topic 分紅一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每一個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的全部消息和索引文件),以下:
Kafka學習之路 (三)Kafka的高可用
不管消息是否被消費,kafka 都會保留全部消息。有兩種策略能夠刪除舊數據:

一、 基於時間:log.retention.hours=168
二、 基於大小:log.retention.bytes=1073741824

6、Topic的建立和刪除

6.1 建立topic

建立 topic 的序列圖以下所示
Kafka學習之路 (三)Kafka的高可用
流程說明:

一、 controller 在 ZooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被建立,則 controller 會經過 watch 獲得該 topic 的 partition/replica 分配。
二、 controller從 /brokers/ids 讀取當前全部可用的 broker 列表,對於 set_p 中的每個 partition:
2.一、 從分配給該 partition 的全部 replica(稱爲AR)中任選一個可用的 broker 做爲新的 leader,並將AR設置爲新的 ISR
2.二、 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state
三、 controller 經過 RPC 向相關的 broker 發送 LeaderAndISRRequest。

6.2 刪除topic

刪除 topic 的序列圖以下所示:
Kafka學習之路 (三)Kafka的高可用
流程說明:

一、 controller 在 zooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被刪除,則 controller 會經過 watch 獲得該 topic 的 partition/replica 分配。
二、 若 delete.topic.enable=false,結束;不然 controller 註冊在 /admin/delete_topics 上的 watch 被 fire,controller 經過回調向對應的 broker 發送 StopReplicaRequest。

7、broker failover

kafka broker failover 序列圖以下所示:
Kafka學習之路 (三)Kafka的高可用
流程說明

一、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點註冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch
二、 controller 從 /brokers/ids 節點讀取可用broker
三、 controller決定set_p,該集合包含宕機 broker 上的全部 partition
四、 對 set_p 中的每個 partition
4.一、 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR
4.二、 決定新 leader
4.三、 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節點
五、 經過 RPC 向相關 broker 發送 leaderAndISRRequest 命令

8、controller failover

當 controller 宕機時會觸發 controller failover。每一個 broker 都會在 zookeeper 的 "/controller" 節點註冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,全部存活的 broker 收到 fire 的通知,每一個 broker 都嘗試建立新的 controller path,只有一個競選成功並當選爲 controller。

當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成以下操做:

一、 讀取並增長 Controller Epoch。
二、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上註冊 watcher。
三、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上註冊 watcher。
四、 經過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上註冊 watcher。
五、 若 delete.topic.enable=true(默認值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上註冊 watcher。
六、 經過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上註冊Watch。
七、 初始化 ControllerContext 對象,設置當前全部 topic,「活」着的 broker 列表,全部 partition 的 leader 及 ISR等。
八、 啓動 replicaStateMachine 和 partitionStateMachine。
九、 將 brokerState 狀態設置爲 RunningAsController。
十、 將每一個 partition 的 Leadership 信息發送給全部「活」着的 broker。
十一、 若 auto.leader.rebalance.enable=true(默認值是true),則啓動 partition-rebalance 線程。
十二、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。

9、consumer 消費消息

9.1 consumer API

kafka 提供了兩套 consumer API:

一、 The high-level Consumer API
二、 The SimpleConsumer API

其中 high-level consumer API 提供了一個從 kafka 消費數據的高層抽象,而 SimpleConsumer API 則須要開發人員更多地關注細節。

9.1.1 The high-level consumer API

high-level consumer API 提供了 consumer group 的語義,一個消息只能被 group 內的一個 consumer 所消費,且 consumer 消費消息時不關注 offset,最後一個 offset 由 zookeeper 保存。

使用 high-level consumer API 能夠是多線程的應用,應當注意:

一、 若是消費線程大於 patition 數量,則有些線程將收不到消息
二、 若是 patition 數量大於線程數,則有些線程多收到多個 patition 的消息
三、 若是一個線程消費多個 patition,則沒法保證你收到的消息的順序,而一個 patition 內的消息是有序的

9.1.2 The SimpleConsumer API

若是你想要對 patition 有更多的控制權,那就應該使用 SimpleConsumer API,好比:

一、 屢次讀取一個消息
二、 只消費一個 patition 中的部分消息
三、 使用事務來保證一個消息僅被消費一次

可是使用此 API 時,partition、offset、broker、leader 等對你再也不透明,須要本身去管理。你須要作大量的額外工做:

一、 必須在應用程序中跟蹤 offset,從而肯定下一條應該消費哪條消息
二、 應用程序須要經過程序獲知每一個 Partition 的 leader 是誰
三、 須要處理 leader 的變動

使用 SimpleConsumer API 的通常流程以下:

一、 查找到一個「活着」的 broker,而且找出每一個 partition 的 leader
二、 找出每一個 partition 的 follower
三、 定義好請求,該請求應該能描述應用程序須要哪些數據
四、 fetch 數據
五、 識別 leader 的變化,並對之做出必要的響應

如下針對 high-level Consumer API 進行說明。

9.2 consumer group

如(一)kafka的簡介所說,分配單位是 patition。每一個 consumer 都屬於一個 group,一個 partition 只能被同一個 group 內的一個 consumer 所消費(也就保障了一個消息只能被 group 內的一個 consuemr 所消費),可是多個 group 能夠同時消費這個 partition。

kafka 的設計目標之一就是同時實現離線處理和實時處理,根據這一特性,可使用 spark/Storm 這些實時處理系統對消息在線處理,同時使用 Hadoop 批處理系統進行離線處理,還能夠將數據備份到另外一個數據中心,只須要保證這三者屬於不一樣的 consumer group。以下圖所示:
Kafka學習之路 (三)Kafka的高可用

9.3 消費方式

consumer 採用 pull 模式從 broker 中讀取數據。

push 模式很難適應消費速率不一樣的消費者,由於消息發送速率是由 broker 決定的。它的目標是儘量以最快速度傳遞消息,可是這樣很容易形成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則能夠根據 consumer 的消費能力以適當的速率消費消息。

對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。

9.4 consumer delivery guarantee

若是將 consumer 設置爲 autocommit,consumer 一旦讀到數據當即自動 commit。若是隻討論這一讀取消息的過程,那 Kafka 確保了 Exactly once。

但實際使用中應用程序並不是在 consumer 讀取完數據就結束了,而是要進行進一步處理,而數據處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:

一、 讀完消息先 commit 再處理消息。
這種模式下,若是 consumer 在 commit 後還沒來得及處理消息就 crash 了,下次從新開始工做後就沒法讀到剛剛已提交而未處理的消息,這就對應於 At most once
二、 讀完消息先處理再 commit。
這種模式下,若是在處理完消息以後 commit 以前 consumer crash 了,下次從新開始工做時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了。這就對應於 At least once。
三、 若是必定要作到 Exactly once,就須要協調 offset 和實際操做的輸出。
精典的作法是引入兩階段提交。若是能讓 offset 和操做輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,由於許多輸出系統可能不支持兩階段提交。好比,consumer 拿到數據後可能把數據放到 HDFS,若是把最新的 offset 和數據自己一塊兒寫到 HDFS,那就能夠保證數據的輸出和 offset 的更新要麼都完成,要麼都不完成,間接實現 Exactly once。(目前就 high-level API而言,offset 是存於Zookeeper 中的,沒法存於HDFS,而SimpleConsuemr API的 offset 是由本身去維護的,能夠將之存於 HDFS 中)

總之,Kafka 默認保證 At least once,而且容許經過設置 producer 異步提交來實現 At most once(見文章《kafka consumer防止數據丟失》)。而 Exactly once 要求與外部存儲系統協做,幸運的是 kafka 提供的 offset 能夠很是直接很是容易得使用這種方式。

更多關於 kafka 傳輸語義的信息請參考《Message Delivery Semantics》

9.5 consumer rebalance

當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發 rebalance。consumer rebalance算法以下:

一、 將目標 topic 下的全部 partirtion 排序,存於PT
二、 對某 consumer group 下全部 consumer 排序,存於 CG,第 i 個consumer 記爲 Ci
三、 N=size(PT)/size(CG),向上取整
四、 解除 Ci 對原來分配的 partition 的消費權(i從0開始)
五、 將第iN到(i+1)N-1個 partition 分配給 Ci

相關文章
相關標籤/搜索