ISR:一個Partition中的Leader的全部follower(replication)集合。node
AR:分配給此Partition的全部replication,統稱爲Assigned Replicas(AR)。ISR是AR的子集。算法
Controller:broker中的Leader。併發
HW:消費端能看到的broker上消息的位置,客戶端只能消費到HW位置。負載均衡
LEO:消息在log文件中最新的位置。異步
segment:replication對應一個文件夾中,實際存消息的文件socket
多個broker組成了kafka集羣。性能
1. Broker均可以向producer提供metadata信息,metadata包括:集羣中存活的broker列表;partitions leader列表等信息。當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接;消息由producer直接經過socket發送到broker,中間不會通過任何"路由層"。spa
2. 使用zookeeper用來註冊broker信息,監測partition leader存活性。線程
一個consumer group下,不管有多少個consumer,這個group必定是把這個topic下全部的partition都消費了。設計
1) 當group中的consumer數量小於topic的partition數量:一個consumer消費多個partition。
2)當group中的consumer數量等於topic的partition數量:一個consumer消費一個partition。這時的效率最高。
3)當group中的consumer數量大於topic的partition數量:這時就會有consumer空閒,形成資源的浪費。
在設置consumer group時,只須要指定裏面consumer數量,consumer會自動進行rebalance。
Consumer rebalance觸發條件:
1)Consumer增長或刪除會觸發 Consumer Group的Rebalance。
2)Broker的增長或者減小都會觸發 Consumer Rebalance。Consumer消費partition中的message是順序讀取,因此必需要維護上次讀取message的位置。
1、舊版本的(在0.8.2以前):
1)high level API:offset存在於zookeeper中: Consumer默認是讀完message先commmit再處理message,autocommit默認是是true,這時候先commit就會更新offset+1,一旦處理失敗,offset已經+1,這個時候就會丟message;若是還有offset+1。那麼consumer重啓後就會重複消費這個message。也能夠配置成讀完消息處理再commit,這種狀況下consumer端的響應就會比較慢的,須要等處理完才行。
2) low level API:由應用本身維護offset。
2、0.8.2以後:
1) 新的Comsumer API再也不有high-level、low-level之分了,而是本身維護offset。這樣作的好處是避免應用出現異常時,數據未消費成功,但Position已經提交,致使消息未消費的狀況發生。默認將消費的offset遷入到kafka一個名爲__consumer_offsets 的Topic中。原理:利用kafka自身的topic,以消費的group,topic以及partition作爲組合key,全部的消費offset都提交寫入到名爲__consumer_offsets 的Topic中。
Consumer消費的message是在topic下的某個partition的leader上。
當Consumer啓動時,觸發的操做:
1)進行"Consumer id Registry"。
2)在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave"和"join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管失效consumer的partition)。
3)在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance。
使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息。
1)將全部Broker(假設共n個Broker)和待分配的Partition排序。
2)將第i個Partition分配到第(i mod n)個Broker上。
3)將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上。
一個Partition的一個replication對應一個文件夾,文件夾中包含索引文件(每一個segment的offset範圍)和多個segment文件(以第一條消息的offset命名)。
Partition和replication的關係:
topic:
Kafka的高可用性(HA)體如今:replication和Leader election(replication之間)。
在沒有replication的狀況下,由於partition在某一臺broker上,一旦某一個broker宕機,那麼這臺broker上的Partition的數據都不能被消費,同時producer也不能提交消息到此topic的該Partition下。這樣就違背了kafka的高可用性。因此引入了Replication機制。特別是在生產環境中。
引入Replication以後,那麼一個partition就會有多個Replication,由於不一樣的Replication存在於不一樣的broker中,這樣在一臺broker宕機後,在其餘broker上的該partition的Replication還能夠提供服務。這樣就保持了kafka的高可用。
有了Replication就必須考慮數據的一致性,這樣才能保證在一個broker宕機後其餘的Replication提供服務的時候數據不會丟失。kafka多個Replication,引入Leader,Producer和Consumer只與這個Leader打交道,其餘的Replication做爲此leader的follower,而且從leader拉取數據。若是不存在Leaader的話,全部的Replication都是能夠同時讀/寫的,就必須保證多個Replication之間的數據同步,它們直接就要互相的同步數據(N*N條路)。這樣的設計就至關的複雜,因此引入Leader後,讓Leader來負責讀/寫,其餘的Replication做爲follower就從leader同步數據,這樣高效簡單。
Broker是否還存活必須知足2個條件:第一:必須維護與ZK的心跳機制,第二:上面的Follower必須能快速的從它的Leader那邊同步消息過來,不能落後Leader太多。Leader會跟蹤與其保持同步的Replication列表(ISR:in-sync Replication)。若是Follower宕機或者拖後太多,Leader將會把它從ISR列表中移除。
消息複製機制若是使用同步機制的話,就要求Leader全部的Follower都完成複製(Follower從Leader中pull數據,Follower收到數據並寫入到log後,向Leader發送ACK),這樣很影響吞吐率。
消息複製機制若是採用異步複製機制,Follower異步從Leader複製數據,數據只要被Leader寫入log就認爲消息是commit的狀態,這種狀況下若是全部的Follower都落後與Leader,那麼在Leader宕機後,消息就會出現丟失。
Producer發佈消息到topic的時候(實際上是發送到Partition中),先經過zookeeper找到該Partition的Leader。Producer只把消息發送到該Partition的Leader中。Leader會將消息寫入其本地的log,每一個Follower都從Leader pull數據,這樣Follower存儲信息的順序就和Leader同樣了,Follower在pull到消息後也會把消息存放到本身的log中,向Leader發送ACK。一旦Leader收到了全部ISR列表中的Replication的ACK,這條消息就是已經commit(能夠被消費了)的了,這時Leader將增長HW並想Producer發送ACK。這樣有個問題,就是Leader要等到全部ISR中Replication的ACK,那麼在ISR列表不少或者其中一個的ACK回來的比較慢的時候,這樣就是影響總體的吞吐率。爲了提升性能,每一個Follower獲得pull消息後就立馬給Leader發送ACK,不會等到放入log中。因此commit的消息,只能保證它存在於多個Replication的內存中,不能保證它被持久化到磁盤中。就不能徹底保證在出現異常後,這條消息能被消費。可是這個問題剛好適用於「該問題剛好不解決」。比較很是的少見,因此在性能和可用性上作了一個平衡。
Consumer讀取消息是從Leader中讀取,而且必須是Commit的消息才能被消費。只能消費到HW位置。
Partition有了Leader後,由於Leader只會有一臺,那麼在這臺機子宕機後,就須要在剩下的follower中從新選擇出一個擁有最新數據的follower來變成Leader來對外提供服務。在election中,kafka沒有使用「Majority Vote」(「少數服從多數」)的算法,它的劣勢是能容忍的失敗的follower個數比較少。在kafka的parttion的ISR模式下,保證在不丟失已經commit的消息的前提下,能容忍F個Replica失敗(總個數是F+1),由於ISR列表中裏面全部的Replica都有leader中的數據。因此只要有一個Replica還在,數據就是所有的commit數據。因此直接在ISR中選舉Leader。
如何選舉?在集羣中全部的broker中選出一個controller(集羣中只會有一個controller),集羣中全部Partition的Leader選舉都由該broker解決,這個controller主要負責1:parttion的Leader變化事件。2:新建立和刪除一個topic。3:從新分配parttion。4:管理分區的狀態機和副本的狀態機。當controller決定一個Partition的Leader和ISR後,會將此決定持久化到ZK節點中,而且向全部受到影響的Broker經過RPC的形式直接發送新的決策。
在broker宕機,Controller註冊在ZK的/brokers/ids的Watcher會觸發調用onBrokerFailure,這臺宕機的broker上的Replica多是某個parttion的Leader或者是某個parttion的follower,若是是parttion的Leader,那麼該Controller要確保有其餘的broker成爲這個parttion的Leader,若是是parttion的follower,不須要從新選舉該parttion的Leader,可是該parttion的ISR有可能會發生變化(由於這臺follower的broker宕機了,要從ISR列表中移除)。因此Controller首先會讀取ZK中該parttion的ISR/AR選舉新的Leader和ISR,而後把這個信息先保存到ZK中,最後把包含了新Leader和ISR的LeaderAndISR指令發送給受到影響的Brokers。收到指令的Broker若是Controller命令它成爲某個Partition的Leader,那麼原來爲Follower的Replica所在的Broker就成爲了Partition的Leader。,收到指令的broker原先的Replica是Follower收到指令沒有讓它成爲Leader那麼它依然是的Replica,becomeFollower。
例如過程以下:
1:有三個broker:brokerA,brokerB,brokerC
2:Partition1有三個副本:Replica1,Replica2,Replica3。其中Replica1是Leader,ISR=[1,2,3]
3:Replica1所在的BrokerA掛掉了(Partition1沒有了Leader),Controller註冊的Watcher會觸發調用onBrokerFailure
4:Controller會讀取ZK中的leaderAndISR,選舉出新的leader和ISR:Leader=Replica2,ISR=[2,3]
5:Controller將最新的leaderAndISR寫到ZK中(leader=Replica2,ISR=[2,3])
6:Controller將最新的leaderAndISR構形成LeaderAndISRRequest命令發送給Broker2,Broker3
7:Replica2所在的BrokerB收到指令,由於最新的leader指示Replica2是Leader,因此Replica2成爲Partition1的Leader
8: Replica3所在的BrokerC收到指令,Replica3仍然是follower,而且在ISR中,becomeFollower
Broker宕機後Controller端的處理步驟以下:
1:從ZK中讀取現存的brokers
2:broker宕機,引發partition的Leader或ISR變化,獲取在宕機的broker上的partitions:set_p。
3:循環set_p的每一個Partition P:
從ZK的leaderAndISR節點讀取P的當前ISR
決定P的新Leader和新ISR(優先級分別是ISR中存活的broker,AR中任意存活的做爲Leader)
將P最新的leader,ISR,回寫到ZK的leaderAndISR節點
4:將set_p中每一個Partition的LeaderAndISR指令(裏面包括最新的leaderAndISR數據)發送給受到影響的brokers
假設AR=[1,2,3,4,5],ISR=[1,2,3],可是存活Brokers=[2,3,5]。選擇Leader的方式是ISR中目前存活的Brokers,好比目前存活的Broker是[2,3,5],因此ISR中的副本1是不能做爲Leader的,也不會再做爲ISR了。Leader的選舉是選舉目前還存活的[2,3]中的一個,ISR的肯定是選舉在當前ISR中仍然存活的Broker=[2,3]。因此最後Leader=2,ISR=[2,3]。
假設AR=[1,2,3,4,5],ISR=[1,2,3],可是存活的Brokers=[4,6,7]。由於ISR中沒有一個Broker在當前處於存活狀態,因此只能退而求其次從AR中選擇。幸運的是AR中的4目前是存活的,因此Leader=4,ISR=[4]。因爲4再也不ISR中,因此這種狀況有可能會形成數據丟失,由於只有選舉處於ISR中的,纔不會丟失數據,可是如今ISR中的沒有一個存活,因此也只好選擇有可能丟失的Broekr,總比找不到任何的Broker要好。
Partition有多個Replica,Replica是分佈在Broker上的。因此一個Broker上受到影響的Replica的Partition確定還有其餘的Replica分佈在其餘Broker上。因此含有宕機Broker的Partition的Replica的節點都是受到影響的broker。
假如Broker1上有三個Replica,第一個Replica是Partition1的Leader,第二個Replica是Partition2的Follower,第三個Replica是Partition3的Follower。若是是Leader,則受影響到的broker要被Controller負責選出這個Replica對應的Partition的新Leader。若是是follower,也有可能影響了Partition的ISR,因此Leader要負責更新ISR。
因爲Controller是從Brokers中選舉出來的,因此Controller所在的節點也會做爲Partition的存儲節點的。當Controller掛掉後,Controller自己做爲Broker也會觸發新的Controller調用on_broker_change。可是在尚未選舉出新的Controller以前,掛掉的Broker的on_broker_change不會被新的Controller調用(由於根本就沒有可用的Controller)。因此對於掛掉的Controller節點,最緊迫的任務是首先選舉出新的Controller,而後再由新的Controller觸發掛掉的那個Controller的on_broker_change。
Broker失敗和Controller失敗是不一樣的:Broker的failover是在Controller端處理的,由於咱們知道Broker掛掉了,Controller負責在掛掉Broker和受影響的Broker之間更新數據(將新的leaderAndISR發送給受影響的Broker)。而Controller的failover則是在Broker處理的(成功建立Controller的那一個Broker)。
1:新建立一個topic,會同時指定Partition的個數,每一個Partition都有AR信息寫到ZK中。
2:爲新建立的每一個Partition初始化leader
選擇AR中的一個存活的Broker做爲新Leader,ISR=AR
將新Leader和ISR寫到ZK的leaderAndISR節點
3:發送LeaderAndISRCommand給受到影響的brokers
4:若是是刪除一個topic,則發送StopReplicaCommand給受影響的brokers。
Controller會在/brokers/topics上註冊Watcher,因此有新topic建立/刪除時,Controller會經過Watch獲得新建立/刪除的Topic的Partition/Replica分配。對於新建立的Topic,分配給Partition 的AR中全部的Replica都尚未數據,可認爲它們都是同步的,也即都在ISR中(ISR=AR),任意一個Replica均可做爲Leader。
建立或刪除topic的過程和onBrokerFailure相似都要通過三個步驟:1) 選舉Leader和ISR;2) 將leaderAndISR寫到ZK中;3) 將最新leaderAndISR的LeaderAndISR指令發送給受到影響的Brokers。這是由於brokerChange致使Partition的Leader或者ISR發生變化,而新建立topic時,根本就沒有Leader和ISR,因此二者都須要爲Partition選擇Leader和ISR。
存在這樣一個狀況:有多個broker同時聲稱是一個Partition的leader。好比brokerA是partition的初始Leader,partition的ISR是{A,B,C}。有一天brokerA由於某種緣由失去它在ZK中的註冊信息。這時controller會認爲brokerA當掉了,並將partition的leader分配給了brokerA,設置新的ISR爲{B,C},並寫到ZK中(每次partition的leader發生變化,epoch也會增長)。在brokerB成爲leader的同時,brokerA從緣由中恢復過來,可是沒有接收到controller發送的leadershipi變化指令。這樣如今brokerA和brokerB都認爲本身是partition的Leader,若是咱們容許broker A和B能同時提交消息那就很是不幸了,由於全部副本之間的數據同步就會不一致了。不過當前的設計中其實是不會容許出現這種狀況的:當brokerB成爲leader以後,brokerA是沒法再提交任何新的消息的。
Kafka是怎麼作的到?假設容許存在兩個leader,生產者會同時往這兩個leader寫數據,可是能不能提交消息就相似於兩階段提交協議了:kafka的leader要可以提交一個消息的保證是ISR中的全部副本都複製成功。對於brokerA爲了保證能提交消息m,它須要ISR中的每一個副本(A,B,C)都要接收到消息m,而這個時候broker A仍然認爲ISR是{A,B,C},(這是broker A的一份本地拷貝,雖然這個時候ZK中的ISR已經被broker B改變了:ISR是{B,C}),可是ISR中的brokerB是不會再接收到消息m的。由於在brokerB成爲Leader的時候,它會首先關掉到以前到舊的brokerA中抓取數據的線程(brokerB以前是follower,會向leader抓取數據,只有follower抓取數據,leader才能判斷消息是否能提交),由於brokerB的抓取線程被關閉了,brokerA會認爲B沒法遇上Leader,既然由於B受到影響不能提交消息m,broker A乾脆就想要把B從ISR中移除,這個時候broker A要將本身認爲的最新的ISR寫到ZK中。不過不幸的是broker A並不能完成這個操做:由於在寫ZK的時候broker A會發現本身的epoch版本和ZK中的當前值並不匹配(broker B在選舉爲Leader以後會寫到ZK中,並將epoch增長1,任何新的寫操做的epoch都不能比當前epoch小),直到這個時刻,broker A才意識到它已經再也不是partition的leader了。這個時候broker A只能接受本身再也不是該partition的Leader的事實了。
controller首先將受影響的partitions的新leader寫到zk的leaderAndISR節點,而後纔會向brokers發送leader改變的命令,brokers收到命令後會對leader改變的事件做出響應。因爲客戶端請求會使用leaderAndISR的數據來鏈接leader所在的節點,客戶端的請求被路由到新leader所在的broker節點,但若是那個broker尚未準備好成爲leader,就存在一個時間窗口對於客戶端而言是不可用的。kafka這裏則是controller先更新元數據(寫入leaderAndISR)後才發送命令給broker,所以可能元數據已經更新,可是broker還沒收到命令,或者收到命令後還沒準備好成爲leader。因此可能你會認爲不要讓controller先更新leaderAndISR節點,而是發送指令給brokers,每一個broker在收到指令並處理完成後才,讓每一個broker來更新這個節點的數據。不過這裏讓controller來更新leaderAndISR節點是有緣由的:咱們是依賴ZK的leaderAndISR節點來保持controller和partition的leader的同步。當controller選舉出新的leader以後,它不但願新的Leader的ISR被舊的Leader改變,不然的話(假設ISR能夠被舊leader改變),新選舉出來的leader在接管正式成爲leader以前可能會被當前leader從ISR中剔除出去。經過當即將新leader發佈到leaderAndISR節點,controller可以防止當前leader更新ISR(選舉新的leader在寫到ZK時,epoch增長,而若是當前leader想要更新ISR好比將新選舉的leader從ISR中剔除掉,由於epoch不匹配,因此當前leader就再也不有機會更新ISR了)。
客戶端能夠利用自身的失敗重連等機制來實現路由。
一般狀況下,follower的HW老是落後於leader的HW。因此在leader變化時,消費者發送的offset中可能會落在新leader的HW和LEO之間(由於消費者的消費進度依賴於Leader的HW,舊Leader的HW比較高,而原先的follower做爲新Leader,它的HW仍是落後於舊Leader,因此消費者的offset雖然比舊Leader的HW低,可是有可能比新Leader的HW要大)。若是沒有發生leader變化,服務端會返回OffsetOutOfRangeException給客戶端。若是請求的offset介於HW和LEO之間,服務端會返回空消息集給消費者。
LeaderAndISRCommand:
1:讀取命令中的partition
2:處理每一個partition
若是P在本地不存在,調用startReplica()建立一個新的Replia。
指令要求這個Broker成爲P的新Leader,調用becomeLeader。
指令要求這個Broker做爲Leader l的follower,調用becomeFollower。
3:若是指令中有INIT標記,則刪除不在set_p中的全部本地partitions
becomeLeader指的是當前接收LeaderAndISRCommand指令的Broker,原先是一個follower,如今要轉變爲Leader。因爲做爲Follower期間,它會從Leader抓取數據,而如今Leader不在了,因此首先要中止抓取數據線程。follower轉變爲Leader以後,要負責讀寫數據,因此要啓動提交線程負責將消息存儲到本地日誌文件中。
becomeFollower對於要轉變爲Follower的replica,原先若是是Leader的話,則要中止提交線程,因爲當前Replica的leader可能會發生變化,因此在開始時要中止抓取線程,在最後要新建立到Replica最新leader的抓取線程,這中間還要截斷日誌到Replica的HW位置。
注意有可能新leader的HW會比以前的leader的HW要落後,這是由於新leader有多是ISR,也有多是AR中的replica。而原先做爲Follower的replica,它的HW只會在向Leader發送抓取請求時,Leader在抓取響應中除了返回消息也會附帶本身的HW給follower,Follower收到消息和HW後,纔會更新本身的replica的HW,這中間有必定的時間間隔會致使Follower的HW會比Leader的HW要低。所以Follower在轉變爲Leader以後,它的HW是有可能比老的Leader的HW要低的。若是在leader角色轉變以後,一個消費者客戶端請求的offset可能比新的Leader的HW要大(由於消費者最多隻消費到Leader的HW位置,可是消費者並不關心Leader到底有沒有變化,因此若是舊的Leader的HW=10,那麼客戶端就能夠消費到offset=10這個位置,而Leader發生轉變後,HW可能下降爲9,而這個時候客戶端繼續發送offset=10,就有可能比Leader的HW要大了!)。這種狀況下,若是消費者要消費Leader的HW到LEO之間的數據,Broker會返回空的集合,而若是消費者請求的offset比LEO還要大,就會拋出OffsetOutofRangeException(LEO表示的是日誌的最新位置,HW比LEO要小,客戶端只能消費到HW位置,更不可能消費到LEO了)。
startReplica表示啓動一個Replica,若是不存在Partition目錄,則建立。並啓動Replica的HW checkpoint線程,咱們已經知道了Follower的HW是經過發送抓取請求,接收應答中包含了Leader的HW,設置爲Follower Replica的HW(而Leader的HW又是由ISR提交來決定的,因此說ISR決定了HW可以增長,而Follower的HW則來自於Leader的HW)。
StopReplicaCommand
1:從指令中讀取partitions集合
2:對每一個partition P的每一個Replica ,調用stopReplica
中止和r關聯的抓取線程(固然針對的是Follower Replica)
中止r的HW checkpoint線程
刪除partition目錄