以前寫過一篇《從源碼分析如何優雅的使用 Kafka 生產者》 ,有生產者天然也就有消費者。git
建議對 Kakfa 還比較陌生的朋友能夠先看看。github
就個人使用經驗來講,大部分狀況都是處於數據下游的消費者角色。也用 Kafka
消費過日均過億的消息(不得不佩服 Kakfa 的設計),本文將藉助我使用 Kakfa 消費數據的經驗來聊聊如何高效的消費數據。數據庫
以以前生產者中的代碼爲例,事先準備好了一個 Topic:data-push
,3個分區。多線程
先往裏邊發送 100 條消息,沒有自定義路由策略,因此消息會均勻的發往三個分區。源碼分析
先來談談最簡單的單線程消費,以下圖所示:性能
因爲數據散列在三個不一樣分區,因此單個線程須要遍歷三個分區將數據拉取下來。測試
單線程消費的示例代碼:線程
這段代碼你們在官網也能夠找到:將數據取出放到一個內存緩衝中最後寫入數據庫的過程。設計
先不討論其中的 offset 的提交方式。3d
經過消費日誌能夠看出:
取出的 100 條數據確實是分別遍歷了三個分區。
單線程消費雖然簡單,但存在如下幾個問題:
既然單線程有諸多問題,那是否能夠用多線程來提升效率呢?
在多線程以前不得不將消費模式分爲兩種進行探討:消費組、獨立消費者。
這兩種消費模式對應的處理方式有着很大的不一樣,因此頗有必要單獨來說。
先從獨立消費者模式
談起,這種模式相對於消費組來講用的相對小衆一些。
看一個簡單示例便可知道它的用法:
值得注意的是:獨立消費者能夠不設置 group.id 屬性。
也是發送100條消息,消費結果以下:
經過 API 能夠看出:咱們能夠手動指定須要消費哪些分區。
好比 data-push
Topic 有三個分區,我能夠手動只消費其中的 1 2 分區,第三個能夠視狀況來消費。
同時它也支持多線程的方式,每一個線程消費指定分區進行消費。
爲了直觀,只發送了 10 條數據。
根據消費結果能夠看出:
c1 線程只取 0 分區;c2 只取 1 分區;c3 只取 2 分區的數據。
甚至咱們能夠將消費者多進程部署,這樣的消費方式以下:
假設 Topic:data-push
的分區數爲 4 個,那咱們就能夠按照圖中的方式建立兩個進程。
每一個進程內有兩個線程,每一個線程再去消費對應的分區。
這樣當咱們性能不夠新增 Topic 的分區數時,消費者這邊只須要這樣水平擴展便可,很是的靈活。
這種自定義分區消費的方式在某些場景下仍是適用的,好比生產者每次都將某一類的數據只發往一個分區。這樣咱們就能夠只針對這一個分區消費。
但這種方式有一個問題:可用性不高,當其中一個進程掛掉以後;該進程負責的分區數據無法轉移給其餘進程處理。
消費組模式應當是使用最多的一種消費方式。
咱們能夠建立 N 個消費者實例(new KafkaConsumer()
),當這些實例都用同一個 group.id
來建立時,他們就屬於同一個消費組。
在同一個消費組中的消費實例能夠收到消息,但一個分區的消息只會發往一個消費實例。
仍是藉助官方的示例圖來更好的理解它。
某個 Topic 有四個分區 p0 p1 p2 p3
,同時建立了兩個消費組 groupA,groupB
。
C一、C2
。C三、C四、C五、C6
。這樣消息是如何劃分到每一個消費實例的呢?
經過圖中能夠得知:
須要注意的是:
這裏的消費實例簡單的能夠理解爲
new KafkaConsumer
,它和進程沒有關係。
好比說某個 Topic 有三個分區,可是我啓動了兩個進程來消費它。
其中每一個進程有兩個消費實例,那其實就至關於有四個實例了。
這時可能就會問 4 個實例怎麼消費 3 個分區呢?
這個 Kafka 已經幫我作好了,它會來作消費組裏的 Rebalance
。
好比上面的狀況,3 個分區卻有 4 個消費實例;最終確定只有三個實例能取到消息。但至因而哪三個呢,這點 Kakfa 會自動幫咱們分配好。
看個例子,還在以前的 data-push
這個 Topic,其中有三個分區。
當其中一個進程(其中有三個線程,每一個線程對應一個消費實例)時,消費結果以下:
裏邊的 20 條數據都被這個進程的三個實例消費掉。
這時我新啓動了一個進程,程序和上面那個如出一轍;這樣就至關於有兩個進程,同時就是 6 個實例。
我再發送 10 條消息會發現:
進程1 只取到了分區 1 裏的兩條數據(以前是全部數據都是進程1裏的線程獲取的)。
同時進程2則消費了剩下的 8 條消息,分別是分區 0、2 的數據(總的仍是隻有三個實例取到了數據,只是分別在不一樣的進程裏)。
當我關掉進程2,再發送10條數據時會發現全部數據又被進程1裏的三個線程消費了。
經過這些測試相信你們已經能夠看到消費組的優點了。
咱們能夠在一個消費組中建立多個消費實例來達到高可用、高容錯的特性,不會出現單線程以及獨立消費者掛掉以後數據不能消費的狀況。同時基於多線程的方式也極大的提升了消費效率。
而當新增消費實例或者是消費實例掛掉時 Kakfa
會爲咱們從新分配消費實例與分區的關係就被稱爲消費組 Rebalance
。
發生這個的前提條件通常有如下幾個:
Rebalance
。因此推薦使用這樣的方式消費數據,同時擴展性也很是好。當性能不足新增分區時只須要啓動新的消費實例加入到消費組中便可。
本次只分享了幾個不一樣消費數據的方式,並無着重研究消費參數、源碼;這些內容感興趣的話能夠在下次分享。
文中提到的部分源碼能夠在這裏查閱:
https://github.com/crossoverJie/JCSprout
歡迎關注公衆號一塊兒交流: