消息堆積是消息中間件的一大特點,消息中間件的流量削峯、冗餘存儲等功能正是得益於消息中間件的消息堆積能力。然而消息堆積實際上是一把亦正亦邪的雙刃劍,若是應用場合不恰當反而會對上下游的業務形成沒必要要的麻煩,好比消息堆積勢必會影響上下游整個調用鏈的時效性,有些中間件如RabbitMQ在發生消息堆積時在某些狀況下還會影響自身的性能。對於Kafka而言,雖然消息堆積不會對其自身性能帶來多大的困擾,但不免不會影響上下游的業務,堆積過多有可能會形成磁盤爆滿,或者觸發日誌清除策略而形成消息丟失的狀況。如何利用好消息堆積這把雙刃劍,監控是最爲關鍵的一步。java
消息堆積是消費滯後(Lag)的一種表現形式,消息中間件服務端中所留存的消息與消費掉的消息之間的差值即爲消息堆積量,也稱之爲消費滯後(Lag)量。對於Kafka而言,消息被髮送至Topic中,而Topic又分紅了多個分區(Partition),每個Partition都有一個預寫式的日誌文件,雖然Partition能夠繼續細分爲若干個段文件(Segment),可是對於上層應用來講能夠將Partition當作最小的存儲單元(一個由多個Segment文件拼接的「巨型文件」)。每一個Partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到Partition中。咱們來看下圖,其就是Partition的一個真實寫照: node
上圖中有四個概念:bootstrap
要計算Kafka中某個消費者的滯後量很簡單,首先看看其消費了幾個Topic,而後針對每一個Topic來計算其中每一個Partition的Lag,每一個Partition的Lag計算就顯得很是的簡單了,參考下圖:bash
由圖可知消費Lag=HW - ConsumerOffset。對於這裏你們有可能有個誤區,就是認爲Lag應該是LEO與ConsumerOffset之間的差值。LEO是對消費者不可見的,既然不可見何來消費滯後一說。app
那麼這裏就引入了一個新的問題,HW和ConsumerOffset的值如何獲取呢? 性能
首先來講說ConsumerOffset,Kafka中有兩處能夠存儲,一個是Zookeeper,而另外一個是」**consumer_offsets這個內部topic中,前者是0.8.x版本中的使用方式,可是隨着版本的迭代更新,如今愈來愈趨向於後者。就拿1.0.0版原本說,雖然默認是存儲在」**consumer_offsets」中,可是保不齊用於就將其存儲在了Zookeeper中了。這個問題倒也不難解決,針對兩種方式都去拉取,而後哪一個有值的取哪一個。不過這裏還有一個問題,對於消費位移來講,其通常不會實時的更新,而更多的是定時更新,這樣能夠提升總體的性能。那麼這個定時的時間間隔就是ConsumerOffset的偏差區間之一。ui
再來講說HW,其也是Kafka中Partition的一個狀態。有可能你會察覺到在Kafka的JMX中能夠看到「kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]」這樣一個屬性,可是這個值不是LEO而是HW。spa
那麼怎樣正確的計算消費的Lag呢?對Kafka熟悉的同窗可能會想到Kafka中自帶的kafka-consumer_groups.sh腳本中就有Lag的信息,示例以下:線程
[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
複製代碼
咱們深究一下kafka-consumer_groups.sh腳本,發現只有一句代碼:scala
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"
複製代碼
其含義就是執行kafka.admin.ConsumerGroupCommand而已。進一步深究,在ConsumerGroupCommand內部抓住了2句關鍵代碼:
val consumerGroupService = new KafkaConsumerGroupService(opts)
val (state, assignments) = consumerGroupService.describeGroup()
複製代碼
代碼詳解:consumerGroupService的類型是ConsumerGroupServicesealed trait類型),而KafkaConsumerGroupService只是ConsumerGroupService的一種實現,還有一種實現是ZkConsumerGroupService,分別對應新版的消費方式(消費位移存儲在__consumer_offsets中)和舊版的消費方式(消費位移存儲在zk中),詳細計算步驟參考下一段落的內容。opt參數是指「 –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID」等參數。第2句代碼是調用describeGroup()方法來獲取具體的信息,即二元組中的assignments,這個assignments中保存了上面打印信息中的全部內容。
Scala小知識: 在Scala中trait(特徵)至關於Java的接口,實際上它比接口更大強大。與Java中的接口不一樣的是,它還能夠定義屬性和方法的實現(JDK8起的接口默認方法)。通常狀況下Scala中的類只能繼承單一父類,可是若是是trait的話就能夠繼承多個,從結果來看是實現了多重繼承。被sealed聲明的trait僅能被同一文件的類繼承。
ZkConsumerGroupService中計算消費lag的步驟以下:
KafkaConsumerGroupService中計算消費lag的步驟以下:
能夠看到KafkaConsumerGroupService與ZkConsumerGroupService的計算Lag的方式都差很少,可是KafkaConsumerGroupService能獲取更多消費詳情,而且ZkConsumerGroupService也被標註爲@Deprecated的了,後面內容都針對KafkaConsumerGroupService來作說明。既然Kafka已經爲咱們提供了線程的方法來獲取Lag,那麼咱們有何須再重複造輪子,這裏筆者寫了一個調用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala語言編寫的,在Java的程序裏使用相似scala.collection.Seq這樣的全名稱以防止混淆):
String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts =
new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
new ConsumerGroupCommand.KafkaConsumerGroupService(opts);
scala.Tuple2<scala.Option<String>, scala.Option<scala.collection.Seq<ConsumerGroupCommand
.PartitionAssignmentState>>> res = kafkaConsumerGroupService.describeGroup();
scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState> pasSeq = res._2.get();
scala.collection.Iterator<ConsumerGroupCommand.PartitionAssignmentState> iterable = pasSeq.iterator();
while (iterable.hasNext()) {
ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();
System.out.println(String.format("\n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",
pas.topic().get(), pas.partition().get(), pas.offset().get(),
pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),
pas.host().get(), pas.clientId().get()));
}
複製代碼
在使用時,你能夠封裝一下這段代碼而後返回一個相似List<ConsumerGroupCommand.PartitionAssignmentState>的東西給上層業務代碼作進一步的使用。ConsumerGroupCommand.PartitionAssignmentState的代碼以下:
case class PartitionAssignmentState(
group: String, coordinator: Option[Node], topic: Option[String],
partition: Option[Int], offset: Option[Long], lag: Option[Long],
consumerId: Option[String], host: Option[String],
clientId: Option[String], logEndOffset: Option[Long])
複製代碼
Scala小知識: 對於case class, 在這裏你能夠簡單的把它當作是一個JavaBean,可是它遠比JavaBean強大,好比它會自動生成equals、hashCode、toString、copy、伴生對象、apply、unapply等等東西。在 scala 中,對保護(Protected)成員的訪問比 java 更嚴格一些。由於它只容許保護成員在定義了該成員的的類的子類中被訪問。而在java中,用protected關鍵字修飾的成員,除了定義了該成員的類的子類能夠訪問,同一個包裏的其餘類也能夠進行訪問。Scala中,若是沒有指定任何的修飾符,則默認爲 public。這樣的成員在任何地方均可以被訪問。
若是你正在試着運行上面一段程序,你會發現編譯失敗,報錯:cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in ‘kafka.admin.ConsumerGroupCommand‘。這時候須要將所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState類前面的protected修飾符去掉才能編譯經過。