Kafka監控工具KafkaOffsetMonitor配置及使用

 1、KafkaOffsetMonitor簡述

KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,咱們能夠瀏覽當前的消費者組,而且每一個Topic的全部Partition的消費狀況均可以一目瞭然。html

2、KafkaOffsetMonitor下載

KafkaOffsetMonitor託管在Github上,能夠經過Github下載。
下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releasesjava

或者下載百度網盤:連接:https://pan.baidu.com/s/1geEBEvT 密碼:jaeulinux

 

3、KafkaOffsetMonitor啓動

將下載下來的KafkaOffsetMonitor jar包上傳到linux上,能夠新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,經過java編譯命令來運行這個jar包:git

[root@kafka50 KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days 按回車後,能夠看到控制檯輸出:

serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8088

若是沒有指定端口,則默認會開啓一個隨機端口。github

參數說明:

zk :zookeeper主機地址,若是有多個,用逗號隔開
port :應用程序端口
refresh :應用程序在數據庫中刷新和存儲點的頻率
retain :在db中保留多長時間
dbName :保存的數據庫文件名,默認爲offsetapp

爲了更方便的啓動KafkaOffsetMonitor,能夠寫一個啓動腳原本直接運行,我這裏新建一個名爲:kafka-monitor-start.sh的腳本,而後編輯這個腳本:數據庫

[root@kafka50 KafkaMonitor]# vim kafka-monitor-start.sh java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8088 \ --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;

而後退出保存便可,接下來修改一下kafka-monitor-start.sh的權限apache

[root@kafka50 KafkaMonitor]# chmod +x kafka-monitor-start.sh 

啓動KafkaOffsetMonitor:vim

[root@kafka50 KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh & [1] 6551 [root@kafka50 KafkaMonitor]# lsof -i:8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 6552 root   16u  IPv6  26047      0t0  TCP *:radan-http (LISTEN)

4、KafkaOffsetMonitor Web UI

在遊覽器中輸入:http://ip:port便可以查看KafkaOffsetMonitor Web UI,以下圖:安全

 

在下圖中有一個Visualizations選項卡,點擊其中的Cluster Overview能夠查看當前Kafka集羣的Broker狀況app

 

5、簡單的Producer

一、新建一個Topic

  首先爲本次試驗新建一個Topic,命令以下:

bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer

 

二、新建SimpleProducer代碼

  在上一篇文章中提到的Producer封裝Github代碼的基礎上,寫了一個往kafkamonitor-simpleproducer發送message的java代碼。

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class SimpleProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); int i = 0; String message = ""; while (true) { message = "test-simple-producer : " + i ++; kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message); } } }

  程序運行效果: 
  這裏寫圖片描述

三、ConsoleConsumer消費該topic

  用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer

  消費截圖以下: 
  這裏寫圖片描述

四、KafkaOffsetMonitor頁面

(1)在Topic List選項卡中,咱們能夠看到剛纔新建的kafkamonitor-simpleproducer 
  這裏寫圖片描述
(2)點開後,能看到有一個console-consumer正在消費該topic 
  這裏寫圖片描述
(3)繼續進入該Consumer,能夠查看該Consumer當前的消費情況 
  這裏寫圖片描述
  這張圖片的左上角顯示了當前Topic的生產速率,右上角顯示了當前Consumer的消費速率。 
  圖片中還有三種顏色的線條,藍色的表示當前Topic中的Message數目,灰色的表示當前Consumer消費的offset位置,紅色的表示藍色灰色的差值,即當前Consumer滯後於Producer的message數目。 
(4)看一眼各partition中的message消費狀況 
  這裏寫圖片描述
  從上圖能夠看到,當前有3個Partition,每一個Partition中的message數目分佈很不均勻。這裏能夠與接下來的自定義Producer的狀況進行一個對比。

6、自定義Partitioner的Producer

一、新建一個Topic

bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer

二、Partitioner代碼

  邏輯很簡單,循環依次往各Partition中發送message。

import kafka.producer.Partitioner; /** * Created by ckm on 2018/1/8. */
public class TestPartitioner implements Partitioner { public TestPartitioner() { } @Override public int partition(Object key, int numPartitions) { int intKey = (int) key; return intKey % numPartitions; } }

三、Producer代碼

  將自定義的Partitioner設置到Producer,其餘調用過程和二中相似。

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class PartitionedProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner"); int i = 0; String message = ""; while (true) { message = "test-partitioner-producer : " + i; System.out.println(message); kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message); i ++; } } }

  代碼運行效果以下圖: 
  這裏寫圖片描述

四、ConsoleConsumer消費Message

bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer

 

  消費效果以下圖: 
  這裏寫圖片描述

五、KafkaOffsetMonitor頁面

  其餘頁面與上面的相似,這裏只觀察一下每一個partition中的message數目與第二節中的對比。能夠看到這裏每一個Partition中message分別是很均勻的。 
  這裏寫圖片描述

注意事項: 
  注意這裏有一個坑,默認狀況下Producer往一個不存在的Topic發送message時會自動建立這個Topic。因爲在這個封裝中,有同時傳遞message和topic的狀況,若是調用方法時傳入的參數反了,將會在Kafka集羣中自動建立Topic。在正常狀況下,應該是先把Topic根據須要建立好,而後Producer往該Topic發送Message,最好把Kafka這個默認自動建立Topic的功能關掉。 
  那麼,假設真的不當心建立了多餘的Topic,在刪除時,會出現「marked for deletion」提示,只是將該topic標記爲刪除,使用list命令仍然能看到。若是須要調整這兩個功能的話,在server.properties中配置以下兩個參數:

 

參數 默認值 做用
auto.create.topics.enable true Enable auto creation of topic on the server
delete.topic.enable false Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off

 

七,KafkaOffsetMonitor 總結

KafkaOffsetMonitor:程序一個jar包的形式運行,部署較爲方便。只有監控功能,使用起來也較爲安全。除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款:Kafka Web Console:監控功能較爲全面,能夠預覽消息,監控Offset、Lag等信息,但存在bug,不建議在生產環境中使用。Kafka Manager:偏向Kafka集羣管理,若操做不當,容易致使集羣出現故障。對Kafka實時生產和消費消息是經過JMX實現的。沒有記錄Offset、Lag等信息。

相關文章
相關標籤/搜索