若是隻是爲了開發 Kafka 應用程序,或者只是在生產環境使用 Kafka,那麼瞭解 Kafka 的內部工做原理不是必須的。不過,瞭解 Kafka 的內部工做原理有助於理解 Kafka 的行爲,也利用快速診斷問題。下面咱們來探討一下這三個問題node
Kafka 是如何進行復制的
Kafka 是如何處理來自生產者和消費者的請求的
Kafka 的存儲細節是怎樣的程序員
若是感興趣的話,就請花費你一些時間,耐心看完這篇文章。瀏覽器
集羣成員間的關係緩存
咱們知道,Kafka 是運行在 ZooKeeper 之上的,由於 ZooKeeper 是以集羣形式出現的,因此 Kafka 也能夠以集羣形式出現。這也就涉及到多個生產者和多個消費者如何協調的問題,這個維護集羣間的關係也是由 ZooKeeper 來完成的。若是你看過我以前的文章(真的,關於 Kafka 入門看這一篇就夠了),你應該會知道,Kafka 集羣間會有多個 主機(broker),每一個 broker 都會有一個 broker.id,每一個 broker.id 都有一個惟一的標識符用來區分,這個標識符能夠在配置文件裏手動指定,也能夠自動生成。
Kafka 能夠經過 broker.id.generation.enable 和 reserved.broker.max.id 來配合生成新的 broker.id。
broker.id.generation.enable參數是用來配置是否開啓自動生成 broker.id 的功能,默認狀況下爲true,即開啓此功能。自動生成的broker.id有一個默認值,默認值爲1000,也就是說默認狀況下自動生成的 broker.id 從1001開始。
Kafka 在啓動時會在 ZooKeeper 中 /brokers/ids 路徑下注冊一個與當前 broker 的 id 相同的臨時節點。Kafka 的健康狀態檢查就依賴於此節點。當有 broker 加入集羣或者退出集羣時,這些組件就會得到通知。
若是你要啓動另一個具備相同 ID 的 broker,那麼就會獲得一個錯誤 —— 新的 broker 會試着進行註冊,但不會成功,由於 ZooKeeper 裏面已經有一個相同 ID 的 broker。
在 broker 停機、出現分區或者長時間垃圾回收停頓時,broker 會從 ZooKeeper 上斷開鏈接,此時 broker 在啓動時建立的臨時節點會從 ZooKeeper 中移除。監聽 broker 列表的 Kafka 組件會被告知該 broker 已移除。
在關閉 broker 時,它對應的節點也會消失,不過它的 ID 會繼續存在其餘數據結構中,例如主題的副本列表中,副本列表複製咱們下面再說。在徹底關閉一個 broker 以後,若是使用相同的 ID 啓動另外一個全新的 broker,它會馬上加入集羣,並擁有一個與舊 broker 相同的分區和主題。安全
Broker Controller 的做用服務器
咱們以前在講 Kafka Rebalance 重平衡的時候,提過一個羣組協調器,負責協調羣組間的關係,那麼 broker 之間也有一個控制器組件(Controller),它是 Kafka 的核心組件。它的主要做用是在 ZooKeeper 的幫助下管理和協調整個 Kafka 集羣,集羣中的每一個 broker 均可以稱爲 controller,可是在 Kafka 集羣啓動後,只有一個 broker 會成爲 Controller 。既然 Kafka 集羣是依賴於 ZooKeeper 集羣的,因此有必要先介紹一下 ZooKeeper 是什麼,能夠參考做者的這一篇文章(ZooKeeper不只僅是註冊中心,你還知道有哪些?)詳細瞭解,在這裏就簡單提一下 znode 節點的問題。
ZooKeeper 的數據是保存在節點上的,每一個節點也被稱爲znode,znode 節點是一種樹形的文件結構,它很像 Linux 操做系統的文件路徑,ZooKeeper 的根節點是 /。網絡
znode 根據數據的持久化方式可分爲臨時節點和持久性節點。持久性節點不會由於 ZooKeeper 狀態的變化而消失,可是臨時節點會隨着 ZooKeeper 的重啓而自動消失。
znode 節點有一個 Watcher 機制:當數據發生變化的時候, ZooKeeper 會產生一個 Watcher 事件,而且會發送到客戶端。Watcher 監聽機制是 Zookeeper 中很是重要的特性,咱們基於 Zookeeper 上建立的節點,能夠對這些節點綁定監聽事件,好比能夠監聽節點數據變動、節點刪除、子節點狀態變動等事件,經過這個事件機制,能夠基於 ZooKeeper 實現分佈式鎖、集羣管理等功能。
控制器的選舉
Kafka 當前選舉控制器的規則是:Kafka 集羣中第一個啓動的 broker 經過在 ZooKeeper 裏建立一個臨時節點 /controller 讓本身成爲 controller 控制器。其餘 broker 在啓動時也會嘗試建立這個節點,可是因爲這個節點已存在,因此後面想要建立 /controller 節點時就會收到一個 節點已存在 的異常。而後其餘 broker 會在這個控制器上註冊一個 ZooKeeper 的 watch 對象,/controller 節點發生變化時,其餘 broker 就會收到節點變動通知。這種方式能夠確保只有一個控制器存在。那麼只有單獨的節點必定是有個問題的,那就是單點問題。
若是控制器關閉或者與 ZooKeeper 斷開連接,ZooKeeper 上的臨時節點就會消失。集羣中的其餘節點收到 watch 對象發送控制器下線的消息後,其餘 broker 節點都會嘗試讓本身去成爲新的控制器。其餘節點的建立規則和第一個節點的建立原則一致,都是第一個在 ZooKeeper 裏成功建立控制器節點的 broker 會成爲新的控制器,那麼其餘節點就會收到節點已存在的異常,而後在新的控制器節點上再次建立 watch 對象進行監聽。
控制器的做用
那麼說了這麼多,控制是什麼呢?控制器的做用是什麼呢?或者說控制器的這麼一個組件被設計用來幹什麼?彆着急,接下來咱們就要說一說。
Kafka 被設計爲一種模擬狀態機的多線程控制器,它能夠做用有下面這幾點
控制器至關於部門(集羣)中的部門經理(broker controller),用於管理部門中的部門成員(broker)
控制器是全部 broker 的一個監視器,用於監控 broker 的上線和下線
在 broker 宕機後,控制器可以選舉新的分區 Leader
控制器可以和 broker 新選取的 Leader 發送消息
再細分一下能夠具體分爲以下 5 點
主題管理 : Kafka Controller 能夠幫助咱們完成對 Kafka 主題建立、刪除和增長分區的操做,簡而言之就是對分區擁有最高行使權。
換句話說,當咱們執行kafka-topics 腳本時,大部分的後臺工做都是控制器來完成的。
分區重分配: 分區重分配主要是指,kafka-reassign-partitions 腳本提供的對已有主題分區進行細粒度的分配功能。這部分功能也是控制器實現的。
Prefered 領導者選舉 : Preferred 領導者選舉主要是 Kafka 爲了不部分 Broker 負載太重而提供的一種換 Leader 的方案。
集羣成員管理: 主要管理 新增 broker、broker 關閉、broker 宕機
數據服務: 控制器的最後一大類工做,就是向其餘 broker 提供數據服務。控制器上保存了最全的集羣元數據信息,其餘全部 broker 會按期接收控制器發來的元數據更新請求,從而更新其內存中的緩存數據。這些數據咱們會在下面討論
當控制器發現一個 broker 離開集羣(經過觀察相關 ZooKeeper 路徑),控制器會收到消息:這個 broker 所管理的那些分區須要一個新的 Leader。控制器會依次遍歷每一個分區,肯定誰可以做爲新的 Leader,而後向全部包含新 Leader 或現有 Follower 的分區發送消息,該請求消息包含誰是新的 Leader 以及誰是 Follower 的信息。隨後,新的 Leader 開始處理來自生產者和消費者的請求,Follower 用於重新的 Leader 那裏進行復制。
這就很像外包公司的一個部門,這個部門就是專門出差的,每一個人在不一樣的地方辦公,可是中央總部有一個部門經理,如今部門經理忽然離職了。公司不打算外聘人員,決定從部門內部選一個能力強的人當領導,而後當上領導的人須要向本身的組員發送消息,這條消息就是任命消息和明確他管理了哪些人,你們都知道了,而後再各自給部門幹活。
當控制器發現一個 broker 加入集羣時,它會使用 broker ID 來檢查新加入的 broker 是否包含現有分區的副本。若是有控制器就會把消息發送給新加入的 broker 和 現有的 broker。
上面這塊關於分區複製的內容咱們接下來會說到。
broker controller 數據存儲
上面咱們介紹到 broker controller 會提供數據服務,用於保存大量的 Kafka 集羣數據。以下圖
能夠對上面保存信息歸類,主要分爲三類
broker 上的全部信息,包括 broker 中的全部分區,broker 全部分區副本,當前都有哪些運行中的 broker,哪些正在關閉中的 broker 。
全部主題信息,包括具體的分區信息,好比領導者副本是誰,ISR 集合中有哪些副本等。
全部涉及運維任務的分區。包括當前正在進行 Preferred 領導者選舉以及分區重分配的分區列表。
Kafka 是離不開 ZooKeeper的,因此這些數據信息在 ZooKeeper 中也保存了一份。每當控制器初始化時,它都會從 ZooKeeper 上讀取對應的元數據並填充到本身的緩存中。
broker controller 故障轉移
咱們在前面說過,第一個在 ZooKeeper 中的 /brokers/ids下建立節點的 broker 做爲 broker controller,也就是說 broker controller 只有一個,那麼必然會存在單點失效問題。kafka 爲考慮到這種狀況提供了故障轉移功能,也就是 Fail Over。以下圖
最一開始,broker1 會搶先註冊成功成爲 controller,而後因爲網絡抖動或者其餘緣由導致 broker1 掉線,ZooKeeper 經過 Watch 機制覺察到 broker1 的掉線,以後全部存活的 brokers 開始競爭成爲 controller,這時 broker3 搶先註冊成功,此時 ZooKeeper 存儲的 controller 信息由 broker1 -> broker3,以後,broker3 會從 ZooKeeper 中讀取元數據信息,並初始化到本身的緩存中。
注意:ZooKeeper 中存儲的不是緩存信息,broker 中存儲的纔是緩存信息。
broker controller 存在的問題
在 Kafka 0.11 版本以前,控制器的設計是至關繁瑣的。咱們上面提到過一句話:Kafka controller 被設計爲一種模擬狀態機的多線程控制器,這種設計實際上是存在一些問題的
controller 狀態的更改由不一樣的監聽器併發執行,所以須要進行很複雜的同步,而且容易出錯並且難以調試。
狀態傳播不一樣步,broker 可能在時間不肯定的狀況下出現多種狀態,這會致使沒必要要的額外的數據丟失。
controller 控制器還會爲主題刪除建立額外的 I/O 線程,致使性能損耗。
controller 的多線程設計還會訪問共享數據,咱們知道,多線程訪問共享數據是線程同步最麻煩的地方,爲了保護數據安全性,控制器不得不在代碼中大量使用ReentrantLock 同步機制,這就進一步拖慢了整個控制器的處理速度。
broker controller 內部設計原理
在 Kafka 0.11 以後,Kafka controller 採用了新的設計,把多線程的方案改爲了單線程加事件隊列的方案。以下圖所示
主要所作的改變有下面這幾點
第一個改進是增長了一個 Event Executor Thread,事件執行線程,從圖中能夠看出,無論是 Event Queue 事件隊列仍是 Controller context 控制器上下文都會交給事件執行線程進行處理。將原來執行的操做所有建模成一個個獨立的事件,發送到專屬的事件隊列中,供此線程消費。
第二個改進是將以前同步的 ZooKeeper 所有改成異步操做。ZooKeeper API 提供了兩種讀寫的方式:同步和異步。以前控制器操做 ZooKeeper 都是採用的同步方式,此次把同步方式改成異步,據測試,效率提高了10倍。
第三個改進是根據優先級處理請求,以前的設計是 broker 會公平性的處理全部 controller 發送的請求。什麼意思呢?公平性難道還很差嗎?在某些狀況下是的,好比 broker 在排隊處理 produce 請求,這時候 controller 發出了一個 StopReplica 的請求,你會怎麼辦?還在繼續處理 produce 請求嗎?這個 produce 請求還有用嗎?此時最合理的處理順序應該是,賦予 StopReplica 請求更高的優先級,使它可以獲得搶佔式的處理。
副本機制session
複製功能是 Kafka 架構的核心功能,在 Kafka 文檔裏面 Kafka 把本身描述爲 一個分佈式的、可分區的、可複製的提交日誌服務。複製之因此這麼關鍵,是由於消息的持久存儲很是重要,這可以保證在主節點宕機後依舊可以保證 Kafka 高可用。副本機制也能夠稱爲備份機制(Replication),一般指分佈式系統在多臺網絡交互的機器上保存有相同的數據備份/拷貝。
Kafka 使用主題來組織數據,每一個主題又被分爲若干個分區,分區會部署在一到多個 broker 上,每一個分區都會有多個副本,因此副本也會被保存在 broker 上,每一個 broker 可能會保存成千上萬個副本。下圖是一個副本複製示意圖
如上圖所示,爲了簡單我只畫出了兩個 broker ,每一個 broker 指保存了一個 Topic 的消息,在 broker1 中分區0 是Leader,它負責進行分區的複製工做,把 broker1 中的分區0複製一個副本到 broker2 的主題 A 的分區0。同理,主題 A 的分區1也是同樣的道理。
副本類型分爲兩種:一種是 Leader(領導者) 副本,一種是Follower(跟隨者)副本。
Leader 副本
Kafka 在建立分區的時候都要選舉一個副本,這個選舉出來的副本就是 Leader 領導者副本。
Follower 副本
除了 Leader 副本之外的副本統稱爲 Follower 副本,Follower 不對外提供服務。下面是 Leader 副本的工做方式
這幅圖須要注意如下幾點
Kafka 中,Follower 副本也就是追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產者的請求。全部的請求都是由領導者副原本處理。或者說,全部的請求都必須發送到 Leader 副本所在的 broker 中,Follower 副本只是用做數據拉取,採用異步拉取的方式,並寫入到本身的提交日誌中,從而實現與 Leader 的同步
當 Leader 副本所在的 broker 宕機後,Kafka 依託於 ZooKeeper 提供的監控功能可以實時感知到,並開啓新一輪的選舉,從追隨者副本中選一個做爲 Leader。若是宕機的 broker 重啓完成後,該分區的副本會做爲 Follower 從新加入。
首領的另外一個任務是搞清楚哪一個跟隨者的狀態與本身是一致的。跟隨者爲了保證與領導者的狀態一致,在有新消息到達以前先嚐試從領導者那裏複製消息。爲了與領導者保持一致,跟隨者向領導者發起獲取數據的請求,這種請求與消費者爲了讀取消息而發送的信息是同樣的。
跟隨者向領導者發送消息的過程是這樣的,先請求消息1,而後再接收到消息1,在時候到請求1以後,發送請求2,在收到領導者給發送給跟隨者以前,跟隨者是不會繼續發送消息的。這個過程以下
跟隨者副本在收到響應消息前,是不會繼續發送消息,這一點很重要。經過查看每一個跟隨者請求的最新偏移量,首領就會知道每一個跟隨者複製的進度。若是跟隨者在10s 內沒有請求任何消息,或者雖然跟隨者已經發送請求,可是在10s 內沒有收到消息,就會被認爲是不一樣步的。若是一個副本沒有與領導者同步,那麼在領導者掉線後,這個副本將不會稱爲領導者,由於這個副本的消息不是所有的。
與之相反的,若是跟隨者同步的消息和領導者副本的消息一致,那麼這個跟隨者副本又被稱爲同步的副本。也就是說,若是領導者掉線,那麼只有同步的副本可以稱爲領導者。
關於副本機制咱們說了這麼多,那麼副本機制的好處是什麼呢?
可以馬上看到寫入的消息,就是你使用生產者 API 成功向分區寫入消息後,立刻使用消費者就能讀取剛纔寫入的消息
可以實現消息的冪等性,啥意思呢?就是對於生產者產生的消息,在消費者進行消費的時候,它每次都會看到消息存在,並不會存在消息不存在的狀況
同步複製和異步複製
我在學習副本機制的時候,有個疑問,既然領導者副本和跟隨者副本是發送 - 等待機制的,這是一種同步的複製方式,那麼爲何說跟隨者副本同步領導者副本的時候是一種異步操做呢?
我認爲是這樣的,跟隨者副本在同步領導者副本後會把消息保存在本地 log 中,這個時候跟隨者會給領導者副本一個響應消息,告訴領導者本身已經保存成功了,同步複製的領導者會等待全部的跟隨者副本都寫入成功後,再返回給 producer 寫入成功的消息。而異步複製是領導者副本不須要關心跟隨者副本是否寫入成功,只要領導者副本本身把消息保存到本地 log ,就會返回給 producer 寫入成功的消息。下面是同步複製和異步複製的過程
同步複製
producer 通知 ZooKeeper 識別領導者
producer 向領導者寫入消息
領導者收到消息後會把消息寫入到本地 log
跟隨者會從領導者那裏拉取消息
跟隨者向本地寫入 log
跟隨者向領導者發送寫入成功的消息
領導者會收到全部的跟隨者發送的消息
領導者向 producer 發送寫入成功的消息
異步複製
和同步複製的區別在於,領導者在寫入本地log以後,直接向客戶端發送寫入成功消息,不須要等待全部跟隨者複製完成。
ISR
Kafka動態維護了一個同步狀態的副本的集合(a set of In-Sync Replicas),簡稱ISR,ISR 也是一個很重要的概念,咱們以前說過,追隨者副本不提供服務,只是按期的異步拉取領導者副本的數據而已,拉取這個操做就至關因而複製,ctrl-c + ctrl-v你們確定用的熟。那麼是否是說 ISR 集合中的副本消息的數量都會與領導者副本消息數量同樣呢?那也不必定,判斷的依據是 broker 中參數 replica.lag.time.max.ms 的值,這個參數的含義就是跟隨者副本可以落後領導者副本最長的時間間隔。
replica.lag.time.max.ms 參數默認的時間是 10秒,若是跟隨者副本落後領導者副本的時間不超過 10秒,那麼 Kafka 就認爲領導者和跟隨者是同步的。即便此時跟隨者副本中存儲的消息要小於領導者副本。若是跟隨者副本要落後於領導者副本 10秒以上的話,跟隨者副本就會從 ISR 被剔除。假若該副本後面慢慢地追上了領導者的進度,那麼它是可以從新被加回 ISR 的。這也代表,ISR 是一個動態調整的集合,而非靜態不變的。
Unclean 領導者選舉
既然 ISR 是能夠動態調整的,那麼必然會出現 ISR 集合中爲空的狀況,因爲領導者副本是必定出如今 ISR 集合中的,那麼 ISR 集合爲空必然說明領導者副本也掛了,因此此時 Kafka 須要從新選舉一個新的領導者,那麼該如何選舉呢?如今你須要轉變一下思路,咱們上面說 ISR 集合中必定是與領導者同步的副本,那麼再也不 ISR 集合中的副本必定是不與領導者同步的副本了,也就是再也不 ISR 列表中的跟隨者副本會丟失一些消息。若是你開啓 broker 端參數 unclean.leader.election.enable的話,下一個領導者就會在這些非同步的副本中選舉。這種選舉也叫作Unclean 領導者選舉。
若是你接觸過度布式項目的話你必定知道 CAP 理論,那麼這種 Unclean 領導者選舉實際上是犧牲了數據一致性,保證了 Kafka 的高可用性。
你能夠根據你的實際業務場景決定是否開啓 Unclean 領導者選舉,通常不建議開啓這個參數,由於數據的一致性要比可用性重要的多。
Kafka 請求處理流程數據結構
broker 的大部分工做是處理客戶端、分區副本和控制器發送給分區領導者的請求。這種請求通常都是請求/響應式的,我猜想你接觸最先的請求/響應的方式應該就是 HTTP 請求了。事實上,HTTP 請求能夠是同步能夠是異步的。通常正常的 HTTP 請求都是同步的,同步方式最大的一個特色是提交請求->等待服務器處理->處理完畢返回 這個期間客戶端瀏覽器不能作任何事。而異步方式最大的特色是 請求經過事件觸發->服務器處理(這時瀏覽器仍然能夠作其餘事情)-> 處理完畢。
那麼我也能夠說同步請求就是順序處理的,而異步請求的執行方式則不肯定,由於異步須要建立多個執行線程,而每一個線程的執行順序不一樣。
這裏須要注意一點,咱們只是使用 HTTP 請求來舉例子,而 Kafka 採用的是 TCP 基於 Socket 的方式進行通信
那麼這兩種方式有什麼缺點呢?
我相信聰明的你應該能立刻想到,同步的方式最大的缺點就是吞吐量太差,資源利用率極低,因爲只能順序處理請求,所以,每一個請求都必須等待前一個請求處理完畢才能獲得處理。這種方式只適用於請求發送很是不頻繁的系統。
異步的方式的缺點就是爲每一個請求都建立線程的作法開銷極大,在某些場景下甚至會壓垮整個服務。
響應式模型
說了這麼半天,Kafka 採用同步仍是異步的呢?都不是,Kafka 採用的是一種 響應式(Reactor)模型,那麼什麼是響應式模型呢?簡單的說,Reactor 模式是事件驅動架構的一種實現方式,特別適合應用於處理多個客戶端併發向服務器端發送請求的場景,以下圖所示
Kafka 的 broker 端有個 SocketServer組件,相似於處理器,SocketServer 是基於 TCP 的 Socket 鏈接的,它用於接受客戶端請求,全部的請求消息都包含一個消息頭,消息頭中都包含以下信息
Request type (也就是 API Key)
Request version(broker 能夠處理不一樣版本的客戶端請求,並根據客戶版本作出不一樣的響應)
Correlation ID --- 一個具備惟一性的數字,用於標示請求消息,同時也會出如今響應消息和錯誤日誌中(用於診斷問題)
Client ID --- 用於標示發送請求的客戶端
broker 會在它所監聽的每個端口上運行一個 Acceptor 線程,這個線程會建立一個鏈接,並把它交給 Processor(網絡線程池), Processor 的數量可使用 num.network.threads 進行配置,其默認值是3,表示每臺 broker 啓動時會建立3個線程,專門處理客戶端發送的請求。
Acceptor 線程會採用輪詢的方式將入棧請求公平的發送至網絡線程池中,所以,在實際使用過程當中,這些線程一般具備相同的機率被分配到待處理請求隊列中,而後從響應隊列獲取響應消息,把它們發送給客戶端。Processor 網絡線程池中的請求 - 響應的處理仍是比較複雜的,下面是網絡線程池中的處理流程圖
Processor 網絡線程池接收到客戶和其餘 broker 發送來的消息後,網絡線程池會把消息放到請求隊列中,注意這個是共享請求隊列,由於網絡線程池是多線程機制的,因此請求隊列的消息是多線程共享的區域,而後由 IO 線程池進行處理,根據消息的種類判斷作何處理,好比 PRODUCE 請求,就會將消息寫入到 log 日誌中,若是是FETCH請求,則從磁盤或者頁緩存中讀取消息。也就是說,IO線程池是真正作判斷,處理請求的一個組件。在IO 線程池處理完畢後,就會判斷是放入響應隊列中仍是 Purgatory 中,Purgatory 是什麼咱們下面再說,如今先說一下響應隊列,響應隊列是每一個線程所獨有的,由於響應式模型中不會關心請求發往何處,所以把響應回傳的事情就交給每一個線程了,因此也就沒必要共享了。
注意:IO 線程池能夠經過 broker 端參數 num.io.threads 來配置,默認的線程數是8,表示每臺 broker 啓動後自動建立 8 個IO 處理線程。
請求類型
下面是幾種常見的請求類型
生產請求
我在 真的,關於 Kafka 入門看這一篇就夠了 文章中提到過 acks 這個配置項的含義
簡單來說就是不一樣的配置對寫入成功的界定是不一樣的,若是 acks = 1,那麼只要領導者收到消息就表示寫入成功,若是acks = 0,表示只要領導者發送消息就表示寫入成功,根本不用考慮返回值的影響。若是 acks = all,就表示領導者須要收到全部副本的消息後才表示寫入成功。
在消息被寫入分區的首領後,若是 acks 配置的值是 all,那麼這些請求會被保存在 煉獄(Purgatory)的緩衝區中,直到領導者副本發現跟隨者副本都複製了消息,響應纔會發送給客戶端。
獲取請求
broker 獲取請求的方式與處理生產請求的方式相似,客戶端發送請求,向 broker 請求主題分區中特定偏移量的消息,若是偏移量存在,Kafka 會採用 零複製 技術向客戶端發送消息,Kafka 會直接把消息從文件中發送到網絡通道中,而不須要通過任何的緩衝區,從而得到更好的性能。
客戶端能夠設置獲取請求數據的上限和下限,上限指的是客戶端爲接受足夠消息分配的內存空間,這個限制比較重要,若是上限太大的話,頗有可能直接耗盡客戶端內存。下限能夠理解爲攢足了數據包再發送的意思,這就至關於項目經理給程序員分配了 10 個bug,程序員每次改一個 bug 就會向項目經理彙報一下,有的時候改好了有的時候可能還沒改好,這樣就增長了溝通成本和時間成本,因此下限值得就是程序員你改完10個 bug 再向我彙報!!!以下圖所示
如圖你能夠看到,在拉取消息 ---> 消息 之間是有一個等待消息積累這麼一個過程的,這個消息積累你能夠把它想象成超時時間,不過超時會跑出異常,消息積累超時後會響應回執。延遲時間能夠經過 replica.lag.time.max.ms 來配置,它指定了副本在複製消息時可被容許的最大延遲時間。
元數據請求
生產請求和響應請求都必須發送給領導者副本,若是 broker 收到一個針對某個特定分區的請求,而該請求的首領在另一個 broker 中,那麼發送請求的客戶端會收到非分區首領的錯誤響應;若是針對某個分區的請求被髮送到不含有領導者的 broker 上,也會出現一樣的錯誤。Kafka 客戶端須要把請求和響應發送到正確的 broker 上。這不是廢話麼?我怎麼知道要往哪發送?
事實上,客戶端會使用一種 元數據請求 ,這種請求會包含客戶端感興趣的主題列表,服務端的響應消息指明瞭主題的分區,領導者副本和跟隨者副本。元數據請求能夠發送給任意一個 broker,由於全部的 broker 都會緩存這些信息。
通常狀況下,客戶端會把這些信息緩存,並直接向目標 broker 發送生產請求和相應請求,這些緩存須要隔一段時間就進行刷新,使用metadata.max.age.ms 參數來配置,從而知道元數據是否發生了變動。好比,新的 broker 加入後,會觸發重平衡,部分副本會移動到新的 broker 上。這時候,若是客戶端收到 不是首領的錯誤,客戶端在發送請求以前刷新元數據緩存。
Kafka 重平衡流程多線程
我在 真的,關於 Kafka 入門看這一篇就夠了 中關於消費者描述的時候大體說了一下消費者組和重平衡之間的關係,實際上,概括爲一點就是讓組內全部的消費者實例就消費哪些主題分區達成一致。
咱們知道,一個消費者組中是要有一個羣組協調者(Coordinator)的,而重平衡的流程就是由 Coordinator 的幫助下來完成的。
這裏須要先聲明一下重平衡發生的條件
消費者訂閱的任何主題發生變化
消費者數量發生變化
分區數量發生變化
若是你訂閱了一個還還沒有建立的主題,那麼重平衡在該主題建立時發生。若是你訂閱的主題發生刪除那麼也會發生重平衡
消費者被羣組協調器認爲是 DEAD 狀態,這多是因爲消費者崩潰或者長時間處於運行狀態下發生的,這意味着在配置合理時間的範圍內,消費者沒有向羣組協調器發送任何心跳,這也會致使重平衡的發生。
在瞭解重平衡以前,你須要知道這兩個角色
羣組協調器(Coordinator):羣組協調器是一個可以從消費者羣組中收到全部消費者發送心跳消息的 broker。在最先期的版本中,元數據信息是保存在 ZooKeeper 中的,可是目前元數據信息存儲到了 broker 中。每一個消費者組都應該和羣組中的羣組協調器同步。當全部的決策要在應用程序節點中進行時,羣組協調器能夠知足 JoinGroup 請求並提供有關消費者組的元數據信息,例如分配和偏移量。羣組協調器還有權知道全部消費者的心跳,消費者羣組中還有一個角色就是領導者,注意把它和領導者副本和 kafka controller 進行區分。領導者是羣組中負責決策的角色,因此若是領導者掉線了,羣組協調器有權把全部消費者踢出組。所以,消費者羣組的一個很重要的行爲是選舉領導者,並與協調器讀取和寫入有關分配和分區的元數據信息。
消費者領導者:每一個消費者羣組中都有一個領導者。若是消費者中止發送心跳了,協調者會觸發重平衡。
在瞭解重平衡以前,你須要知道狀態機是什麼
Kafka 設計了一套消費者組狀態機(State Machine) ,來幫助協調者完成整個重平衡流程。消費者狀態機主要有五種狀態它們分別是 Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。
瞭解了這些狀態的含義以後,下面咱們用幾條路徑來表示一下消費者狀態的輪轉
消費者組一開始處於 Empty 狀態,當重平衡開啓後,它會被置於 PreparingRebalance 狀態等待新消費者的加入,一旦有新的消費者加入後,消費者羣組就會處於 CompletingRebalance 狀態等待分配,只要有新的消費者加入羣組或者離開,就會觸發重平衡,消費者的狀態處於 PreparingRebalance 狀態。等待分配機制指定好後完成分配,那麼它的流程圖是這樣的
在上圖的基礎上,當消費者羣組都到達 Stable 狀態後,一旦有新的消費者加入/離開/心跳過時,那麼觸發重平衡,消費者羣組的狀態從新處於 PreparingRebalance 狀態。那麼它的流程圖是這樣的
在上圖的基礎上,消費者羣組處於 PreparingRebalance 狀態後,很不幸,沒人玩兒了,全部消費者都離開了,這時候還可能會保留有消費者消費的位移數據,一旦位移數據過時或者被刷新,那麼消費者羣組就處於 Dead 狀態了。它的流程圖是這樣的
在上圖的基礎上,咱們分析了消費者的重平衡,在 PreparingRebalance或者 CompletingRebalance 或者 Stable 任意一種狀態下發生位移主題分區 Leader 發生變動,羣組會直接處於 Dead 狀態,它的全部路徑以下
這裏面須要注意兩點:
通常出現 Required xx expired offsets in xxx milliseconds 就代表Kafka 極可能就把該組的位移數據刪除了
只有 Empty 狀態下的組,纔會執行過時位移刪除的操做。
重平衡流程
上面咱們瞭解到了消費者羣組狀態的轉化過程,下面咱們真正開始介紹 Rebalance 的過程。重平衡過程能夠從兩個方面去看:消費者端和協調者端,首先咱們先看一下消費者端
從消費者看重平衡
從消費者看重平衡有兩個步驟:分別是 消費者加入組 和 等待領導者分配方案。這兩個步驟後分別對應的請求是 JoinGroup 和 SyncGroup。
新的消費者加入羣組時,這個消費者會向協調器發送 JoinGroup 請求。在該請求中,每一個消費者成員都須要將本身消費的 topic 進行提交,咱們上面描述羣組協調器中說過,這麼作的目的就是爲了讓協調器收集足夠的元數據信息,來選取消費者組的領導者。一般狀況下,第一個發送 JoinGroup 請求的消費者會自動稱爲領導者。領導者的任務是收集全部成員的訂閱信息,而後根據這些信息,制定具體的分區消費分配方案。如圖
在全部的消費者都加入進來並把元數據信息提交給領導者後,領導者作出分配方案併發送 SyncGroup請求給協調者,協調者負責下發羣組中的消費策略。下圖描述了 SyncGroup 請求的過程
當全部成員都成功接收到分配方案後,消費者組進入到 Stable 狀態,即開始正常的消費工做。
從協調者來看重平衡
從協調者角度來看重平衡主要有下面這幾種觸發條件,
新成員加入組
組成員主動離開
組成員崩潰離開
組成員提交位移
咱們分別來描述一下,先重新成員加入組開始
新成員入組
咱們討論的場景消費者集羣狀態處於Stable 等待分配的過程,這時候若是有新的成員加入組的話,重平衡的過程
從這個角度來看,協調者的過程和消費者相似,只是剛剛從消費者的角度去看,如今從領導者的角度去看
組成員離開
組成員離開消費者羣組指的是消費者實例調用 close() 方法主動通知協調者它要退出。這裏又會有一個新的請求出現 LeaveGroup()請求 。以下圖所示
組成員崩潰
組成員崩潰是指消費者實例出現嚴重故障,宕機或者一段時間未響應,協調者接收不到消費者的心跳,就會被認爲是組成員崩潰,崩潰離組是被動的,協調者一般須要等待一段時間才能感知到,這段時間通常是由消費者端參數 session.timeout.ms 控制的。以下圖所示
重平衡時提交位移這個過程咱們就再也不用圖形來表示了,大體描述一下就是 消費者發送 JoinGroup 請求後,羣組中的消費者必須在指定的時間範圍內提交各自的位移,而後再開啓正常的 JoinGroup/SyncGroup 請求發送。