KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,咱們能夠瀏覽當前的消費者組,而且每一個Topic的全部Partition的消費狀況均可以一目瞭然。html
KafkaOffsetMonitor託管在Github上,能夠經過Github下載。
下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releasesjava
或者下載百度網盤:連接:https://pan.baidu.com/s/1geEBEvT 密碼:jaeulinux
將下載下來的KafkaOffsetMonitor jar包上傳到linux上,能夠新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,經過java編譯命令來運行這個jar包:git
[root@kafka50 KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days 按回車後,能夠看到控制檯輸出: serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp} 2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8088
若是沒有指定端口,則默認會開啓一個隨機端口。github
參數說明:
zk :zookeeper主機地址,若是有多個,用逗號隔開
port :應用程序端口
refresh :應用程序在數據庫中刷新和存儲點的頻率
retain :在db中保留多長時間
dbName :保存的數據庫文件名,默認爲offsetapp
爲了更方便的啓動KafkaOffsetMonitor,能夠寫一個啓動腳原本直接運行,我這裏新建一個名爲:kafka-monitor-start.sh的腳本,而後編輯這個腳本:數據庫
[root@kafka50 KafkaMonitor]# vim kafka-monitor-start.sh java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8088 \ --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;
而後退出保存便可,接下來修改一下kafka-monitor-start.sh的權限apache
[root@kafka50 KafkaMonitor]# chmod +x kafka-monitor-start.sh
啓動KafkaOffsetMonitor:vim
[root@kafka50 KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh & [1] 6551 [root@kafka50 KafkaMonitor]# lsof -i:8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 6552 root 16u IPv6 26047 0t0 TCP *:radan-http (LISTEN)
在遊覽器中輸入:http://ip:port便可以查看KafkaOffsetMonitor Web UI,以下圖:安全
在下圖中有一個Visualizations選項卡,點擊其中的Cluster Overview能夠查看當前Kafka集羣的Broker狀況app
首先爲本次試驗新建一個Topic,命令以下:
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer
在上一篇文章中提到的Producer封裝Github代碼的基礎上,寫了一個往kafkamonitor-simpleproducer發送message的java代碼。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class SimpleProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); int i = 0; String message = ""; while (true) { message = "test-simple-producer : " + i ++; kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message); } } }
程序運行效果:
用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
消費截圖以下:
(1)在Topic List選項卡中,咱們能夠看到剛纔新建的kafkamonitor-simpleproducer
(2)點開後,能看到有一個console-consumer正在消費該topic
(3)繼續進入該Consumer,能夠查看該Consumer當前的消費情況
這張圖片的左上角顯示了當前Topic的生產速率,右上角顯示了當前Consumer的消費速率。
圖片中還有三種顏色的線條,藍色的表示當前Topic中的Message數目,灰色的表示當前Consumer消費的offset位置,紅色的表示藍色灰色的差值,即當前Consumer滯後於Producer的message數目。
(4)看一眼各partition中的message消費狀況
從上圖能夠看到,當前有3個Partition,每一個Partition中的message數目分佈很不均勻。這裏能夠與接下來的自定義Producer的狀況進行一個對比。
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer
邏輯很簡單,循環依次往各Partition中發送message。
import kafka.producer.Partitioner; /** * Created by ckm on 2018/1/8. */
public class TestPartitioner implements Partitioner { public TestPartitioner() { } @Override public int partition(Object key, int numPartitions) { int intKey = (int) key; return intKey % numPartitions; } }
將自定義的Partitioner設置到Producer,其餘調用過程和二中相似。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class PartitionedProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner"); int i = 0; String message = ""; while (true) { message = "test-partitioner-producer : " + i; System.out.println(message); kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message); i ++; } } }
代碼運行效果以下圖:
bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer
消費效果以下圖:
其餘頁面與上面的相似,這裏只觀察一下每一個partition中的message數目與第二節中的對比。能夠看到這裏每一個Partition中message分別是很均勻的。
注意事項:
注意這裏有一個坑,默認狀況下Producer往一個不存在的Topic發送message時會自動建立這個Topic。因爲在這個封裝中,有同時傳遞message和topic的狀況,若是調用方法時傳入的參數反了,將會在Kafka集羣中自動建立Topic。在正常狀況下,應該是先把Topic根據須要建立好,而後Producer往該Topic發送Message,最好把Kafka這個默認自動建立Topic的功能關掉。
那麼,假設真的不當心建立了多餘的Topic,在刪除時,會出現「marked for deletion」提示,只是將該topic標記爲刪除,使用list命令仍然能看到。若是須要調整這兩個功能的話,在server.properties中配置以下兩個參數:
參數 | 默認值 | 做用 |
---|---|---|
auto.create.topics.enable | true | Enable auto creation of topic on the server |
delete.topic.enable | false | Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off |
KafkaOffsetMonitor:程序一個jar包的形式運行,部署較爲方便。只有監控功能,使用起來也較爲安全。除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款:Kafka Web Console:監控功能較爲全面,能夠預覽消息,監控Offset、Lag等信息,但存在bug,不建議在生產環境中使用。Kafka Manager:偏向Kafka集羣管理,若操做不當,容易致使集羣出現故障。對Kafka實時生產和消費消息是經過JMX實現的。沒有記錄Offset、Lag等信息。