消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構,是大型分佈式系統不可缺乏的中間件。javascript
本場 Chat 主要內容:css
相關內容連接:html
基於 Kafka-ZooKeeper 的分佈式消息隊列系統整體架構以下:java
如上圖所示,一個典型的 Kafka 體系架構包括若干 Producer(消息生產者),若干 broker(做爲 Kafka 節點的服務器),若干 Consumer(Group),以及一個 ZooKeeper 集羣。Kafka經過 ZooKeeper 管理集羣配置、選舉 Leader 以及在 consumer group 發生變化時進行 Rebalance(即消費者負載均衡,在下一課介紹)。Producer 使用 push(推)模式將消息發佈到 broker,Consumer 使用 pull(拉)模式從 broker 訂閱並消費消息。git
上圖僅描摹了一個整體架構,並無對做爲 Kafka 節點的 broker 進行深刻刻畫,事實上,它的內部細節至關複雜,以下圖所示,Kafka 節點涉及 Topic、Partition 兩個重要概念。算法
在 Kafka 架構中,有幾個術語:apache
一個 topic 能夠認爲是一類消息,每一個 topic 將被分紅多個 partition,每一個 partition 在存儲層面是 append log 文件。任何發佈到此 partition 的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱爲 offset(偏移量),offset 爲一個 long 型的數字,它惟一標記一條消息。 Kafka 機制中,producer push 來的消息是追加(append)到 partition 中的,這是一種順序寫磁盤的機制,效率遠高於隨機寫內存,以下示意圖:bootstrap
簡而言之:負載均衡 + 水平擴展。
前已述及,Topic 只是邏輯概念,面向的是 producer 和 consumer;而 Partition 則是物理概念。能夠想象,若是 Topic 不進行分區,而將 Topic 內的消息存儲於一個 broker,那麼關於該 Topic 的全部讀寫請求都將由這一個 broker 處理,吞吐量很容易陷入瓶頸,這顯然是不符合高吞吐量應用場景的。有了 Partition 概念之後,假設一個 Topic 被分爲 10 個 Partitions,Kafka 會根據必定的算法將 10 個 Partition 儘量均勻的分佈到不一樣的 broker(服務器)上,當 producer 發佈消息時,producer 客戶端能夠採用 random
、key-hash
及 輪詢
等算法選定目標 partition,若不指定,Kafka 也將根據必定算法將其置於某一分區上。Partiton 機制能夠極大的提升吞吐量,而且使得系統具有良好的水平擴展能力。服務器
在建立 topic 時能夠在 $KAFKA_HOME/config/server.properties
中指定這個 partition 的數量(以下所示),固然能夠在 topic 建立以後去修改 partition 的數量。網絡
# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=3
在發送一條消息時,能夠指定這個消息的 key,producer 根據這個 key 和 partition 機制來判斷這個消息發送到哪一個partition。partition 機制能夠經過指定 producer 的 partition.class 這一參數來指定(即支持自定義),該 class 必須實現 kafka.producer.Partitioner 接口。
有關 topic 與 partition 的更多細節,能夠參考下面的「Kafka 文件存儲機制」這一節。
談及可靠性,最常規、最有效的策略就是 「副本(replication)機制」 ,Kafka 實現高可靠性一樣採用了該策略。經過調節副本相關參數,可以使 Kafka 在性能和可靠性之間取得平衡。本節先從 Kafka 文件存儲機制入手,從最底層瞭解 Kafka 的存儲細節,進而對消息的存儲有個微觀的認知。以後經過介紹 Kafka 的複製原理和同步方式來闡述宏觀層面的概念。最後介紹 ISR,HW 和 leader 選舉。
Kafka 中消息是以 topic 進行分類的,生產者經過 topic 向 Kafka broker 發送消息,消費者經過 topic 讀取數據。然而 topic 在物理層面又能以 partition 爲分組,一個 topic 能夠分紅若干個 partition。事實上,partition 並非最終的存儲粒度,partition 還能夠細分爲 segment,一個 partition 物理上由多個 segment 組成,那麼這些 segment 又是什麼呢?
爲了便於說明問題,假設這裏只有一個 Kafka 集羣,且這個集羣只有一個 Kafka broker,即只有一臺物理機。在這個 Kafka broker 中配置 log.dirs=/tmp/kafka-logs
,以此來設置 Kafka 消息文件存儲目錄;與此同時,經過命令建立一個 topic:mytopic_test,partition 的數量配置爲 4(建立 topic 的命令請見上一課)。以後,能夠在 /tmp/kafka-logs
目錄中能夠看到生成了 4 個目錄:
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-0 drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-1 drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-2 drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-3
在 Kafka 文件存儲中,同一個 topic 下有多個不一樣的 partition,每一個 partiton 爲一個目錄,partition 的名稱規則爲:topic 名稱 + 有序序號,第一個序號從 0 開始計,最大的序號爲 partition 數量減 1,partition 是實際物理上的概念,而 topic 是邏輯上的概念。
問題 1:爲何不能以 partition 做爲存儲單位?
上面提到 partition 還能夠細分爲 segment,這個 segment 又是什麼?若是就以 partition 爲最小存儲單位,能夠想象,當 Kafka producer 不斷髮送消息,必然會引發 partition 文件的無限擴張,將對消息文件的維護以及已消費的消息的清理帶來嚴重的影響,所以,需以 segment 爲單位將 partition 進一步細分。每一個 partition(目錄)至關於一個巨型文件被平均分配到多個大小相等的 segment(段)數據文件中(每一個 segment 文件中消息數量不必定相等)這種特性也方便 old segment 的刪除,即方便已被消費的消息的清理,提升磁盤的利用率。每一個 partition 只須要支持順序讀寫就行,segment 的文件生命週期由服務端配置參數(log.segment.bytes,log.roll.{ms,hours} 等若干參數)決定。
問題 2:segment 的工做原理是怎樣的?
segment 文件由兩部分組成,分別爲 「.index」 文件和 「.log」 文件,分別表示爲 segment 索引文件和數據文件。這兩個文件的命令規則爲:partition 全局的第一個 segment 從 0 開始,後續每一個 segment 文件名爲上一個 segment 文件最後一條消息的 offset 值,數值大小爲 64 位,20 位數字字符長度,沒有數字用 0 填充,以下:
00000000000000000000.index 00000000000000000000.log 00000000000000170410.index 00000000000000170410.log 00000000000000239430.index 00000000000000239430.log
以上面的 segment 文件爲例,展現出 segment:00000000000000170410 的 「.index」 文件和 「.log」 文件的對應的關係,以下圖:
如上圖,「.index」 索引文件存儲大量的元數據,「.log」 數據文件存儲大量的消息,索引文件中的元數據指向對應數據文件中 message 的物理偏移地址。其中以 「.index」 索引文件中的元數據 [3, 348] 爲例,在 「.log」 數據文件表示第 3 個消息,即在全局 partition 中表示 170410+3=170413 個消息,該消息的物理偏移地址爲 348。
問題 3:如何從 partition 中經過 offset 查找 message 呢?
以上圖爲例,讀取 offset=170418 的消息,首先查找 segment 文件,其中 00000000000000000000.index 爲最開始的文件,第二個文件爲 00000000000000170410.index(起始偏移爲 170410+1=170411),而第三個文件爲 00000000000000239430.index(起始偏移爲 239430+1=239431),因此這個 offset=170418 就落到了第二個文件之中。其它後續文件能夠依次類推,以其偏移量命名並排列這些文件,而後根據二分查找法就能夠快速定位到具體文件位置。其次根據 00000000000000170410.index 文件中的 [8,1325] 定位到 00000000000000170410.log 文件中的 1325 的位置進行讀取。
要是讀取 offset=170418 的消息,從 00000000000000170410.log 文件中的 1325 的位置進行讀取,那麼,如何肯定什麼時候讀完本條消息呢?(不然就讀到下一條消息的內容了)
這個問題由消息的物理結構解決,消息都具備固定的物理結構,包括:offset(8 Bytes)、消息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,能夠肯定一條消息的大小,即讀取到哪裏截止。
Kafka 中 topic 的每一個 partition 有一個預寫式的日誌文件,雖然 partition 能夠繼續細分爲若干個 segment 文件,可是對於上層應用來講,仍然能夠將 partition 當作最小的存儲單元(一個有多個 segment 文件拼接的 「巨型」 文件),每一個 partition 都由一些列有序的、不可變的消息組成,這些消息被連續的追加到 partition 中。
上圖中有兩個新名詞:HW 和 LEO。這裏先介紹下 LEO,LogEndOffset 的縮寫,表示每一個 partition 的 log 最後一條 Message 的位置。HW 是 HighWatermark 的縮寫,是指 consumer 可以看到的此 partition 的位置,這個涉及到多副本的概念,這裏先說起一下,下文再詳述。
言歸正傳,爲了提升消息的可靠性,Kafka 每一個 topic 的 partition 有 N 個副本(replicas),其中 N(大於等於 1)是 topic 的複製因子(replica fator)的個數。Kafka 經過多副本機制實現故障自動轉移,當 Kafka 集羣中出現 broker 失效時,副本機制可保證服務可用。對於任何一個 partition,它的 N 個 replicas 中,其中一個 replica 爲 leader,其餘都爲 follower,leader 負責處理 partition 的全部讀寫請求,follower 則負責被動地去複製 leader 上的數據。以下圖所示,Kafka 集羣中有 4 個 broker,某 topic 有 3 個 partition,且複製因子即副本個數也爲 3:
若是 leader 所在的 broker 發生故障或宕機,對應 partition 將因無 leader 而不能處理客戶端請求,這時副本的做用就體現出來了:一個新 leader 將從 follower 中被選舉出來並繼續處理客戶端的請求。
如何確保新選舉出的 leader 是優選呢?
一個 partition 有多個副本(replicas),爲了提升可靠性,這些副本分散在不一樣的 broker 上,因爲帶寬、讀寫性能、網絡延遲等因素,同一時刻,這些副本的狀態一般是不一致的:即 followers 與 leader 的狀態不一致。那麼,如何保證新選舉出的 leader 是優選呢? Kafka 機制中,leader 將負責維護和跟蹤一個 ISR(In-Sync Replicas)列表,即同步副本隊列,這個列表裏面的副本與 leader 保持同步,狀態一致。若是新的 leader 從 ISR 列表中的副本中選出,那麼就能夠保證新 leader 爲優選。固然,這不是惟一的策略,下文將繼續解讀。
上一節中講到了同步副本隊列 ISR(In-Sync Replicas)。雖然副本極大的加強了可用性,可是副本數量對 Kafka 的吞吐率有必定影響。默認狀況下 Kafka 的 replica 數量爲 1,即每一個 partition 都只有惟一的 leader,無 follower,沒有容災能力。爲了確保消息的可靠性,生產環境中,一般將其值(由 broker 的參數 offsets.topic.replication.factor 指定)大小設置爲大於 1,好比 3。 全部的副本(replicas)統稱爲 Assigned Replicas,即 AR。ISR 是 AR 中的一個子集,由 leader 維護 ISR 列表,follower 從 leader 同步數據有一些延遲(由參數 replica.lag.time.max.ms 設置超時閾值),超過閾值的 follower 將被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也會先存放在 OSR 中。AR=ISR+OSR。
注:ISR中包括:leader + 與leader保持同步的followers。
上面一節還涉及到一個概念,即 HW。HW 俗稱高水位,HighWatermark 的縮寫,取一個 partition 對應的 ISR 中最小的 LEO 做爲 HW,consumer 最多隻能消費到 HW 所在的位置。另外每一個 replica 都有 HW,leader 和 follower 各自負責更新本身的 HW 的狀態。對於 leader 新寫入的消息,consumer 不能馬上消費,leader 會等待該消息被全部 ISR 中的 replicas 同步後更新 HW,此時消息才能被 consumer 消費。這樣就保證了若是 leader 所在的 broker 失效,該消息仍然能夠重新選舉的 leader 中獲取。對於來自內部 broker 的讀取請求,沒有 HW 的限制。
下圖詳細的說明了當 producer 生產消息至 broker 後,ISR 以及 HW 和 LEO 的流轉過程:
因而可知,Kafka 的複製機制既不是徹底的同步複製,也不是單純的異步複製。事實上,同步複製要求全部能工做的 follower 都複製完,這條消息纔會被 commit,這種複製方式受限於複製最慢的 follower,會極大的影響吞吐率。而異步複製方式下,follower 異步的從 leader 複製數據,數據只要被 leader 寫入 log 就被認爲已經 commit,這種狀況下若是 follower 都尚未複製完,落後於 leader 時,忽然 leader 宕機,則會丟失數據,下降可靠性。而 Kafka 使用 ISR 的策略則在可靠性和吞吐率方面取得了較好的平衡。
Kafka 的 ISR 的管理最終都會反饋到 ZooKeeper 節點上,具體位置爲:
/brokers/topics/[topic]/partitions/[partition]/state
目前,有兩個地方會對這個 ZooKeeper 的節點進行維護。
Controller 來維護:Kafka 集羣中的其中一個 Broker 會被選舉爲 Controller,主要負責 Partition 管理和副本狀態管理,也會執行相似於重分配 partition 之類的管理任務。在符合某些特定條件下,Controller 下的 LeaderSelector 會選舉新的 leader,ISR 和新的 leader_epoch
及 controller_epoch
寫入 ZooKeeper 的相關節點中。同時發起 LeaderAndIsrRequest 通知全部的 replicas。
leader 來維護:leader 有單獨的線程按期檢測 ISR 中 follower 是否脫離 ISR,若是發現 ISR 變化,則會將新的 ISR 的信息返回到 ZooKeeper 的相關節點中。
當 producer 向 leader 發送數據時,能夠經過 request.required.acks 參數來設置數據可靠性的級別:
1. request.required.acks = 1
這是默認狀況,即:producer 發送數據到 leader,leader 寫本地日誌成功,返回客戶端成功;此時 ISR 中的其它副本尚未來得及拉取該消息,若是此時 leader 宕機了,那麼這次發送的消息就會丟失。
2. request.required.acks = 0
producer 不停向leader發送數據,而不須要 leader 反饋成功消息,這種狀況下數據傳輸效率最高,可是數據可靠性確是最低的。可能在發送過程當中丟失數據,可能在 leader 宕機時丟失數據。
3. request.required.acks = -1(all)
producer 發送數據給 leader,leader 收到數據後要等到 ISR 列表中的全部副本都同步數據完成後(強一致性),才向生產者返回成功消息,若是一直收不到成功消息,則認爲發送數據失敗會自動重發數據。這是可靠性最高的方案,固然,性能也會受到必定影響。
**注意:參數 min.insync.replicas **
若是要提升數據的可靠性,在設置 request.required.acks=-1 的同時,還需參數 min.insync.replicas 配合,如此才能發揮最大的功效。min.insync.replicas 這個參數用於設定 ISR 中的最小副本數,默認值爲1,當且僅當 request.required.acks 參數設置爲-1時,此參數才生效。當 ISR 中的副本數少於 min.insync.replicas 配置的數量時,客戶端會返回異常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required
。不難理解,若是 min.insync.replicas 設置爲 2,當 ISR 中實際副本數爲 1 時(只有leader),將沒法保證可靠性,此時拒絕客戶端的寫請求以防止消息丟失。
考慮這樣一種場景:acks=-1,部分 ISR 副本完成同步,此時leader掛掉,以下圖所示:follower1 同步了消息 四、5,follower2 同步了消息 4,與此同時 follower2 被選舉爲 leader,那麼此時 follower1 中的多出的消息 5 該作如何處理呢?
這裏就須要 HW 的協同配合了。如前所述,一個 partition 中的 ISR 列表中,leader 的 HW 是全部 ISR 列表裏副本中最小的那個的 LEO。相似於木桶原理,水位取決於最低那塊短板。
如上圖,某個 topic 的某 partition 有三個副本,分別爲 A、B、C。A 做爲 leader 確定是 LEO 最高,B 緊隨其後,C 機器因爲配置比較低,網絡比較差,故而同步最慢。這個時候 A 機器宕機,這時候若是 B 成爲 leader,假如沒有 HW,在 A 從新恢復以後會作同步(makeFollower) 操做,在宕機時 log 文件以後直接作追加操做,而假如 B 的 LEO 已經達到了 A 的 LEO,會產生數據不一致的狀況,因此使用 HW 來避免這種狀況。 A 在作同步操做的時候,先將 log 文件截斷到以前本身的 HW 的位置,即 3,以後再從 B 中拉取消息進行同步。
若是失敗的 follower 恢復過來,它首先將本身的 log 文件截斷到上次 checkpointed 時刻的 HW 的位置,以後再從 leader 中同步消息。leader 掛掉會從新選舉,新的 leader 會發送 「指令」 讓其他的 follower 截斷至自身的 HW 的位置而後再拉取新的消息。
當 ISR 中的個副本的 LEO 不一致時,若是此時 leader 掛掉,選舉新的 leader 時並非按照 LEO 的高低進行選舉,而是按照 ISR 中的順序選舉。
爲了保證可靠性,對於任意一條消息,只有它被 ISR 中的全部 follower 都從 leader 複製過去纔會被認爲已提交,並返回信息給 producer。如此,能夠避免因部分數據被寫進 leader,而還沒有被任何 follower 複製就宕機的狀況下而形成數據丟失。對於 producer 而言,它能夠選擇是否等待消息 commit,這能夠經過參數 request.required.acks 來設置。這種機制能夠確保:只要 ISR 中有一個或者以上的 follower,一條被 commit 的消息就不會丟失。
問題 1:如何在保證可靠性的前提下避免吞吐量降低?
有一個很重要的問題是當 leader 宕機了,怎樣在 follower 中選舉出新的 leader,由於 follower 可能落後不少或者直接 crash 了,因此必須確保選擇 「最新」 的 follower 做爲新的 leader。一個基本的原則就是,若是 leader 掛掉,新的 leader 必須擁有原來的 leader 已經 commit 的全部消息,這不就是 ISR 中副本的特徵嗎?
可是,存在一個問題,ISR 列表維持多大的規模合適呢?換言之,leader 在一個消息被 commit 前須要等待多少個 follower 確認呢?等待 follower 的數量越多,與 leader 保持同步的 follower 就越多,可靠性就越高,但這也會形成吞吐率的降低。
少數服從多數的選舉原則
一種經常使用的選舉 leader 的策略是 「少數服從多數」 ,不過,Kafka 並非採用這種方式。這種模式下,若是有 2f+1 個副本,那麼在 commit 以前必須保證有 f+1 個 replica 複製完消息,同時爲了保證能正確選舉出新的 leader,失敗的副本數不能超過 f 個。這種方式有個很大的優點,系統的延遲取決於最快的幾臺機器,也就是說好比副本數爲 3,那麼延遲就取決於最快的那個 follower 而不是最慢的那個。
「少數服從多數」 的策略也有一些劣勢,爲了保證 leader 選舉的正常進行,它所能容忍的失敗的 follower 數比較少,若是要容忍 1 個 follower 掛掉,那麼至少要 3 個以上的副本,若是要容忍 2 個 follower 掛掉,必需要有 5 個以上的副本。也就是說,在生產環境下爲了保證較高的容錯率,必需要有大量的副本,而大量的副本又會在大數據量下致使性能的急劇降低。這種算法更多用在 ZooKeeper 這種共享集羣配置的系統中,而不多在須要大量數據的系統中使用。
Kafka 選舉 leader 的策略是怎樣的?
實際上,leader 選舉的算法很是多,好比 ZooKeeper 的 Zab、Raft 以及 Viewstamped Replication。而 Kafka 所使用的 leader 選舉算法更像是微軟的 PacificA 算法。
Kafka 在 ZooKeeper 中爲每個 partition 動態的維護了一個 ISR,這個 ISR 裏的全部 replica 都與 leader 保持同步,只有 ISR 裏的成員纔能有被選爲 leader 的可能(經過參數配置:unclean.leader.election.enable=false
)。在這種模式下,對於 f+1 個副本,一個 Kafka topic 能在保證不丟失已經 commit 消息的前提下容忍 f 個副本的失敗,在大多數使用場景下,這種模式是十分有利的。事實上,對於任意一條消息,只有它被 ISR 中的全部 follower 都從 leader 複製過去纔會被認爲已提交,並返回信息給 producer,從而保證可靠性。但與 「少數服從多數」 策略不一樣的是,Kafka ISR 列表中副本的數量不須要超過副本總數的一半,即不須要知足 「多數派」 原則,一般,ISR 列表副本數大於等於 2 便可,如此,便在可靠性和吞吐量方面取得平衡。
極端狀況下的 leader 選舉策略
前已述及,當 ISR 中至少有一個 follower 時(ISR 包括 leader),Kafka 能夠確保已經 commit 的消息不丟失,但若是某一個 partition 的全部 replica 都掛了,天然就沒法保證數據不丟失了。這種狀況下如何進行 leader 選舉呢?一般有兩種方案:
如何選擇呢?這就須要在可用性和一致性當中做出抉擇。若是必定要等待 ISR 中的 replica 恢復過來,不可用的時間就可能會相對較長。並且若是 ISR 中全部的 replica 都沒法恢復了,或者數據丟失了,這個 partition 將永遠不可用。
選擇第一個恢復過來的 replica 做爲 leader,若是這個 replica 不是 ISR 中的 replica,那麼,它可能並不具有全部已經 commit 的消息,從而形成消息丟失。默認狀況下,Kafka 採用第二種策略,即 unclean.leader.election.enable=true
,也能夠將此參數設置爲 false 來啓用第一種策略。
unclean.leader.election.enable
這個參數對於 leader 的選舉、系統的可用性以及數據的可靠性都有相當重要的影響。生產環境中應慎重權衡。
ZooKeeper 是一個分佈式的、開放源碼的分佈式應用程序協調服務,是 Google 的 Chubby 一個開源的實現。分佈式應用程序能夠基於它實現統一命名服務、狀態同步服務、集羣管理、分佈式應用配置項的管理等工做。在基於 Kafka 的分佈式消息隊列中,ZooKeeper 的做用有:broker 註冊、topic 註冊、producer 和 consumer 負載均衡、維護 partition 與 consumer 的關係、記錄消息消費的進度以及 consumer 註冊等。
/brokers/ids/{broker.id}
;在 Kafka 中,全部 topic 與 broker 的對應關係都由 ZooKeeper 進行維護,在 ZooKeeper 中,創建專門的節點來記錄這些信息,其節點路徑爲 /brokers/topics/{topic_name}
。 前面說過,爲了保障數據的可靠性,每一個 Topic 的 Partitions 其實是存在備份的,而且備份的數量由 Kafka 機制中的 replicas 來控制。那麼問題來了:以下圖所示,假設某個 TopicA 被分爲 2 個 Partitions,而且存在兩個備份,因爲這 2 個 Partitions(1-2)被分佈在不一樣的 broker 上,同一個 partiton 與其備份不能(也不該該)存儲於同一個 broker 上。以 Partition1 爲例,假設它被存儲於 broker2,其對應的備份分別存儲於 broker1 和 broker4,有了備份,可靠性獲得保障,但數據一致性倒是個問題。
爲了保障數據的一致性,ZooKeeper 機制得以引入。基於 ZooKeeper,Kafka 爲每個 partition 找一個節點做爲 leader,其他備份做爲 follower;接續上圖的例子,就 TopicA 的 partition1 而言,若是位於 broker2(Kafka 節點)上的 partition1 爲 leader,那麼位於 broker1 和 broker4 上面的 partition1 就充當 follower,則有下圖:
基於上圖的架構,當 producer push 的消息寫入 partition(分區) 時,做爲 leader 的 broker(Kafka 節點) 會將消息寫入本身的分區,同時還會將此消息複製到各個 follower,實現同步。若是,某個follower 掛掉,leader 會再找一個替代並同步消息;若是 leader 掛了,follower 們會選舉出一個新的 leader 替代,繼續業務,這些都是由 ZooKeeper 完成的。
註冊新的消費者分組
當新的消費者組註冊到 ZooKeeper 中時,ZooKeeper 會建立專用的節點來保存相關信息,其節點路徑爲 ls/consumers/{group_id}
,其節點下有三個子節點,分別爲 [ids, owners, offsets]
。
註冊新的消費者
當新的消費者註冊到 Kafka 中時,會在 /consumers/{group_id}/ids
節點下建立臨時子節點,並記錄相關信息。
監聽消費者分組中消費者的變化
每一個消費者都要關注其所屬消費者組中消費者數目的變化,即監聽 /consumers/{group_id}/ids
下子節點的變化。一單發現消費者新增或減小,就會觸發消費者的負載均衡。
對於同一個 topic 的不一樣 partition,Kafka會盡力將這些 partition 分佈到不一樣的 broker 服務器上,這種均衡策略其實是基於 ZooKeeper 實現的。在一個 broker 啓動時,會首先完成 broker 的註冊過程,並註冊一些諸如 「有哪些可訂閱的 topic」 之類的元數據信息。producers 啓動後也要到 ZooKeeper 下注冊,建立一個臨時節點來監聽 broker 服務器列表的變化。因爲在 ZooKeeper 下 broker 建立的也是臨時節點,當 brokers 發生變化時,producers 能夠獲得相關的通知,從改變本身的 broker list。其它的諸如 topic 的變化以及broker 和 topic 的關係變化,也是經過 ZooKeeper 的這種 Watcher 監聽實現的。
在生產中,必須指定 topic;可是對於 partition,有兩種指定方式:
RD_KAFKA_PARTITION_UA
,則 Kafka 會回調 partitioner 進行均衡選取,partitioner 方法須要本身實現。能夠輪詢或者傳入 key 進行 hash。未實現則採用默認的隨機方法 rd_kafka_msg_partitioner_random
隨機選擇。Kafka 保證同一 consumer group 中只有一個 consumer 可消費某條消息,實際上,Kafka 保證的是穩定狀態下每個 consumer 實例只會消費某一個或多個特定的數據,而某個 partition 的數據只會被某一個特定的 consumer 實例所消費。這樣設計的劣勢是沒法讓同一個 consumer group 裏的 consumer 均勻消費數據,優點是每一個 consumer 不用都跟大量的 broker 通訊,減小通訊開銷,同時也下降了分配難度,實現也更簡單。另外,由於同一個 partition 裏的數據是有序的,這種設計能夠保證每一個 partition 裏的數據也是有序被消費。
consumer 數量不等於 partition 數量
若是某 consumer group 中 consumer 數量少於 partition 數量,則至少有一個 consumer 會消費多個 partition 的數據;若是 consumer 的數量與 partition 數量相同,則正好一個 consumer 消費一個 partition 的數據,而若是 consumer 的數量多於 partition 的數量時,會有部分 consumer 沒法消費該 topic 下任何一條消息。
藉助 ZooKeeper 實現負載均衡
關於負載均衡,對於某些低級別的 API,consumer 消費時必須指定 topic 和 partition,這顯然不是一種友好的均衡策略。基於高級別的 API,consumer 消費時只需制定 topic,藉助 ZooKeeper 能夠根據 partition 的數量和 consumer 的數量作到均衡的動態配置。
consumers 在啓動時會到 ZooKeeper 下以本身的 conusmer-id 建立臨時節點 /consumer/[group-id]/ids/[conusmer-id]
,並對 /consumer/[group-id]/ids
註冊監聽事件,當消費者發生變化時,同一 group 的其他消費者會獲得通知。固然,消費者還要監聽 broker 列表的變化。librdkafka 一般會將 partition 進行排序後,根據消費者列表,進行輪流的分配。
在 consumer 對指定消息 partition 的消息進行消費的過程當中,須要定時地將 partition 消息的消費進度 Offset 記錄到 ZooKeeper上,以便在該 consumer 進行重啓或者其它 consumer 從新接管該消息分區的消息消費權後,可以從以前的進度開始繼續進行消息消費。Offset 在 ZooKeeper 中由一個專門節點進行記錄,其節點路徑爲:
#節點內容就是Offset的值。 /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
PS:Kafka 已推薦將 consumer 的 Offset 信息保存在 Kafka 內部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
而且默認提供了 kafka_consumer_groups.sh
腳本供用戶查看consumer 信息(命令:sh kafka-consumer-groups.sh –bootstrap-server * –describe –group *
)。在當前版本中,offset 存儲方式要麼存儲在本地文件中,要麼存儲在 broker 端,具體的存儲方式取決 offset.store.method
的配置,默認是存儲在 broker 端。
consumer group 下有多個 consumer(消費者),對於每一個消費者組(consumer group),Kafka都會爲其分配一個全局惟一的 group ID,group 內部的全部消費者共享該 ID。訂閱的 topic 下的每一個分區只能分配給某個 group 下的一個consumer(固然該分區還能夠被分配給其它 group)。同時,Kafka 爲每一個消費者分配一個 consumer ID,一般採用 hostname:UUID
形式表示。
在Kafka中,規定了每一個 partition 只能被同組的一個消費者進行消費,所以,須要在 ZooKeeper 上記錄下 partition 與 consumer 之間的關係,每一個 consumer 一旦肯定了對一個 partition 的消費權力,須要將其 consumer ID 寫入到 ZooKeeper 對應消息分區的臨時節點上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id
] 就是一個消息分區的標識,節點內容就是該消息分區 消費者的 consumer ID。
producer 採用 push 模式將消息發佈到 broker,每條消息都被 append 到 patition 中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 kafka 吞吐率)。producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪個 partition。
其路由機制爲:
寫入流程:
物理上把 topic 分紅一個或多個 patition,每一個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的全部消息和索引文件)
high-level consumer API 提供了 consumer group 的語義,一個消息只能被 group 內的一個 consumer 所消費,且 consumer 消費消息時不關注 offset,最後一個 offset 由 ZooKeeper 保存(下次消費時,該group 中的consumer將從offset記錄的位置開始消費)。
注意:
consumer 採用 pull 模式從 broker 中讀取數據。
push 模式很難適應消費速率不一樣的消費者,由於消息發送速率是由 broker 決定的。它的目標是儘量以最快速度傳遞消息,可是這樣很容易形成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則能夠根據 consumer 的消費能力以適當的速率消費消息。
對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。
本內容轉載自https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html