消費組組(Consumer group)能夠說是kafka頗有亮點的一個設計。傳統的消息引擎處理模型主要有兩種,隊列模型,和發佈-訂閱模型。html
隊列模型:早期消息處理引擎就是按照隊列模型設計的,所謂隊列模型,跟隊列數據結構相似,生產者產生消息,就是入隊,消費者接收消息就是出隊,並刪除隊列中數據,消息只能被消費一次。但這種模型有一個問題,那就是隻能由一個消費者消費,沒法直接讓多個消費者消費數據。基於這個缺陷,後面又演化出發佈-訂閱模型。正則表達式
發佈-訂閱模型:發佈訂閱模型中,多了一個主題。消費者會預先訂閱主題,生產者寫入消息到主題中,只有訂閱了該主題的消費者才能獲取到消息。這樣一來就可讓多個消費者消費數據。算法
以往的消息處理引擎大多隻支持其中一種模型,但藉助kafka的消費者組機制,能夠同時實現這兩種模型。同時還可以對消費組進行動態擴容,讓消費變得易於伸縮。apache
這篇咱們先介紹下消費者組,而後主要討論kafka著名的重平衡機制。bootstrap
所謂消費者組,那天然是由消費者組成的,組內能夠有一個或多個消費者實例,而這些消費者實例共享一個id,稱爲group id。對了,默認建立消費者的group id是在KAFKA_HOME/conf/consumer.properties文件中定義的,打開就能看到。默認的group id值是test-consumer-group。服務器
消費者組內的全部成員一塊兒訂閱某個主題的全部分區,注意一個消費者組中,每個分區只能由組內的一消費者訂閱。網絡
看看下面這張圖,這是kakfa官網上給出的說明圖。session
這張圖應該很好的說明了消費者組,咱們從上到下解釋一下,kafka cluster中有兩臺broker服務器,每一臺都有兩個分區,這四個分區都是同一個topic下的。下左的消費者組A,組內有兩個消費者,每一個消費者負責兩個分區的消費,而右邊的消費者組B有四個消費者,每一個負責消費一個分區。數據結構
當消費者組中只有一個消費者的時候,就是消息隊列模型,否則就是發佈-訂閱模型,而且易於伸縮。分佈式
上面那張圖,仔細推敲一下就會發現,圖中其實已經有一些既定的事實,好比消費者組內消費者小於或等於分區數,以及topic分區數恰好是消費者組內成員數的倍數。
那麼若是消費者組內成員數超過度區數會怎樣呢?好比有4個分區,但消費者組內有6個消費者,這時候有2個消費者不會分配分區,它會一直空閒。
而若是消費者不是分區的倍數,好比topic內有4個分區,而消費者組內有三個消費者,那怎麼辦呢?這時候只會有兩個消費者分別被分配兩個分區,第三個消費者一樣空閒。
因此,消費者組內的消費者數量最好是與分區數持平,再不濟,最好也是要是分區數的數量成比例。
這裏順便說下如何查看消費者組及組內消費狀況,可使用ConsumerGroupCommand命令工具,來查看具體的kafka消費者組。注意,這裏都是以最新版的kafka版本,也就是2.+版本。
可使用以下命令列出當前集羣中的kafka組信息。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list test-consumer-group
具體到某個組的消費者狀況,可使用下面這條命令工具:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
說完消費者組,再來講說與消費者組息息相關的重平衡機制。重平衡能夠說是kafka爲人詬病最多的一個點了。
重平衡其實就是一個協議,它規定了如何讓消費者組下的全部消費者來分配topic中的每個分區。好比一個topic有100個分區,一個消費者組內有20個消費者,在協調者的控制下讓組內每個消費者分配到5個分區,這個分配的過程就是重平衡。
重平衡的觸發條件主要有三個:
爲何說重平衡爲人詬病呢?由於重平衡過程當中,消費者沒法從kafka消費消息,這對kafka的TPS影響極大,而若是kafka集內節點較多,好比數百個,那重平衡可能會耗時極多。數分鐘到數小時都有可能,而這段時間kafka基本處於不可用狀態。因此在實際環境中,應該儘可能避免重平衡發生。
瞭解了什麼是重平衡,重平衡的缺點和觸發條件後,咱們先來看看重平衡的三種不一樣策略,而後說說應該如何避免重平衡發生。
kafka提供了三種重平衡分配策略,這裏順便介紹一下:
具體實現位於,package org.apache.kafka.clients.consumer.RangeAssignor。
這種分配是基於每一個主題的分區分配,若是主題的分區分區不能平均分配給組內每一個消費者,那麼對該主題,某些消費者會被分配到額外的分區。咱們來看看具體的例子。
舉例:目前有兩個消費者C0和C1,兩個主題t0和t1,每一個主題三個分區,分別是t0p0,t0p1,t0p2,和t1p0,t1p1,t1p2。
那麼分配狀況會是:
我來大概解釋一下,range這種模式,消費者被分配的單位是基於主題的,拿上面的例子來講,是主題t0的三個分區分配給2個消費者,t1三個分區分配給消費者。因而便會出現消費者c0分配到主題c0兩個主題,c1兩個主題的狀況,而非每一個消費者分配兩個主題各三個分區。
具體實現位於,package org.apache.kafka.clients.consumer.RoundRobinAssignor。
RoundRobin是基於所有主題的分區來進行分配的,同時這種分配也是kafka默認的rebalance分區策略。仍是用剛剛的例子來看,
舉例:兩個消費者C0和C1,兩個主題t0和t1,每一個主題三個分區,分別是t0p0,t0p1,t0p2,和t1p0,t1p1,t1p2。
因爲是基於所有主題的分區,那麼分配狀況會是:
由於是基於所有主題的分區來平均分配給消費者,因此這種分配策略能更加均衡得分配分區給每個消費者。
上面說的都是同一消費者組內消費組都訂閱相同主題的狀況。更復雜的狀況是,同一組內的消費者訂閱不一樣的主題,那麼任然可能會致使分區不均衡的狀況。
仍是舉例說明,有三個消費者C0,C1,C2 。三個主題t0,t1,t2,分別有1,2,3個分區 t0p0,t1p0,t1p1,t2p0,t2p1,t2p2。
其中,C0訂閱t0,C1訂閱t0,t1。C2訂閱t0,t1,t2。最終訂閱狀況以下:
這個結果乍一看有點迷,其實能夠這樣理解,按照序號順序進行循環分配,t0只有一個分區,先碰到C0就分配給它了。t1有兩個分區,被C1和C2訂閱,那麼會循環將兩個分區分配出去,最後到t2,有三個分區,卻只有C2訂閱,那麼就將三個分區分配給C2。
Sticky分配策略是最新的也是最複雜的策略,其具體實現位於package org.apache.kafka.clients.consumer.StickyAssignor。
這種分配策略是在0.11.0才被提出來的,主要是爲了必定程度解決上面提到的重平衡非要從新分配所有分區的問題。稱爲粘性分配策略。
聽名字就知道,主要是爲了讓目前的分配儘量保持不變,只挪動儘量少的分區來實現重平衡。
仍是舉例說明,有三個消費者C0,C1,C2 。三個主題t0,t1,t2,t3。每一個主題各有兩個分區, t0p0,t0p1,t1p0,t1p1,t2p0,t2p1,t3p0,t3p1。
如今訂閱狀況以下:
假設如今C1掛掉了,若是是RoundRobin分配策略,那麼會變成下面這樣:
就是說它會所有從新打亂,再分配,而如何使用Sticky分配策略,會變成這樣:
也就是說,儘量保留了原來的分區狀況,不去改變它,在這個基礎上進行均衡分配,不過這個策略目前彷佛還有些bug,因此實際使用也很少。
要說徹底避免重平衡,那是不可能滴,由於你沒法徹底保證消費者不會故障。而消費者故障其實也是最多見的引起重平衡的地方,因此這裏主要介紹如何盡力避免消費者故障。
而其餘幾種觸發重平衡的方式,增長分區,或是增長訂閱的主題,抑或是增長消費者,更多的是主動控制,這裏也很少討論。
首先要知道,若是消費者真正掛掉了,那咱們是沒有什麼辦法的,但實際中,會有一些狀況,會讓kafka錯誤地認爲一個正常的消費者已經掛掉了,咱們要的就是避免這樣的狀況出現。
固然要避免,那首先要知道哪些狀況會出現錯誤判斷掛掉的狀況。在分佈式系統中,一般是經過心跳來維持分佈式系統的,kafka也不例外。對這部份內容有興趣能夠看看我以前的這篇分佈式系統一致性問題與Raft算法(上)。這裏要說的是,在分佈式系統中,因爲網絡問題你不清楚沒接收到心跳,是由於對方真正掛了仍是隻是由於負載太重沒來得及發生心跳或是網絡堵塞。因此通常會約定一個時間,超時即斷定對方掛了。而在kafka消費者場景中,session.timout.ms參數就是規定這個超時時間是多少。
還有一個參數,heartbeat.interval.ms,這個參數控制發送心跳的頻率,頻率越高越不容易被誤判,但也會消耗更多資源。
此外,還有最後一個參數,max.poll.interval.ms,咱們都知道消費者poll數據後,須要一些處理,再進行拉取。若是兩次拉取時間間隔超過這個參數設置的值,那麼消費者就會被踢出消費者組。也就是說,拉取,而後處理,這個處理的時間不能超過max.poll.interval.ms這個參數的值。這個參數的默認值是5分鐘,而若是消費者接收到數據後會執行耗時的操做,則應該將其設置得大一些。
小結一下,其實主要就是三個參數,session.timout.ms控制心跳超時時間,heartbeat.interval.ms控制心跳發送頻率,以及max.poll.interval.ms控制poll的間隔。這裏給出一個相對較爲合理的配置,以下:
以上~