字節二面:能說說Kafka處理請求的流程麼?越詳細越好

你們好,我是 yes。面試

這是個人第三篇Kafka源碼分析文章。編程

今天來說講 Kafka Broker端處理請求的全流程,剖析下底層的網絡通訊是如何實現的、Reactor在kafka上的應用。api

再說說社區爲什麼在2.3版本將請求類型劃分紅兩大類,又是如何實現兩類請求處理的優先級。網絡

叨叨

不過在進入今天主題以前我想先叨叨幾句,就源碼這個事兒,不一樣人有不一樣的見解。多線程

有些人聽到源碼這兩個詞就被嚇到了,這麼多代碼怎麼看。奔進去就像無頭蒼蠅,一路斷點跟下來,跳來跳去,算了拜拜了您嘞。架構

而有些人以爲源碼有啥用,看了和沒看同樣,看了也用不上。socket

其實上面兩種想法我都有過,哈哈哈。那爲何我會開始看Kafka源碼呢?源碼分析

其實就是我有個同事在自學go,而後想用go寫個消息隊列,在畫架構圖的時候就來問我,這消息隊列好像有點東西啊,消息收發,元數據管理,消息如何持久一堆問題過來,我直呼頂不住。性能

這市面上KafkaRocketMQ都是現成的方案,因而乎我就看起了源碼。.net

因此促使我看源碼的初始動力,居然是爲了在同事前面裝逼!!

我是先看了RocketMQ,由於畢竟是Java寫的,而Kafka Broker都是scala寫的。

梳理了一波RocketMQ以後,我又想看看Kafka是怎麼作的,因而乎我又看起了Kafka

在源碼分析以前我先總結性的說了說Kafka底層的通訊模型。應對面試官詢問Kafka請求全過程已經夠了。

Reactor模式

在扯到Kafka以前咱們先來講說Reactor模式,基本上只要是底層的高性能網絡通訊就離不開Reactor模式。像Netty、Redis都是使用Reactor模式

像咱們之前剛學網絡編程的時候如下代碼但是很是的熟悉,新來一個請求,要麼在當前線程直接處理了,要麼新起一個線程處理。

在早期這樣的編程是沒問題的,可是隨着互聯網的快速發展,單線程處理不過來,也不能充分的利用計算機資源。

而每一個請求都新起一個線程去處理,資源的要求就過高了,而且建立線程也是一個重操做。

說到這有人想到了,那搞個線程池不就完事了嘛,還要啥Reactor

池化技術確實能緩解資源的問題,可是池子是有限的,池子裏的一個線程不仍是得候着某個鏈接,等待指示嘛。如今的互聯網時代早已突破C10K了。

所以引入的IO多路複用,由一個線程來監視一堆鏈接,同步等待一個或多個IO事件的到來,而後將事件分發給對應的Handler處理,這就叫Reactor模式

網絡通訊模型的發展以下 > 單線程 => 多線程 => 線程池 => Reactor模型

Kafka所採用的Reactor模型以下 圖來自Doug Lea大神的 Scalable IO in Java

Kafka Broker 網絡通訊模型

簡單來講就是,Broker 中有個Acceptor(mainReactor)監聽新鏈接的到來,與新鏈接建連以後輪詢選擇一個Processor(subReactor)管理這個鏈接。

Processor會監聽其管理的鏈接,當事件到達以後,讀取封裝成Request,並將Request放入共享請求隊列中。

而後IO線程池不斷的從該隊列中取出請求,執行真正的處理。處理完以後將響應發送到對應的Processor的響應隊列中,而後由ProcessorResponse返還給客戶端。

每一個listener只有一個Acceptor線程,由於它只是做爲新鏈接建連再分發,沒有過多的邏輯,很輕量,一個足矣。

Processor 在Kafka中稱之爲網絡線程,默認網絡線程池有3個線程,對應的參數是num.network.threads。而且能夠根據實際的業務動態增減。

還有個 IO 線程池,即KafkaRequestHandlerPool,執行真正的處理,對應的參數是num.io.threads,默認值是 8。IO線程處理完以後會將Response放入對應的Processor中,由Processor將響應返還給客戶端。

能夠看到網絡線程和IO線程之間利用的經典的生產者 - 消費者模式,不管是用於處理Request的共享請求隊列,仍是IO處理完返回的Response。

這樣的好處是什麼?生產者和消費者之間解耦了,能夠對生產者或者消費者作獨立的變動和擴展。而且能夠平衡二者的處理能力,例如消費不過來了,我多加些IO線程。

若是你看過其餘中間件源碼,你會發現生產者-消費者模式真的是太常見了,因此面試題常常會有手寫一波生產者-消費者。

源碼級別剖析網絡通訊模型

Kafka 網絡通訊組件主要由兩大部分構成:SocketServerKafkaRequestHandlerPool

SocketServer

能夠看出SocketServer旗下管理着,Acceptor 線程Processor 線程RequestChannel 等對象。

data-planecontrol-plane稍後再作分析,先看看RequestChannel是什麼。

RequestChannel

關鍵的屬性和方法都已經在下面代碼中註釋了,能夠看出這個對象主要就是管理Processor做爲傳輸RequestResponse的中轉站

Acceptor

接下來咱們再看看Acceptor

能夠看到它繼承了AbstractServerThread,接下來再看看它run些啥

再來看看accept(key) 作了啥

很簡單,標準selector的處理,獲取準備就緒事件,調用serverSocketChannel.accept()獲得socketChannel,將socketChannel交給經過輪詢選擇出來的Processor,以後由它來處理IO事件。 ##Processor 接下來咱們再看看Processor,相對而言比Acceptor 複雜一些。

先來看看三個關鍵的成員

再來看看主要的處理邏輯。

能夠看到Processor主要是將底層讀事件IO數據封裝成Request存入隊列中,而後將IO線程塞入的Response,返還給客戶端,並處理Response 的回調邏輯。

#KafkaRequestHandlerPool

IO線程池,實際處理請求的線程。

再來看看IO線程都幹了些啥

很簡單,核心就是不斷的從requestChannel拿請求,而後調用handle處理請求。

handle方法是位於KafkaApis類中,能夠理解爲經過switch,根據請求頭裏面不一樣的apikey調用不一樣的handle來處理請求。

咱們再舉例看下較爲簡單的處理LIST_OFFSETS的過程,即handleListOffsetRequest,來完成一個請求的閉環。

我用紅色箭頭標示了調用鏈。代表處理完請求以後是塞給對應的Processor的。

最後再來個更詳細的總覽圖,把源碼分析到的類基本上都對應的加上去了。

請求處理優先級

上面提到的data-planecontrol-plane是時候揭開面紗了。這兩個對應的就是數據類請求和控制類請求。

爲何須要分兩類請求呢?直接在請求裏面用key標明請求是要讀寫數據啊仍是更新元數據不就好了嗎?

簡單點的說好比咱們想刪除某個topic,咱們確定是想這個topic立刻被刪除的,而此時producer還一直往這個topic寫數據,那這個狀況多是咱們的刪除請求排在第N個...等前面的寫入請求處理好了才輪到刪除的請求。實際上前面哪些往這個topic寫入的請求都是沒用的,平白的消耗資源。

再或者說進行Preferred Leader選舉時候,producerack設置爲all時候,老leader還在等着follower寫完數據向他報告呢,誰知follower已經成爲了新leader,而通知它leader已經變動的請求因爲被一堆數據類型請求堵着呢,老leader就傻傻的在等着,直到超時。

就是爲了解決這種狀況,社區將請求分爲兩類。

那如何讓控制類的請求優先被處理?優先隊列?

社區採起的是兩套Listener,即數據類型一個listener,控制類一個listener

對應的就是咱們上面講的網絡通訊模型,在kafka中有兩套! kafka經過兩套監聽變相的實現了請求優先級,畢竟數據類型請求確定不少,控制類確定少,這樣看來控制類確定比大部分數據類型先被處理!

迂迴戰術啊。

控制類的和數據類區別就在於,就一個Porcessor線程,而且請求隊列寫死的長度爲20。

最後

看源碼主要就是得耐心,耐心跟下去。而後再跳出來看。你會發現不過如此,哈哈哈。

歡迎關注個人公衆號【yes的練級攻略】,更多硬核文章等你來讀。

相關文章
相關標籤/搜索