全部的討論都是基於KIP-291展開的。抱歉,這又是一篇沒有圖的文字。apache
目前Kafka broker對全部發過來的請求都是一視同仁的,不會區別對待。不論是用於生產消費的PRODUCE和FETCH請求,仍是controller端發送的LeaderAndIsr/StopReplica/UpdateMetadata請求,亦或是其餘類型的請求也是同樣。一般咱們這裏把PRODUCE/FETCH請求稱爲數據類請求;把controller發送的那3種請求稱爲控制類請求或controller類請求——在源碼中前者被稱爲data plane request,後者稱爲controller plane request。緩存
這種公平處理原則在不少場合下都是不合理的。爲何?簡單來講控制類請求具備直接令數據類請求失效的能力。舉個例子,若是我有個topic,單分區雙副本,其中broker0上保存leader副本,broker1上保存follower副本。當broker0上積壓了大量的PRODUCE請求時,此時用戶執行了重分區或preferred分區選舉將broker1變動成了leader,那麼controller會向broker0發送LeaderAndIsr請求告訴它如今是一個follower了,而broker1上的follower已經中止向leader拉取數據(由於它要成爲leader了)——此時一個比較尷尬的情形出現了:若是producer的acks設置的是all,那麼這些在LeaderAndIsr請求以前積壓的PRODUCE請求就沒法正常完成——要麼一直緩存在purtagory中要麼請求超時返回給client。設想一下,若是Kafka可以及時地處理LeaderAndIsr請求,那麼這些積壓的PRODUCE請求就能當即失敗(NOT_LEADER_FOR_PARTITION),立刻返回給client。Client不用等到 purgatory中的請求超時,下降了請求的處理時間。即便acks不是all,縱然積壓的PRODUCE請求寫入本地日誌後成功返回,但處理過LeaderAndIsr請求後broker0上副本變爲follower,還要執行截斷(truncation),所以在client看來這些消息就丟失了。安全
再舉一個例子,一樣是在積壓大量數據類請求的broker上,若是用戶刪除了topic,那麼StopReplica請求沒法及時處理,致使topic沒法真正刪除,增長了刪除topic的延時。網絡
最後還能夠舉個例子說明對UpdateMetadata的影響。若是UpdateMetadata不能及時處理,broker上保存的就是過時的元數據,當client獲取到這些數據時,不論是producer仍是consumer均可能沒法正常工做,直到獲取到最新的元數據信息。fetch
經過上面3個例子能夠看出一般狀況下咱們但願controller類請求的處理優先級要高於數據類請求,這也是社區作KIP-291的初衷 。可喜的是Kafka 2.2正式實現了這個功能,下面咱們來看看社區是怎麼作的:線程
其實在KIP-291以前,我也思考過這個問題。當時我提出的想法是這樣的:在broker的KafkaRequestHandlerPool中實現一個優先級隊列,當controller類請求到達時,它可以」搶佔式「地排在處理隊列的最前部——這是很天然的想法,因此我本覺得KIP-291也是這麼實現的,但通篇看下來我尷尬地發現我這個解決思路記錄在「Rejected Alternatives"中。這個方案最大的問題在於它沒法處理隊列已滿的情形,即當處理隊列已經沒法容納任何新的請求時該如何支持優先處理controller類請求?縱然有優先級隊列也沒法解決這個問題。scala
KIP-291是怎麼解決的呢?很簡單,Kafka從新爲controller類請求作了專屬的監聽器+請求隊列+acceptor+processor線程。監聽器經過Kafka的listeners和advertised.listeners設置,新的請求隊列則專門保存controller類請求,而acceptor和processor線程負責接收網絡發送過來的以及處理隊列中的controller類請求。咱們一個一個說吧。設計
當前,用戶能夠在listeners中指定多套監聽器,好比PLAINTEXT://kafka1:9092, SSL://kafka1:9093。你其實也能夠自定義你的監聽器,好比INTERNAL://kafka1:9094。用戶能夠指定broker端參數inter.broker.listener.name或security.inter.broker.protocol(兩個不能同時指定)來設定,同時你還須要在listener.security.protocol.map中指定這個自定義listener使用的安全協議,好比: listener.security.protocol.map=INTERNAL:PLAINTEXT。KIP-291複用了這個設計,若是你設置了inter.broker.listener.name或security.inter.broker.protocol,Kafka會默認使用這個listener專屬服務controller類請求。同時社區還引入了一個新的參數:control.plane.listener.name,用來專門讓你設置服務controller類請求的監聽器名稱。這個參數的優先級要高於前面那兩個參數,所以仍是推薦用戶直接設置此參數,好比設置control.plane.listener.name=CONTROLLER,同時更新listener.security.protocol.map,增長CONTROLLER:PLAINTEXT匹配對(假設你用的是PLAINTEXT)。這就是爲controller類請求建立監聽器的方法。日誌
下面說請求隊列和acceptor、processor線程。 其實也不用細說,和現有的設計如出一轍,只是默認的隊列大小再也不是500,而是20,默認的線程數再也不是8而是2,由於咱們假設controller類請求一般不該該有積壓。具體的實現原理有興趣的話直接讀KafkaRequestHandlerPool.scala、RequestChannel.scala和SocketServer.scala源碼吧。還須要修改的地方是controller代碼,特別是在ControllerChannelManager.scala中增長新的broker時必定要使用controller類請求專屬的監聽器。隊列
除了以上這些,該KIP也引入了不少監控controller類請求處理的JMX指標,如隊列請求數、線程空閒程度等,這些和以前的指標都是同樣的,只是僅監控controller plane監聽器之用。再說一點,當前Kafka支持動態地調整請求處理線程數。在對請求進行區分處理後,我估計後續也要支持對controller類請求線程數的動態調整吧。
整體來講,將請求作區分處理後對於繁忙Kafka集羣將可以更迅速地處理控制類請求,表現爲狀態的更新更加及時,集羣不一致狀態窗口將會縮小,同時還提高了總體可用性。目前該KIP還只是對請求作兩類處理,也許往後會作一些更加細粒度的區分——好比Metadata請求是否也應該享有更高的優先級處理。
最後還想提一句,KIP-291是我認爲近期社區改動影響比較大的兩個KIP之一。另外一個則是KIP-392——還記得Kafka不能從follower副本讀數據的限制吧?這個KIP要打破這個限制!只是目前該KIP還在討論中,咱們後面拭目以待吧。