在這一篇文章中,我將向你介紹消費者的一些參數。算法
這些參數影響了每次poll()
請求的數據量,以及等待時間。session
在這以後,我將向你介紹Kafka用來保證消費者擴展性以及可用性的設計——消費者組。併發
在消費者組的介紹中,我將重點放在了Rebalance
的過程上,由於這是一個很重要又常常發生,還會致使消費者組不可用的操做。fetch
對於一個消費者來講,他要作的事情只有一件,那就是使用poll()
來拉取消息。設計
至於他是從哪一個分區拉取,則是靠消費者組來動態的調整這個消費者所消費的分區,又或者是由開發者來自定義。code
但不管如何,這個消費者都須要經過poll()
來拉取消息。開發
這也是這一節的內容:經過參數配置可以影響poll操做的哪些內容。源碼
首先須要肯定一點,當消費者使用poll()
拉取消息的時候,他只能拉到HW水位線及如下的消息。hash
咱們可讓消費者針對於某一個分區進行消費。it
爲了實現這個目標,咱們能夠用assign()
方法。
可是注意,當這個消費者不是單獨的一個消費者,而是屬於某個消費者組的時候,將不容許使用自定義的分區分配。
對應的配置分別是:
fetch.min.bytes
對於每次拉取的最小字節數,默認是1。當拉取的消息大小小於設定的這個限度時,將會等待,直到此次被拉取的消息大小大於這個值。
因而咱們能夠得知,當咱們即將要消費的消息比較小時,能夠適當的調大這個參數的值,以提升吞吐量。
可是注意,這也可能形成消息的額外延遲。
fetch.max.bytes
這個參數跟上面的同樣,只不過他表明的意義是最大的字節數。
可是這存在一個問題,若是咱們的消息大小全都大於這個參數的值,會發生什麼狀況呢?
答案是會返回即將拉取分區的第一條消息。
也就是說在這個參數中,不存在「不符合條件就不返回數據」的狀況。
還有一個參數,叫作max.partition.fetch.bytes
這個參數跟上面提到的每次拉取的最大字節數工做原理是同樣的,也是會保證當消息大於設定的值的時候,必定會返回數據。
而不一樣的地方在於,這個參數表明的是分區。也就是說,一個參數表明的是一次拉取請求,而另一個參數表明的是針對於每個分區的拉取請求。
fetch.max.wait.ms
這個參數的意義在於:若是拉取消息的時間達到了這個參數設定的值,那麼不管符不符合其餘條件,都會返回數據。
那麼你很容易能夠猜到,這個參數跟fetch.min.bytes
是有關係的,這是爲了防止當fetch.min.bytes
參數設置的過大,致使沒法返回消息的狀況。
固然了,這個參數還有一個意義,若是你的業務須要更小的延遲,那麼應該調小這個參數。
若是咱們的最大拉取字節數設置成了很是大,那麼是否是表明咱們每一次的poll()
,都能直接拉到HW水位呢?
答案是否認的。
還存在一個參數:
max.poll.records
這個參數的意義在於,每次拉取消息的最大數量。
一樣的,若是消息的大小都比較小,那麼能夠調大這個參數,以提升消費速度。
另外,還存在一些消費者組相關的參數,我在這裏先提一下,具體更詳細的解釋,將在後文給出。
heartbeat.interval.ms
這個參數是設置消費者與消費者組對應的Coordinator發送心跳響應的間隔時間。
session.timeout.ms
這個參數是用於Coordinator判斷多長時間沒收到消費者的心跳響應而認爲這個消費者已經下線的時間。
max.poll.interval.ms
這個參數用於Coordinator判斷多長時間內消費者都沒有拉取消息,而認爲這個消費者已經下線的時間。
auto.offset.reset
這個參數其實跟消費者組的聯繫不是很大,可是我認爲能夠寫在這裏。
由於有這麼一個場景,當消費者Rebalance以後,若是位移主題以前保存的位移已經被刪除了,那麼這個參數就決定了消費者該從哪裏開始消費。
固然了,關於消費者還有許多的參數,不只僅是上文提到的這些。
而上文提到的這些參數,是我認爲可讓初學者更好的理解消費者的工做原理。
在解釋Rebalance的原理以前,我想先跟你說一下個人思路,省得你看的一頭霧水。
固然了,這個思路是我認爲更適合我本身去理解的。你也能夠先看第三大節,再有了一個大概的認識後,再來看這一節的內容。
我但願先告訴你Rebalance的過程是怎麼樣的,這裏說的過程指的是Rebalance已經發生了,那麼在Rebalance的過程當中,會發生哪些事情。
在這以後,我再跟你說說Rebalance的五種狀態。
那麼,咱們開始。
首先,應該有一個認識。Rebalance的全部操做都是經過Coordinator的協調下完成的,組內的消費者之間並不會進行相關的通訊與交流。
Coordinator你能夠理解爲是一個服務,位於某個broker節點上。
假設當前的消費者已經保存了這個這個節點的信息,那麼將會直接進入第二步。
若是當前的消費者沒有保存這個信息(好比這是一個新加入這個消費者組的消費者),那麼他須要先找到這個Coordinator所在的broker節點。
這裏的broker節點,是這個消費者對應的消費者組對應的位移主題的分區的leader節點。
聽起來有點繞,讓我來再解釋一下。
消費者 -> 消費者組 -> __consumer_offsets -> partition -> leader
關於位移主題,我已經在第二篇文章中提到過了,在這裏再也不贅述。
可是在這裏,讓咱們來再來回憶一遍消費者組對應的partition是怎麼找到的。
Group ID
的hash值__consumer_offsets
的分區數取模在找到了對應的broker節點後,第二步是發送加入Group的請求。
在這一步中,不管是以前已經在Group內的成員,仍是準備加入Group的成員,都須要發送Join Group的申請。
在發起的JoinGroupRequest中,須要包含以下的數據:
Group id
Session_timeout
Rebalance_timeout
Menber_id
Partition assignor
須要事先說明的是,這裏的名稱並不嚴格,是爲了更好的理解而這樣寫的。若是你想要知道更加嚴謹的請求內容,能夠去看廝大的《深刻理解Kafka》。
下面咱們挨個解釋:
Group ID
,消費者組ID,表明了即將加入的消費者組。
Session_timeout
,上文中提到過這個參數,用於Coordinator判斷多長時間內沒收到客戶端的心跳包而認爲這個客戶端已經下線。
Rebalance_timeout
,值等同於max.poll.interval.ms
,意義在於告知Coordinator用多長的時間來等待其餘消費者加入這個消費者組。
咱們在上文中提到,不管以前是否是這個消費者組的成員,只要開啓了Rebalance,就須要從新加入這個消費者組。所以,Coordinator須要一段時間來接受JoinGroupRequest的請求。
至於爲何須要一段時間來接受請求,以及這段時間發生了什麼,我將在後面給你解釋。
menber_id
,做爲組內消費者的識別編號,若是是新加入組的消費者,這個字段留空。
Partition assignor
,指的是分區分配方式。由於Rebalance這個過程,就是分區分配的一個過程。每一個消費者將其接受的分配方式放在這個字段中,隨後由Coordinator選出每一個消費者都承認的分區分配方式。
而後咱們來聊聊在這個階段,Coordinator須要作什麼。
Coordinator須要一段時間來接收來自客戶端的JoinGroupRequest請求,是由於Coordinator須要收集每個成員的信息,選出leader和分區分配方式,所以,Coordinator須要足夠的時間來「收集信息」。這就回答了上文說到的爲何「Coordinator須要一段時間來接受JoinGroupRequest的請求」。
選舉leader的算法很簡單,第一個發送請求的consumer,就是leader。
選出分區分配策略的算法也很簡單,首先Coordinator會收集全部消費者都支持的分區分配方式,而後每一個消費者爲它支持的分配方式投上一票。注意,這裏的投票行爲沒有通過多一次的交互,而是Coordinator選取每一個消費者的JoinGroupRequest中的第一個分區分配方式,做爲這個消費者所投的票。
當Coordinator選取好Leader和分區分配方式後,將返回JoinGroupResponse給各個消費者。
在返回給各個消費者的JoinGroupResponse中,包含了menber_id,分區分配方式等。而對於leader消費者來講,還將得到組內其餘消費者的元數據,包含了各個消費者的menber_id,分區分配方式。
至此,JoinGroup階段完成。
注意,每一個消費者從發送JoinGroupRequest到接收到JoinGroupResponse請求這段時間,是阻塞的。
在第二步結束以後,每一個消費者已經知道了本身的menber_id
,以及Coordinator所選擇的分區分配方式。
可是此時每一個消費者還不知道本身應該消費哪一個分區。
這個分區分配的過程,是交給Leader消費者來完成的。
可是注意,雖說這個過程是Leader消費者完成的,可是Leader消費者並不會跟其餘消費者直接通訊,而是將分配方式告知Coordinator,由Coordinator來告知各個消費者。
這個過程,稱爲Sync_Group
。
在這個過程當中,每個消費者都會發送SyncGroupRequest給Coordinator。要注意的是,Leader消費者在這個Request中還附帶了其餘消費者的分區分配信息。
在Coordinator收到了這些請求後,會將這個分區分配方案等元數據保存在__consumer_offsets
主題中。
隨後,Coordinator將發送響應給各個消費者。
在這個響應中,包含了各個消費者應該負責消費的分區編號。
至此,每一個消費者都瞭解了本身應該消費的分區是哪些了。
在上一個階段中,組內各個消費者已經知道了本身負責的是哪些分區。
可是還存在一個問題,消費者應該從分區的哪一個位置開始消費呢?
這就用到了__consumer_offsets
主題了,這個主題保存了某個消費者組的各個分區的消費位移。
此外,每一個消費者還須要不斷地發送心跳包給Coordinator,以告知Coordinator本身沒有下線。
這個發送心跳包的時間,就是咱們設置的heartbeat.interval.ms
參數。
在每一個心跳包的響應中,Coordinator就會告知這個消費者,需不須要Rebalance。
那麼也就說明了,這個參數設置的越小,消費者就越早可以得知是否須要Rebalance。
而對應的session.timeout.ms
,指的就是Coordinator在這麼長的時間內沒收到消費者的心跳包,而認爲這個消費者過時的參數。
在上面說完了Rebalance的核心原理後,咱們再來聊聊消費者組的各個狀態。
先來介紹一下消費者組有哪幾種狀態:
__consumer_offsets
中也沒有保存這個消費者組的元數據。一般發生在這個消費者組被刪除了,或者__consumer_offsets
分區leader發生了改變。(至於這個狀態我瞭解的也不是不少,若是能夠的話,麻煩你評論區告訴我。)Rebalance_timeout
這麼長的時間。消費者組的狀態介紹大概就是這樣的。
簡單的來說,當一個消費者組須要Rebalance的時候,他就會進入PreparingRebalance階段,而後一直流轉到Stable階段。
在這個期間,若是有任何的成員變更,就會回到PreparingRebalance階段。
在這個期間,若是Coordinator改變,或者消費者組被刪除等,就會進入Dead階段。
首先,謝謝你能看到這裏!
在這一篇文章中,我沒有像介紹生產者那樣介紹一遍源碼。
由於對於生產者來講,他只須要將消息發送到broker中,而對於消費者來講,這個過程複雜得多,我但願可以用比較淺顯易懂的方式,讓你可以瞭解消費者組的工做方式。
在有了這樣的一個認識以後,不管使用什麼客戶端,我認爲都不會有太大的問題。
此外,在這一篇中我花了較大的筆墨去介紹Rebalance的過程,是由於Rebalance是一個很常見的現象,並且在這期間會致使Kafka消費者的不可用,因此我但願瞭解了Rebalance的工做原理,可以讓你更容易的避免沒必要要的Rebalance。
固然了,由於做者才疏學淺能力有限,可能在這個過程當中忽略了一些很重要的細節,又或者有一些錯誤的理解。若是你發現了,還請不吝指教,謝謝你!
再次謝謝你能看到這裏,感恩~
PS:若是有任何的問題,能夠在公衆號找到我,歡迎來找我玩!