文章首發於公衆號【大數據學徒】,搜索 dashujuxuetu 或文末掃碼關注html
本文總結了 broker 是如何處理用戶的各類請求,包括常見的 Produce 請求、Fetch 請求、Metadata 請求,清晰易懂。apache
文章思惟導圖:bootstrap
Kafka 在 TCP 之上有一套本身的二進制協議,定義瞭如何處理各類請求的規範,詳細的協議規範能夠參見 協議文檔。Kafka 用戶請求有一些公共字段,好比請求類型、協議版本、客戶端 ID、請求 ID 等等。緩存
broker 對於每一個監聽的端口會啓動一個 acceptor 線程用來接收用戶請求,而後有若干個 processor 和 handler 線程處理這些請求,processor 負責將請求放入一個請求隊列,以及從響應隊列中取出響應返回給用戶,handler 是實際處理請求(讀寫各個 topic 的消息等)的線程。網絡
Produce 請求(來自生產者生產數據的請求)和 Fetch 請求(來自消費者消費數據和 follower 同步數據的請求)都是由每一個分區副本的 leader 來處理的。若是用戶把對於一個分區的請求發到了 follower 上,那麼會獲得一個「我不是 Leader」的錯誤,將請求發送 leader 這一點由客戶端本身保證,怎麼保證呢?用 metadata 請求。性能
metadata 請求中包含了若干個 topic 的名稱,客戶端能夠向任意 broker 發送 metadata 請求,broker 會告訴客戶端這些 topic 有哪些分區,分區有哪些副本,以及誰是副本 leader,客戶端會緩存這些元信息。緩存有過時時間,對應可配置參數 metadata.max.age.ms
,默認是 5 分鐘,過時後,客戶端會從新發送此請求更新元信息。此外,若是客戶端收到了「我不是 Leader」這種錯誤,那麼也會從新發送 metadata 請求,由於這個錯誤說明它的元信息已通過期。fetch
爲何客戶端訪問 Kafka 時不須要指定所有的 broker,就是由於只要能鏈接上一個 broker,客戶端就能夠獲取到全部須要訪問的 broker 信息,因此叫 bootstrap-server。大數據
Produce 請求中會包含一個可配置參數 acks
,表示須要多少個 broker 確認接收到這些消息才能夠認爲寫入成功。這個參數有三個合法值:0,1 和 all,0 表示不確認,1 表示 leader 確認收到就行,all 表示必須 ISR 中的全部 broker 都確認才行,顯然數據越多,數據的可靠性更強,其中 all 等同於-1。線程
當一個 leader 收到 produce 請求時,它會作如下判斷:日誌
acks
參數的值是否合法,若是不是 0,1 或 all,則返回錯誤。acks
被設置爲 all,則須要判斷 ISR 中是否有足夠的 broker,若是沒有則返回一個 NotEnoughReplicas
的錯誤,怎麼判斷是否足夠呢?有一個參數叫作 min.insync.replicas
,表示 ISR 中 broker 數目所容許的最小值。若是有足夠的 broker,則 leader 會先將請求中的消息寫入磁盤,而後這個請求會被放在一個緩衝區中,等待全部 ISR中的 follower 確認已經同步了這個請求中的消息,最後向客戶端返回寫入成功的響應。消費者和 follower 的 Fetch 請求內容大體相似於:請把這個 topic 的這些分區分別從這些 offset 開始的消息給我,很是直接,但指定的 offset 必須是存在的,不然會返回錯誤。
Fetch 請求分別能夠指定兩個參數: fetch.max.bytes
和 fetch.min.bytes
,用來限制 leader 返回消息的最大值和最小值,默認值分別是 50M 和 1B,限制最大值是防止撐爆客戶端的緩存,限制最小值是避免消息很少浪費網絡開銷。對於這兩個限制還有兩個參數分別和它們有關係,對於最大值,有一個參數叫作 max.partition.fetch.bytes
用來限制單個分區所能返回的數據,默認值是 1M,對於最小值,有一個參數叫作 fetch.max.wait.ms
,表示等待最大的時間,即便沒有知足最小值的要求也要返回響應,默認值是 500ms。
leader 處理 Fetch 請求使用了「零拷貝」技術,即消息是直接從日誌文件寫到網絡的,中間沒有通過任何緩衝區,免去了不少拷貝和管理緩存區的開銷,性能極高。
此外,若是 Fetch 請求中指定的 offset 只在 leader 上存在尚未同步到其它全部 ISR 中的 follower,leader 也不會返回這些消息而是返回一個空響應。這樣作的緣由是:若是這時容許客戶端讀取這些消息,若是這臺 leader 忽然掛掉,而一臺沒有這些消息的 follower 成爲 leader,若是客戶端就沒法再消費這些消息,這種數據不一致的狀況是一個消息系統要避免的。
因爲 Kafka 的協議一直在演進,不少協議格式有新老多個版本,這時候就涉及到版本兼容的問題。總的來講,在 0.10.0 以後的版本,Kafka 的網絡協議是先後兼容的,即高版本的客戶端能夠和低版本的 broker 通訊,低版本的客戶端也能夠和高版本的 broker 通訊(高版本的遷就低版本的),0.10.0 以前的 broker 收到高版本的客戶端請求可能會報錯。所以,若是老版本(0.10.0 以前)的系統須要升級,先升級 broker,再升級客戶端,會比較穩妥。
歡迎交流討論,吐槽建議,分享收藏。