This tutorial is about monitoring Kafka consumption. Let's start with an example:
[root@VM_0_98_centos bin]# ./kafka-consumer-groups.sh --bootstrap-server 172.20.150.1:9092 --describe --group ee
TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                HOST           CLIENT-ID
ee_172_20_50  0          93864           93864           0    ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_172_20_25  0          592471          592480          9    ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_172_20_19  0          156781          156781          0    ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_172_20_26  0          1345            1345            0    ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_172_20_22  0          197724          197747          23   ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_172_20_23  0          147067          147067          0    ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_172_20_24  0          620405          620406          1    ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_172_20_21  0          7883826         7883828         2    ee-0-213a104f-f2b7-490d-80cd-a4e391f292ab  /172.20.150.1  ee-0
ee_scm        0          205365          205365          0    -                                          -              -

Note: CURRENT-OFFSET is how many messages have been consumed, LOG-END-OFFSET is how many have been produced, and LAG is how many are left over: LAG = LOG-END-OFFSET - CURRENT-OFFSET (for ee_172_20_25, 592480 - 592471 = 9).
Direct enough, right? First, why use LLD (low-level discovery) at all? I won't explain that just yet; it should be clear by the time you finish. If it isn't, follow along once and it will be.
[root@VM_0_98_centos ~]# cat /opt/zabbix_agent/conf/zabbix_agentd.conf
PidFile=/opt/zabbix_agent/pids/zabbix_agentd.pid
LogFile=/opt/zabbix_agent/logs/zabbix_agentd.log
LogFileSize=0
AllowRoot=1
# StartAgents=0
Server=zabbix-server_IP
ServerActive=zabbix-server_IP
Hostname=VM_0_98_centos_zabbix-agent_IP
Include=/opt/zabbix_agent/conf/zabbix_agentd/*.conf
UnsafeUserParameters=1
HostMetadataItem=system.uname
HostMetadata=ee_mq
Timeout=30
The main config file pulls the other config files in via Include. Why split things into separate files? Because I find it tidy, no deeper reason. If you insist on one: I like it this way. Fine, I admit it, it genuinely makes management easier.
[root@VM_0_98_centos zabbix_agentd]# ll
total 20
-rwxr-xr-x 1 zabbix zabbix 173 Mar 12 11:11 java_process.conf
-rw-r--r-- 1 root   root   180 Mar 15 11:47 kafka.conf
-rwxr-xr-x 1 zabbix zabbix  75 Mar  8 17:11 tcp_conn_status.conf
See that? I split things across several config files, each one mapping to a frontend template and to one or a few scripts. That makes everything far easier to manage. If you don't split them up, whoever maintains this later will truly hate you. I once heard of a programmer who shot four coworkers over their working style, so, you know, your call.
[root@VM_0_98_centos zabbix_agentd]# cat kafka.conf
UserParameter=kafka.discovery,sudo python /opt/zabbix_agent/scripts/kafka/get_kafka.py
UserParameter=kafka.data[*],sudo python /opt/zabbix_agent/scripts/kafka/get_data.py $1 $2 $3
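One gotcha: both UserParameters run the scripts through sudo, while the agent itself runs as the zabbix user, which usually has neither sudo rights nor a TTY. A minimal sudoers sketch that would let this work (the python path is an assumption; adjust to your box):

# /etc/sudoers.d/zabbix (illustrative)
Defaults:zabbix !requiretty
zabbix ALL=(ALL) NOPASSWD: /usr/bin/python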
The scripts are three files in total:
[root@VM_0_98_centos kafka]# ll
total 12
-rwxr-xr-x 1 root   root   2526 Mar 15 11:42 get_data.py
-rwxr-xr-x 1 zabbix zabbix 2279 Mar 15 11:32 get_kafka.py
-rw-r--r-- 1 zabbix zabbix   18 Mar 14 16:47 kafka_monitor.yaml
1) First, let's see what's in the config file, kafka_monitor.yaml:
[root@VM_0_98_centos kafka]# cat kafka_monitor.yaml
groups: ee ng
That's right: these are just the consumer groups you defined yourself in Logstash.
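If you're curious how that one-liner turns into a list of group names: the scripts below take the first value of the parsed mapping and split it on whitespace. A quick sketch (Python 2, to match the scripts):

import yaml

conf = yaml.load("groups: ee ng")  # parses to {'groups': 'ee ng'}
print conf.values()[0].split()     # prints ['ee', 'ng']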
2) Next, what does get_kafka.py actually do?
#!/usr/bin/env python
# coding:utf-8
import yaml
import os
import sys
import subprocess
import json

# Wraps: ./kafka-consumer-groups.sh --bootstrap-server 172.20.150.1:9092 --describe --group ee


class KafkaMonitor(object):
    def __init__(self):
        self.bootstrap_server = "172.20.150.1:9092"
        self.cmd = "/opt/kafka/kafka_2.12-2.1.0/bin/kafka-consumer-groups.sh"
        try:
            f = open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'kafka_monitor.yaml'))
            self.groups = yaml.load(f)
        except IOError:
            print 'Error, kafka_monitor.yaml is not found'
            sys.exit(1)
        else:
            f.close()
        if self.groups is None:
            print 'Error, kafka_monitor.yaml content is empty'
            sys.exit(2)

    def run(self):
        result_list = []
        # "groups: ee ng" parses to {'groups': 'ee ng'}, so this walks each group name
        for group in self.groups.values()[0].split():
            cmd_run = "%s --bootstrap-server %s --describe --group %s | egrep -v 'TOPIC|^$'" % (
                self.cmd, self.bootstrap_server, group)
            subp = subprocess.Popen(cmd_run, shell=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            for line in subp.stdout.readlines():
                fields = line.strip().split()
                # first column of the describe output is the topic name
                result_list.append({"{#GROUPNAME}": group, "{#TOPICNAME}": fields[0]})
        # Zabbix low-level discovery expects JSON with a top-level "data" array
        print json.dumps({"data": result_list}, sort_keys=True, indent=4)


if __name__ == "__main__":
    client = KafkaMonitor()
    client.run()
Nothing fancy: it reads the kafka_monitor.yaml config from above, then for each group runs the command from the very beginning, pulls out the data you want, and cleans up the format to get the result. Put formally: kafka_monitor.yaml is the input ---> processing ---> the result you want.
Let's see what the result looks like:
[root@VM_0_98_centos kafka]# python get_kafka.py
{
    "data": [
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "mqtt_110_2"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_50"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_25"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_19"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_26"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_22"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_23"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_24"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_21"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_20"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_28"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_27"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_26"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_scm"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_access_172_20_28"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_error_172_20_20"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_access_172_20_20"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_error_172_20_28"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "nginx_error"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "nginx_access"}
    ]
}
Yep: just the group name and topic name. As for why this exact format, Zabbix dictates it: low-level discovery expects a JSON object with a top-level "data" array, and {#GROUPNAME} and {#TOPICNAME} act as LLD macros you can reference in the zabbix-server frontend configuration pages.
3) Now for the third script, get_data.py.
What's this one for? Simple: you give it a few arguments and it hands back a number.
#!/usr/bin/env python
# coding:utf-8
import yaml
import os
import sys
import subprocess

# Wraps: ./kafka-consumer-groups.sh --bootstrap-server 172.20.150.1:9092 --describe --group ee


class KafkaMonitor(object):
    def __init__(self):
        self.group_name = sys.argv[1]   # consumer group, e.g. "ee"
        self.topic_name = sys.argv[2]   # topic, e.g. "ee_scm"
        self.data_type = sys.argv[3]    # offset / logsize / lag
        self.bootstrap_server = "172.20.150.1:9092"
        self.cmd = "/opt/kafka/kafka_2.12-2.1.0/bin/kafka-consumer-groups.sh"
        try:
            f = open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'kafka_monitor.yaml'))
            self.groups = yaml.load(f)
        except IOError:
            print 'Error, kafka_monitor.yaml is not found'
            sys.exit(1)
        else:
            f.close()
        if self.groups is None:
            print 'Error, kafka_monitor.yaml content is empty'
            sys.exit(2)

    def run(self):
        for group in self.groups.values()[0].split():
            cmd_run = "%s --bootstrap-server %s --describe --group %s | egrep -v 'TOPIC|^$'" % (
                self.cmd, self.bootstrap_server, group)
            subp = subprocess.Popen(cmd_run, shell=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            for line in subp.stdout.readlines():
                fields = line.strip().split()
                if self.group_name == group and self.topic_name == fields[0]:
                    # columns: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
                    if self.data_type == "offset":
                        print int(fields[2])   # CURRENT-OFFSET: messages consumed
                    elif self.data_type == "logsize":
                        print int(fields[3])   # LOG-END-OFFSET: messages produced
                    else:
                        print int(fields[4])   # LAG: messages remaining


if __name__ == "__main__":
    client = KafkaMonitor()
    client.run()
It takes three parameters in total: the group name, the topic name, and a data_type (lag/offset/logsize), then prints a single value. Here's a run:
[root@VM_0_98_centos kafka]# python get_data.py ee personal_income_tax_172_20_26 lag
0
See it? Execution is a little slow. Once you've gotten this far, you can restart your zabbix_agent and verify everything from the server side:
[root@VM_4_84_centos ~]# zabbix_get -s 172.20.150.1 -p 10050 -k kafka.discovery
{
    "data": [
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "mqtt_110_2"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_50"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_25"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_19"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_26"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_22"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_23"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_24"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_172_20_21"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_20"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_28"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_27"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "personal_income_tax_172_20_26"},
        {"{#GROUPNAME}": "ee", "{#TOPICNAME}": "ee_scm"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_access_172_20_28"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_error_172_20_20"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_access_172_20_20"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "personal_income_tax_nginx_error_172_20_28"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "nginx_error"},
        {"{#GROUPNAME}": "ng", "{#TOPICNAME}": "nginx_access"}
    ]
}
[root@VM_4_84_centos ~]# zabbix_get -s 172.20.150.1 -p 10050 -k kafka.data[ng,personal_income_tax_nginx_access_172_20_20,lag]
15
See that? Simple, isn't it? Note that the three values inside kafka.data[...] are passed straight through to get_data.py as $1 $2 $3, exactly as declared in kafka.conf.
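A side note on the slowness mentioned above: kafka.discovery and every single kafka.data[...] item poll shells out to kafka-consumer-groups.sh, which is expensive. If that ever becomes a problem, one option is to cache the describe output briefly so that one call serves many polls. A rough sketch of the idea, not part of the original scripts (the cache path and TTL are made up):

import os
import subprocess
import time

CACHE = '/tmp/kafka_group_%s.cache'  # hypothetical per-group cache file
TTL = 30                             # seconds; tune to your item update interval

def describe_group(cmd, bootstrap_server, group):
    # Reuse a recent cached copy of the describe output if one exists
    path = CACHE % group
    if os.path.exists(path) and time.time() - os.path.getmtime(path) < TTL:
        return open(path).read()
    out = subprocess.Popen(
        "%s --bootstrap-server %s --describe --group %s | egrep -v 'TOPIC|^$'"
        % (cmd, bootstrap_server, group),
        shell=True, stdout=subprocess.PIPE).stdout.read()
    open(path, 'w').write(out)
    return out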
3 zabbix-server frontend configuration
1) Just look at the screenshots, haha.
[Screenshots of the zabbix-server frontend configuration; a text sketch follows below.]
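Since the screenshots don't survive here, this is roughly what they configure, pieced together from the keys defined earlier. The names are mine, and the trigger line uses the old-style Zabbix expression syntax with a made-up template name and threshold, so treat the whole thing as a sketch:

Discovery rule
    Name: Kafka consumer discovery
    Key:  kafka.discovery
Item prototype
    Name: kafka lag of {#GROUPNAME}/{#TOPICNAME}
    Key:  kafka.data[{#GROUPNAME},{#TOPICNAME},lag]
Trigger prototype (optional)
    Expression: {Template Kafka:kafka.data[{#GROUPNAME},{#TOPICNAME},lag].last()}>10000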
2) Now get a graph out of it.
Then apply the template to your host and you're done.
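If you want each discovered topic to get its own graph automatically, a graph prototype under the same discovery rule does it; an illustrative example (again, the names are mine):

Graph prototype
    Name:  Kafka lag: {#GROUPNAME}/{#TOPICNAME}
    Items: kafka.data[{#GROUPNAME},{#TOPICNAME},lag]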
Now, why did I choose LLD for this situation? Because I have a great many topic names and don't want to configure them by hand, and more keep being added later that I don't want to configure either. LLD covers exactly that. One thing to watch: if you add a new consumer group, you do need to add it to the kafka_monitor.yaml config file (e.g. groups: ee ng your_new_group), and discovery picks up the rest.
My take on automation: it's about finding every possible way to be lazy. With a standardized environment and efficient execution, all you have to do is watch it run.
One last word: may all the lovers of the world end up together.