在正常狀況下,Kafka中的每一個Topic都會有不少個分區,每一個分區又會存在多個副本。在這些副本中,存在一個leader分區,而剩下的分區叫作 follower,全部對分區的讀寫操做都是對leader分區進行的。因此當咱們向Kafka寫消息或者從Kafka讀取消息的時候,必須先找到對應分區的Leader及其所在的Broker地址,這樣才能夠進行後續的操做。本文將要介紹的就是 Kafka 是如何找到 leader 分區的。apache
咱們知道, Kafka 是使用 Scala 語言編寫的,可是其支持不少語言的客戶端,包括:C/C++、PHP、Go以及Ruby等等(參見https://cwiki.apache.org/confluence/display/KAFKA/Clients)。這是爲何呢?這是由於 Kafka 內部實現了一套基於TCP層的協議,只要使用這種協議與Kafka進行通訊,就可使用不少語言來操做Kafka。緩存
目前 Kafka 內部支持多達30多種協議,本文介紹的 Kafka 客戶端是如何找到 leader 分區就涉及到 Kafka 內部的 Metadata 協議。Metadata 協議主要解決如下四種問題:服務器
Kafka中存在哪些主題?併發
每一個主題有幾個分區?3d
Leader分區所在的broker地址及端口?code
每一個broker的地址及端口是多少?blog
客戶端只須要構造相應的請求,併發送到Broker端,便可獲取到上面四個問題的答案。整個過程以下:內存
客戶端構造相應的請求io
客戶端將請求發送到Broker端class
Broker端接收到請求處理,並將結果發送到客戶端。
Metadata 請求協議(v0-v3版本)以下:
目前 Metadata 請求協議存在五個版本,v0-v3版本格式一致。可是這些協議存在一個問題:當 Kafka 服務器端將 auto.create.topics.enable
參數設置爲 ture 時,若是咱們查詢的主題不存在,Kafka 將會自動建立這個主題,這極可能不是咱們想要的結果。因此,基於這個問題,到了 Metadata 請求協議第五版,格式已經變化了,以下:客戶端只須要構造一個 TopicMetadataRequest
,裏面包括咱們須要查詢主題的名字(TopicNames);固然,咱們能夠一次查詢多個主題,只須要將這些主題放進List裏面便可。同時,咱們還能夠不傳入任何主題的名字,這時候 Kafka 將會把內部全部的主題相關的信息發送給客戶端。
Kafka 的 Broker 收到客戶端的請求處理完以後,會構造一個 TopicMetadataResponse
,併發送給客戶端。TopicMetadataResponse
協議的格式以下:咱們能夠指定 allow_auto_topic_creation
參數來告訴 Kafka 是否須要在主題不存在的時候建立,這時候控制權就在咱們了。
能夠看到,相應協議裏面包含了每一個分區的 Leader、Replicas 以及 Isr 信息,同時還包括了Kafka 集羣全部Broker的信息。若是處理出現了問題,會出現相應的錯誤信息碼,主要包括下面幾個:
並且,Metadata 協議是目前惟一一個能夠向任何 Broker 發送的協議。由於任何一個 Broker 在啓動以後會存儲這些Metadata信息的。並且,Kafka 提供的客戶端在獲取到 Metadata 信息以後也會將它存儲到內存中的。而且在如下幾種狀況會更新已經緩存下來的 Metadata 信息:
在往Kafka發送請求是收到 Not a Leader 異常;
在 meta‐data.max.age.ms
參數配置的時間過時以後。
以上兩種狀況 Kafka提供的客戶端會自動再發送一次 Metadata 請求,這樣就能夠獲取到更新的信息。整個過程以下:
好了,說了半天的,咱們來看看程序裏面如何構造 TopicMetadataRequest
以及處理 TopicMetadataResponse
。
TopicMetadataRequest
是經過 SimpleConsumer
的 send
方法發送的,其返回的是 TopicMetadataResponse
,其中就包含了咱們須要的信息。 運行上面的程序輸出以下:
上面的輸出就能夠看到各個分區的leader所在機器、isr以及全部replicas等信息。有一點咱們須要注意,由於目前存在多個版本的 Metadata 請求協議,咱們可使用低版本的協議與高版本的Kafka集羣進行通訊,由於高版本的 Kafka 可以支持低版本的 Metadata 請求協議;可是咱們不能使用高版本的 Metadata 請求協議與低版本的 Kafka 通訊。