Kafka--學習總結

  • 首選介紹幾個名詞:

                 ISR:一個Partition中的Leader的全部follower(replication)集合。node

               AR:分配給此Partition的全部replication,統稱爲Assigned Replicas(AR)。ISR是AR的子集。算法

                 Controller:broker中的Leader。併發

                 HW:消費端能看到的broker上消息的位置,客戶端只能消費到HW位置。負載均衡

                 LEO:消息在log文件中最新的位置。異步

                 segment:replication對應一個文件夾中,實際存消息的文件socket

  • 整體

          多個broker組成了kafka集羣。性能

          

  • Broker

        1.    Broker均可以向producer提供metadata信息,metadata包括:集羣中存活的broker列表;partitions leader列表等信息。當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接;消息由producer直接經過socket發送到broker,中間不會通過任何"路由層"。spa

         2.    使用zookeeper用來註冊broker信息,監測partition leader存活性。線程

  • Consumer

  1. ConsumerGroup:多個consumer(consumer線程)組成一個Group。一個Partition中的每一個message只能被同一個Group中的一個consumer消費(在不一樣的Group中的consumer能夠消費這個Partition中的消息)。多個consumer消費partition中的message都必須是順序讀取消息,新啓動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。在kafka消費消息中沒有鎖的概念,因此就只容許同一個consumer group下的consumer線程去訪問一個partition。若是以爲效率不高,須要橫向擴展partition,那麼橫向擴展的partition能夠經過加新的consumer去消費。若是不少不一樣的業務都在這個topic裏面,那就啓動多個consumer group,讓多個group順序讀取message。
  2.  一個consumer group下,不管有多少個consumer,這個group必定是把這個topic下全部的partition都消費了。設計

    1) 當group中的consumer數量小於topic的partition數量:一個consumer消費多個partition。

    2)當group中的consumer數量等於topic的partition數量:一個consumer消費一個partition。這時的效率最高。

    3)當group中的consumer數量大於topic的partition數量:這時就會有consumer空閒,形成資源的浪費。

    在設置consumer group時,只須要指定裏面consumer數量,consumer會自動進行rebalance。

    Consumer rebalance觸發條件:

    1)Consumer增長或刪除會觸發 Consumer Group的Rebalance。

    2)Broker的增長或者減小都會觸發 Consumer Rebalance。
  3. Consumer消費partition中的message是順序讀取,因此必需要維護上次讀取message的位置。

    1、舊版本的(在0.8.2以前):

    1)high level API:offset存在於zookeeper中: Consumer默認是讀完message先commmit再處理message,autocommit默認是是true,這時候先commit就會更新offset+1,一旦處理失敗,offset已經+1,這個時候就會丟message;若是還有offset+1。那麼consumer重啓後就會重複消費這個message。也能夠配置成讀完消息處理再commit,這種狀況下consumer端的響應就會比較慢的,須要等處理完才行。

    2) low level API:由應用本身維護offset。

    2、0.8.2以後:

    1)  新的Comsumer API再也不有high-level、low-level之分了,而是本身維護offset。這樣作的好處是避免應用出現異常時,數據未消費成功,但Position已經提交,致使消息未消費的狀況發生。默認將消費的offset遷入到kafka一個名爲__consumer_offsets 的Topic中。原理:利用kafka自身的topic,以消費的group,topic以及partition作爲組合key,全部的消費offset都提交寫入到名爲__consumer_offsets 的Topic中。

  4. Consumer消費的message是在topic下的某個partition的leader上。

  5. 當Consumer啓動時,觸發的操做:

    1)進行"Consumer id Registry"。

    2)在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave"和"join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管失效consumer的partition)。

    3)在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance。

  6. 使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息。

  • Producer

  1. producer發送message不用維護message的offset信息,由於這個時候,offset就至關於一個自增id,produce只管發送message。
  2. producer通常都是大批量的batch發送message,向一個topic一次性發送一大批message,load balance到一個partition上,offset做爲自增id本身增長就好。
  3. producer端發送的message必須指定是發送到哪一個topic,可是不須要指定topic下的哪一個partition。Kafka會把收到的message進行load balance,均勻的分佈到這個topic下的不一樣的partition上:hash(message)%broker數量。
  4.  producer發送的message是發送到topic下的某個partition的leader上。
  5. producer使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息。
  • Topic&Partition

  1. Topic被分紅一個或者多個Partition,一個Partition能夠有多個replication,一個Partition的不一樣replication依據算法被分配到不一樣的broker上。
  2. 分配算法以下:

         1)將全部Broker(假設共n個Broker)和待分配的Partition排序。

         2)將第i個Partition分配到第(i mod n)個Broker上。

         3)將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上。

一個Partition的一個replication對應一個文件夾,文件夾中包含索引文件(每一個segment的offset範圍)和多個segment文件(以第一條消息的offset命名)。

           Partition和replication的關係:

                   

          topic:

                  

  • kafka的High Available(高可用性)

              Kafka的高可用性(HA)體如今:replication和Leader election(replication之間)。

              Replication的做用:

                      在沒有replication的狀況下,由於partition在某一臺broker上,一旦某一個broker宕機,那麼這臺broker上的Partition的數據都不能被消費,同時producer也不能提交消息到此topic的該Partition下。這樣就違背了kafka的高可用性。因此引入了Replication機制。特別是在生產環境中。

                      引入Replication以後,那麼一個partition就會有多個Replication,由於不一樣的Replication存在於不一樣的broker中,這樣在一臺broker宕機後,在其餘broker上的該partition的Replication還能夠提供服務。這樣就保持了kafka的高可用。

             Partition 中的Leader:

                 有了Replication就必須考慮數據的一致性,這樣才能保證在一個broker宕機後其餘的Replication提供服務的時候數據不會丟失。kafka多個Replication,引入Leader,Producer和Consumer只與這個Leader打交道,其餘的Replication做爲此leader的follower,而且從leader拉取數據。若是不存在Leaader的話,全部的Replication都是能夠同時讀/寫的,就必須保證多個Replication之間的數據同步,它們直接就要互相的同步數據(N*N條路)。這樣的設計就至關的複雜,因此引入Leader後,讓Leader來負責讀/寫,其餘的Replication做爲follower就從leader同步數據,這樣高效簡單。

             Kafka複製消息的機制:

                    Broker是否還存活必須知足2個條件:第一:必須維護與ZK的心跳機制,第二:上面的Follower必須能快速的從它的Leader那邊同步消息過來,不能落後Leader太多。Leader會跟蹤與其保持同步的Replication列表(ISR:in-sync Replication)。若是Follower宕機或者拖後太多,Leader將會把它從ISR列表中移除。

                       消息複製機制若是使用同步機制的話,就要求Leader全部的Follower都完成複製(Follower從Leader中pull數據,Follower收到數據並寫入到log後,向Leader發送ACK),這樣很影響吞吐率。

                      消息複製機制若是採用異步複製機制,Follower異步從Leader複製數據,數據只要被Leader寫入log就認爲消息是commit的狀態,這種狀況下若是全部的Follower都落後與Leader,那麼在Leader宕機後,消息就會出現丟失。

                    Producer發佈消息到topic的時候(實際上是發送到Partition中),先經過zookeeper找到該Partition的Leader。Producer只把消息發送到該Partition的Leader中。Leader會將消息寫入其本地的log,每一個Follower都從Leader pull數據,這樣Follower存儲信息的順序就和Leader同樣了,Follower在pull到消息後也會把消息存放到本身的log中,向Leader發送ACK。一旦Leader收到了全部ISR列表中的Replication的ACK,這條消息就是已經commit(能夠被消費了)的了,這時Leader將增長HW並想Producer發送ACK。這樣有個問題,就是Leader要等到全部ISR中Replication的ACK,那麼在ISR列表不少或者其中一個的ACK回來的比較慢的時候,這樣就是影響總體的吞吐率。爲了提升性能,每一個Follower獲得pull消息後就立馬給Leader發送ACK,不會等到放入log中。因此commit的消息,只能保證它存在於多個Replication的內存中,不能保證它被持久化到磁盤中。就不能徹底保證在出現異常後,這條消息能被消費。可是這個問題剛好適用於「該問題剛好不解決」。比較很是的少見,因此在性能和可用性上作了一個平衡。

                     Consumer讀取消息是從Leader中讀取,而且必須是Commit的消息才能被消費。只能消費到HW位置。

            Partition Leader的Election:

                      Partition有了Leader後,由於Leader只會有一臺,那麼在這臺機子宕機後,就須要在剩下的follower中從新選擇出一個擁有最新數據的follower來變成Leader來對外提供服務。在election中,kafka沒有使用「Majority Vote」(「少數服從多數」)的算法,它的劣勢是能容忍的失敗的follower個數比較少。在kafka的parttion的ISR模式下,保證在不丟失已經commit的消息的前提下,能容忍F個Replica失敗(總個數是F+1),由於ISR列表中裏面全部的Replica都有leader中的數據。因此只要有一個Replica還在,數據就是所有的commit數據。因此直接在ISR中選舉Leader。

                      如何選舉?在集羣中全部的broker中選出一個controller(集羣中只會有一個controller),集羣中全部Partition的Leader選舉都由該broker解決,這個controller主要負責1:parttion的Leader變化事件。2:新建立和刪除一個topic。3:從新分配parttion。4:管理分區的狀態機和副本的狀態機。當controller決定一個Partition的Leader和ISR後,會將此決定持久化到ZK節點中,而且向全部受到影響的Broker經過RPC的形式直接發送新的決策。

             Broker Failover:

                   在broker宕機,Controller註冊在ZK的/brokers/ids的Watcher會觸發調用onBrokerFailure,這臺宕機的broker上的Replica多是某個parttion的Leader或者是某個parttion的follower,若是是parttion的Leader,那麼該Controller要確保有其餘的broker成爲這個parttion的Leader,若是是parttion的follower,不須要從新選舉該parttion的Leader,可是該parttion的ISR有可能會發生變化(由於這臺follower的broker宕機了,要從ISR列表中移除)。因此Controller首先會讀取ZK中該parttion的ISR/AR選舉新的Leader和ISR,而後把這個信息先保存到ZK中,最後把包含了新Leader和ISR的LeaderAndISR指令發送給受到影響的Brokers。收到指令的Broker若是Controller命令它成爲某個Partition的Leader,那麼原來爲Follower的Replica所在的Broker就成爲了Partition的Leader。,收到指令的broker原先的Replica是Follower收到指令沒有讓它成爲Leader那麼它依然是的Replica,becomeFollower。

                 例如過程以下:

                      1:有三個broker:brokerA,brokerB,brokerC

                      2:Partition1有三個副本:Replica1,Replica2,Replica3。其中Replica1是Leader,ISR=[1,2,3]

                     3:Replica1所在的BrokerA掛掉了(Partition1沒有了Leader),Controller註冊的Watcher會觸發調用onBrokerFailure

                     4:Controller會讀取ZK中的leaderAndISR,選舉出新的leader和ISR:Leader=Replica2,ISR=[2,3]

                      5:Controller將最新的leaderAndISR寫到ZK中(leader=Replica2,ISR=[2,3])

                   6:Controller將最新的leaderAndISR構形成LeaderAndISRRequest命令發送給Broker2,Broker3

                 7:Replica2所在的BrokerB收到指令,由於最新的leader指示Replica2是Leader,因此Replica2成爲Partition1的Leader

             8: Replica3所在的BrokerC收到指令,Replica3仍然是follower,而且在ISR中,becomeFollower

               Broker宕機後Controller端的處理步驟以下:

                      1:從ZK中讀取現存的brokers

                    2:broker宕機,引發partition的Leader或ISR變化,獲取在宕機的broker上的partitions:set_p。

                      3:循環set_p的每一個Partition P:

                                  從ZK的leaderAndISR節點讀取P的當前ISR

                                 決定P的新Leader和新ISR(優先級分別是ISR中存活的broker,AR中任意存活的做爲Leader)

                                 將P最新的leader,ISR,回寫到ZK的leaderAndISR節點

                      4:將set_p中每一個Partition的LeaderAndISR指令(裏面包括最新的leaderAndISR數據)發送給受到影響的brokers

              如何決定partition的新Leader和新ISR:

                            假設AR=[1,2,3,4,5],ISR=[1,2,3],可是存活Brokers=[2,3,5]。選擇Leader的方式是ISR中目前存活的Brokers,好比目前存活的Broker是[2,3,5],因此ISR中的副本1是不能做爲Leader的,也不會再做爲ISR了。Leader的選舉是選舉目前還存活的[2,3]中的一個,ISR的肯定是選舉在當前ISR中仍然存活的Broker=[2,3]。因此最後Leader=2,ISR=[2,3]。

                        假設AR=[1,2,3,4,5],ISR=[1,2,3],可是存活的Brokers=[4,6,7]。由於ISR中沒有一個Broker在當前處於存活狀態,因此只能退而求其次從AR中選擇。幸運的是AR中的4目前是存活的,因此Leader=4,ISR=[4]。因爲4再也不ISR中,因此這種狀況有可能會形成數據丟失,由於只有選舉處於ISR中的,纔不會丟失數據,可是如今ISR中的沒有一個存活,因此也只好選擇有可能丟失的Broekr,總比找不到任何的Broker要好。

              什麼叫作受到影響的brokers:

                            Partition有多個Replica,Replica是分佈在Broker上的。因此一個Broker上受到影響的Replica的Partition確定還有其餘的Replica分佈在其餘Broker上。因此含有宕機Broker的Partition的Replica的節點都是受到影響的broker。

                             假如Broker1上有三個Replica,第一個Replica是Partition1的Leader,第二個Replica是Partition2的Follower,第三個Replica是Partition3的Follower。若是是Leader,則受影響到的broker要被Controller負責選出這個Replica對應的Partition的新Leader。若是是follower,也有可能影響了Partition的ISR,因此Leader要負責更新ISR。

             Controller Failover:

                           因爲Controller是從Brokers中選舉出來的,因此Controller所在的節點也會做爲Partition的存儲節點的。當Controller掛掉後,Controller自己做爲Broker也會觸發新的Controller調用on_broker_change。可是在尚未選舉出新的Controller以前,掛掉的Broker的on_broker_change不會被新的Controller調用(由於根本就沒有可用的Controller)。因此對於掛掉的Controller節點,最緊迫的任務是首先選舉出新的Controller,而後再由新的Controller觸發掛掉的那個Controller的on_broker_change。

                         Broker失敗和Controller失敗是不一樣的:Broker的failover是在Controller端處理的,由於咱們知道Broker掛掉了,Controller負責在掛掉Broker和受影響的Broker之間更新數據(將新的leaderAndISR發送給受影響的Broker)。而Controller的failover則是在Broker處理的(成功建立Controller的那一個Broker)。

             建立或刪除topics:

                         1:新建立一個topic,會同時指定Partition的個數,每一個Partition都有AR信息寫到ZK中。

                         2:爲新建立的每一個Partition初始化leader 

                                  選擇AR中的一個存活的Broker做爲新Leader,ISR=AR

                                  將新Leader和ISR寫到ZK的leaderAndISR節點

                         3:發送LeaderAndISRCommand給受到影響的brokers

                         4:若是是刪除一個topic,則發送StopReplicaCommand給受影響的brokers。

                         Controller會在/brokers/topics上註冊Watcher,因此有新topic建立/刪除時,Controller會經過Watch獲得新建立/刪除的Topic的Partition/Replica分配。對於新建立的Topic,分配給Partition            的AR中全部的Replica都尚未數據,可認爲它們都是同步的,也即都在ISR中(ISR=AR),任意一個Replica均可做爲Leader。

                         建立或刪除topic的過程和onBrokerFailure相似都要通過三個步驟:1) 選舉Leader和ISR;2) 將leaderAndISR寫到ZK中;3) 將最新leaderAndISR的LeaderAndISR指令發送給受到影響的Brokers。這是由於brokerChange致使Partition的Leader或者ISR發生變化,而新建立topic時,根本就沒有Leader和ISR,因此二者都須要爲Partition選擇Leader和ISR。

             如何處理多個leader:

                       存在這樣一個狀況:有多個broker同時聲稱是一個Partition的leader。好比brokerA是partition的初始Leader,partition的ISR是{A,B,C}。有一天brokerA由於某種緣由失去它在ZK中的註冊信息。這時controller會認爲brokerA當掉了,並將partition的leader分配給了brokerA,設置新的ISR爲{B,C},並寫到ZK中(每次partition的leader發生變化,epoch也會增長)。在brokerB成爲leader的同時,brokerA從緣由中恢復過來,可是沒有接收到controller發送的leadershipi變化指令。這樣如今brokerA和brokerB都認爲本身是partition的Leader,若是咱們容許broker A和B能同時提交消息那就很是不幸了,由於全部副本之間的數據同步就會不一致了。不過當前的設計中其實是不會容許出現這種狀況的:當brokerB成爲leader以後,brokerA是沒法再提交任何新的消息的。

                          Kafka是怎麼作的到?假設容許存在兩個leader,生產者會同時往這兩個leader寫數據,可是能不能提交消息就相似於兩階段提交協議了:kafka的leader要可以提交一個消息的保證是ISR中的全部副本都複製成功。對於brokerA爲了保證能提交消息m,它須要ISR中的每一個副本(A,B,C)都要接收到消息m,而這個時候broker A仍然認爲ISR是{A,B,C},(這是broker A的一份本地拷貝,雖然這個時候ZK中的ISR已經被broker B改變了:ISR是{B,C}),可是ISR中的brokerB是不會再接收到消息m的。由於在brokerB成爲Leader的時候,它會首先關掉到以前到舊的brokerA中抓取數據的線程(brokerB以前是follower,會向leader抓取數據,只有follower抓取數據,leader才能判斷消息是否能提交),由於brokerB的抓取線程被關閉了,brokerA會認爲B沒法遇上Leader,既然由於B受到影響不能提交消息m,broker A乾脆就想要把B從ISR中移除,這個時候broker A要將本身認爲的最新的ISR寫到ZK中。不過不幸的是broker A並不能完成這個操做:由於在寫ZK的時候broker A會發現本身的epoch版本和ZK中的當前值並不匹配(broker B在選舉爲Leader以後會寫到ZK中,並將epoch增長1,任何新的寫操做的epoch都不能比當前epoch小),直到這個時刻,broker A才意識到它已經再也不是partition的leader了。這個時候broker A只能接受本身再也不是該partition的Leader的事實了。

             broker故障客戶端如何路由:

                       controller首先將受影響的partitions的新leader寫到zk的leaderAndISR節點,而後纔會向brokers發送leader改變的命令,brokers收到命令後會對leader改變的事件做出響應。因爲客戶端請求會使用leaderAndISR的數據來鏈接leader所在的節點,客戶端的請求被路由到新leader所在的broker節點,但若是那個broker尚未準備好成爲leader,就存在一個時間窗口對於客戶端而言是不可用的。kafka這裏則是controller先更新元數據(寫入leaderAndISR)後才發送命令給broker,所以可能元數據已經更新,可是broker還沒收到命令,或者收到命令後還沒準備好成爲leader。因此可能你會認爲不要讓controller先更新leaderAndISR節點,而是發送指令給brokers,每一個broker在收到指令並處理完成後才,讓每一個broker來更新這個節點的數據。不過這裏讓controller來更新leaderAndISR節點是有緣由的:咱們是依賴ZK的leaderAndISR節點來保持controller和partition的leader的同步。當controller選舉出新的leader以後,它不但願新的Leader的ISR被舊的Leader改變,不然的話(假設ISR能夠被舊leader改變),新選舉出來的leader在接管正式成爲leader以前可能會被當前leader從ISR中剔除出去。經過當即將新leader發佈到leaderAndISR節點,controller可以防止當前leader更新ISR(選舉新的leader在寫到ZK時,epoch增長,而若是當前leader想要更新ISR好比將新選舉的leader從ISR中剔除掉,由於epoch不匹配,因此當前leader就再也不有機會更新ISR了)。

                        客戶端能夠利用自身的失敗重連等機制來實現路由。

             leadership改變時,offset超過HW:

                          一般狀況下,follower的HW老是落後於leader的HW。因此在leader變化時,消費者發送的offset中可能會落在新leader的HW和LEO之間(由於消費者的消費進度依賴於Leader的HW,舊Leader的HW比較高,而原先的follower做爲新Leader,它的HW仍是落後於舊Leader,因此消費者的offset雖然比舊Leader的HW低,可是有可能比新Leader的HW要大)。若是沒有發生leader變化,服務端會返回OffsetOutOfRangeException給客戶端。若是請求的offset介於HW和LEO之間,服務端會返回空消息集給消費者。

            Broker處理Controller發送的Command

                         LeaderAndISRCommand:

                              1:讀取命令中的partition

                              2:處理每一個partition

                                      若是P在本地不存在,調用startReplica()建立一個新的Replia。

                                      指令要求這個Broker成爲P的新Leader,調用becomeLeader。

                                      指令要求這個Broker做爲Leader l的follower,調用becomeFollower。

                             3:若是指令中有INIT標記,則刪除不在set_p中的全部本地partitions

                           becomeLeader指的是當前接收LeaderAndISRCommand指令的Broker,原先是一個follower,如今要轉變爲Leader。因爲做爲Follower期間,它會從Leader抓取數據,而如今Leader不在了,因此首先要中止抓取數據線程。follower轉變爲Leader以後,要負責讀寫數據,因此要啓動提交線程負責將消息存儲到本地日誌文件中。

                            becomeFollower對於要轉變爲Follower的replica,原先若是是Leader的話,則要中止提交線程,因爲當前Replica的leader可能會發生變化,因此在開始時要中止抓取線程,在最後要新建立到Replica最新leader的抓取線程,這中間還要截斷日誌到Replica的HW位置。

                           注意有可能新leader的HW會比以前的leader的HW要落後,這是由於新leader有多是ISR,也有多是AR中的replica。而原先做爲Follower的replica,它的HW只會在向Leader發送抓取請求時,Leader在抓取響應中除了返回消息也會附帶本身的HW給follower,Follower收到消息和HW後,纔會更新本身的replica的HW,這中間有必定的時間間隔會致使Follower的HW會比Leader的HW要低。所以Follower在轉變爲Leader以後,它的HW是有可能比老的Leader的HW要低的。若是在leader角色轉變以後,一個消費者客戶端請求的offset可能比新的Leader的HW要大(由於消費者最多隻消費到Leader的HW位置,可是消費者並不關心Leader到底有沒有變化,因此若是舊的Leader的HW=10,那麼客戶端就能夠消費到offset=10這個位置,而Leader發生轉變後,HW可能下降爲9,而這個時候客戶端繼續發送offset=10,就有可能比Leader的HW要大了!)。這種狀況下,若是消費者要消費Leader的HW到LEO之間的數據,Broker會返回空的集合,而若是消費者請求的offset比LEO還要大,就會拋出OffsetOutofRangeException(LEO表示的是日誌的最新位置,HW比LEO要小,客戶端只能消費到HW位置,更不可能消費到LEO了)。

                           startReplica表示啓動一個Replica,若是不存在Partition目錄,則建立。並啓動Replica的HW checkpoint線程,咱們已經知道了Follower的HW是經過發送抓取請求,接收應答中包含了Leader的HW,設置爲Follower Replica的HW(而Leader的HW又是由ISR提交來決定的,因此說ISR決定了HW可以增長,而Follower的HW則來自於Leader的HW)。

                      StopReplicaCommand

                           1:從指令中讀取partitions集合

                           2:對每一個partition P的每一個Replica ,調用stopReplica

                                   中止和r關聯的抓取線程(固然針對的是Follower Replica)

                                   中止r的HW checkpoint線程

                                   刪除partition目錄

相關文章
相關標籤/搜索