你們好,我是 yes。面試
這是個人第三篇Kafka源碼分析文章。編程
今天來說講 Kafka Broker
端處理請求的全流程,剖析下底層的網絡通訊是如何實現的、Reactor在kafka上的應用。api
再說說社區爲什麼在2.3版本將請求類型劃分紅兩大類,又是如何實現兩類請求處理的優先級。網絡
叨叨
不過在進入今天主題以前我想先叨叨幾句,就源碼這個事兒,不一樣人有不一樣的見解。多線程
有些人聽到源碼這兩個詞就被嚇到了,這麼多代碼怎麼看。奔進去就像無頭蒼蠅,一路斷點跟下來,跳來跳去,算了拜拜了您嘞。架構
而有些人以爲源碼有啥用,看了和沒看同樣,看了也用不上。socket
其實上面兩種想法我都有過,哈哈哈。那爲何我會開始看Kafka
源碼呢?源碼分析
其實就是我有個同事在自學go
,而後想用go寫個消息隊列,在畫架構圖的時候就來問我,這消息隊列好像有點東西啊,消息收發,元數據管理,消息如何持久一堆問題過來,我直呼頂不住。性能
這市面上Kafka
、RocketMQ
都是現成的方案,因而乎我就看起了源碼。.net
因此促使我看源碼的初始動力,居然是爲了在同事前面裝逼!!
我是先看了RocketMQ
,由於畢竟是Java
寫的,而Kafka Broker
都是scala
寫的。
梳理了一波RocketMQ
以後,我又想看看Kafka
是怎麼作的,因而乎我又看起了Kafka
。
在源碼分析以前我先總結性的說了說Kafka
底層的通訊模型。應對面試官詢問Kafka
請求全過程已經夠了。
Reactor模式
在扯到Kafka
以前咱們先來講說Reactor模式
,基本上只要是底層的高性能網絡通訊就離不開Reactor模式
。像Netty、Redis都是使用Reactor模式
。
像咱們之前剛學網絡編程的時候如下代碼但是很是的熟悉,新來一個請求,要麼在當前線程直接處理了,要麼新起一個線程處理。
在早期這樣的編程是沒問題的,可是隨着互聯網的快速發展,單線程處理不過來,也不能充分的利用計算機資源。
而每一個請求都新起一個線程去處理,資源的要求就過高了,而且建立線程也是一個重操做。
說到這有人想到了,那搞個線程池不就完事了嘛,還要啥Reactor
。
池化技術確實能緩解資源的問題,可是池子是有限的,池子裏的一個線程不仍是得候着某個鏈接,等待指示嘛。如今的互聯網時代早已突破C10K
了。
所以引入的IO多路複用
,由一個線程來監視一堆鏈接,同步等待一個或多個IO事件的到來,而後將事件分發給對應的Handler
處理,這就叫Reactor模式
。
網絡通訊模型的發展以下 > 單線程 => 多線程 => 線程池 => Reactor模型
Kafka所採用的Reactor模型
以下
Kafka Broker 網絡通訊模型
簡單來講就是,Broker 中有個Acceptor(mainReactor)
監聽新鏈接的到來,與新鏈接建連以後輪詢選擇一個Processor(subReactor)
管理這個鏈接。
而Processor
會監聽其管理的鏈接,當事件到達以後,讀取封裝成Request
,並將Request
放入共享請求隊列中。
而後IO線程池不斷的從該隊列中取出請求,執行真正的處理。處理完以後將響應發送到對應的Processor
的響應隊列中,而後由Processor
將Response
返還給客戶端。
每一個listener
只有一個Acceptor線程
,由於它只是做爲新鏈接建連再分發,沒有過多的邏輯,很輕量,一個足矣。
Processor
在Kafka中稱之爲網絡線程,默認網絡線程池有3個線程,對應的參數是num.network.threads
。而且能夠根據實際的業務動態增減。
還有個 IO 線程池,即KafkaRequestHandlerPool
,執行真正的處理,對應的參數是num.io.threads
,默認值是 8。IO線程處理完以後會將Response
放入對應的Processor
中,由Processor
將響應返還給客戶端。
能夠看到網絡線程和IO線程之間利用的經典的生產者 - 消費者模式,不管是用於處理Request的共享請求隊列,仍是IO處理完返回的Response。
這樣的好處是什麼?生產者和消費者之間解耦了,能夠對生產者或者消費者作獨立的變動和擴展。而且能夠平衡二者的處理能力,例如消費不過來了,我多加些IO線程。
若是你看過其餘中間件源碼,你會發現生產者-消費者模式真的是太常見了,因此面試題常常會有手寫一波生產者-消費者。
源碼級別剖析網絡通訊模型
Kafka 網絡通訊組件主要由兩大部分構成:SocketServer 和 KafkaRequestHandlerPool。
SocketServer
能夠看出
SocketServer
旗下管理着,Acceptor 線程
、Processor 線程
和 RequestChannel
等對象。
data-plane
和control-plane
稍後再作分析,先看看RequestChannel
是什麼。
RequestChannel
關鍵的屬性和方法都已經在下面代碼中註釋了,能夠看出這個對象主要就是管理
Processor
和做爲傳輸Request
和Response
的中轉站。
Acceptor
接下來咱們再看看Acceptor
能夠看到它繼承了AbstractServerThread
,接下來再看看它run些啥
再來看看
accept(key)
作了啥
很簡單,標準selector
的處理,獲取準備就緒事件,調用serverSocketChannel.accept()
獲得socketChannel
,將socketChannel
交給經過輪詢選擇出來的Processor
,以後由它來處理IO事件。 ##Processor 接下來咱們再看看Processor
,相對而言比Acceptor
複雜一些。
先來看看三個關鍵的成員
再來看看主要的處理邏輯。
能夠看到Processor
主要是將底層讀事件IO數據封裝成Request
存入隊列中,而後將IO線程塞入的Response
,返還給客戶端,並處理Response
的回調邏輯。
#KafkaRequestHandlerPool
IO線程池,實際處理請求的線程。
再來看看IO線程都幹了些啥
很簡單,核心就是不斷的從requestChannel
拿請求,而後調用handle處理請求。
handle
方法是位於KafkaApis
類中,能夠理解爲經過switch
,根據請求頭裏面不一樣的apikey
調用不一樣的handle
來處理請求。
咱們再舉例看下較爲簡單的處理LIST_OFFSETS
的過程,即handleListOffsetRequest
,來完成一個請求的閉環。
我用紅色箭頭標示了調用鏈。代表處理完請求以後是塞給對應的Processor
的。
最後再來個更詳細的總覽圖,把源碼分析到的類基本上都對應的加上去了。
請求處理優先級
上面提到的data-plane
和control-plane
是時候揭開面紗了。這兩個對應的就是數據類請求和控制類請求。
爲何須要分兩類請求呢?直接在請求裏面用key標明請求是要讀寫數據啊仍是更新元數據不就好了嗎?
簡單點的說好比咱們想刪除某個topic,咱們確定是想這個topic立刻被刪除的,而此時producer還一直往這個topic寫數據,那這個狀況多是咱們的刪除請求排在第N個...等前面的寫入請求處理好了才輪到刪除的請求。實際上前面哪些往這個topic寫入的請求都是沒用的,平白的消耗資源。
再或者說進行Preferred Leader
選舉時候,producer
將ack
設置爲all
時候,老leader
還在等着follower
寫完數據向他報告呢,誰知follower
已經成爲了新leader
,而通知它leader已經變動的請求因爲被一堆數據類型請求堵着呢,老leader
就傻傻的在等着,直到超時。
就是爲了解決這種狀況,社區將請求分爲兩類。
那如何讓控制類的請求優先被處理?優先隊列?
社區採起的是兩套Listener
,即數據類型一個listener
,控制類一個listener
。
對應的就是咱們上面講的網絡通訊模型,在kafka中有兩套! kafka經過兩套監聽變相的實現了請求優先級,畢竟數據類型請求確定不少,控制類確定少,這樣看來控制類確定比大部分數據類型先被處理!
迂迴戰術啊。
控制類的和數據類區別就在於,就一個Porcessor線程
,而且請求隊列寫死的長度爲20。
最後
看源碼主要就是得耐心,耐心跟下去。而後再跳出來看。你會發現不過如此,哈哈哈。
歡迎關注個人公衆號【yes的練級攻略】,更多硬核文章等你來讀。