Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。Kafka以下特性,受到諸多公司的青睞。html
一、高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息(核心目標之一)。node
二、支持經過Kafka服務器和消費機集羣來分區消息json
…………bootstrap
Kafka的做用我就不在這BB了,你們能夠瞅瞅http://blog.jobbole.com/75328/,總結的很是好。bash
Kafka監控的幾個指標服務器
一、lag:多少消息沒有消費微信
二、logsize:Kafka存的消息總數app
三、offset:已經消費的消息分佈式
lag = logsize - offset, 主要監控lag是否正常網站
crontab設置每分鐘執行spoorer.py
# -*- coding:utf-8 -*- import os, sys, time, json, yaml from kazoo.client import KazooClient from kazoo.exceptions import NoNodeError from kafka import (KafkaClient, KafkaConsumer) class spoorerClient(object): def __init__(self, zookeeper_hosts, kafka_hosts, zookeeper_url='/', timeout=3, log_dir='/tmp/spoorer'): self.zookeeper_hosts = zookeeper_hosts self.kafka_hosts = kafka_hosts self.timeout = timeout self.log_dir = log_dir self.log_file = log_dir + '/' + 'spoorer.log' self.kafka_logsize = {} self.result = [] self.log_day_file = log_dir + '/' + 'spoorer_day.log.' + str(time.strftime("%Y-%m-%d", time.localtime())) self.log_keep_day = 1 try: f = file(os.path.dirname(os.path.abspath(__file__)) + '/' + 'spoorer.yaml') self.white_topic_group = yaml.load(f) except IOError as e: print 'Error, spoorer.yaml is not found' sys.exit(1) else: f.close() if self.white_topic_group is None: self.white_topic_group = {} if not os.path.exists(self.log_dir): os.mkdir(self.log_dir) for logfile in [x for x in os.listdir(self.log_dir) if x.split('.')[-1] != 'log' and x.split('.')[-1] != 'swp']: if int(time.mktime(time.strptime(logfile.split('.')[-1], '%Y-%m-%d'))) < int(time.time()) - self.log_keep_day * 86400: os.remove(self.log_dir + '/' + logfile) if zookeeper_url == '/': self.zookeeper_url = zookeeper_url else: self.zookeeper_url = zookeeper_url + '/' def spoorer(self): try: kafka_client = KafkaClient(self.kafka_hosts, timeout=self.timeout) except Exception as e: print "Error, cannot connect kafka broker." sys.exit(1) else: kafka_topics = kafka_client.topics finally: kafka_client.close() try: zookeeper_client = KazooClient(hosts=self.zookeeper_hosts, read_only=True, timeout=self.timeout) zookeeper_client.start() except Exception as e: print "Error, cannot connect zookeeper server." sys.exit(1) try: groups = map(str,zookeeper_client.get_children(self.zookeeper_url + 'consumers')) except NoNodeError as e: print "Error, invalid zookeeper url." zookeeper_client.stop() sys.exit(2) else: for group in groups: if 'offsets' not in zookeeper_client.get_children(self.zookeeper_url + 'consumers/%s' % group): continue topic_path = 'consumers/%s/offsets' % (group) topics = map(str,zookeeper_client.get_children(self.zookeeper_url + topic_path)) if len(topics) == 0: continue for topic in topics: if topic not in self.white_topic_group.keys(): continue elif group not in self.white_topic_group[topic].replace(' ','').split(','): continue partition_path = 'consumers/%s/offsets/%s' % (group,topic) partitions = map(int,zookeeper_client.get_children(self.zookeeper_url + partition_path)) for partition in partitions: base_path = 'consumers/%s/%s/%s/%s' % (group, '%s', topic, partition) owner_path, offset_path = base_path % 'owners', base_path % 'offsets' offset = zookeeper_client.get(self.zookeeper_url + offset_path)[0] try: owner = zookeeper_client.get(self.zookeeper_url + owner_path)[0] except NoNodeError as e: owner = 'null' metric = {'datetime':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'topic':topic, 'group':group, 'partition':int(partition), 'logsize':None, 'offset':int(offset), 'lag':None, 'owner':owner} self.result.append(metric) finally: zookeeper_client.stop() try: kafka_consumer = KafkaConsumer(bootstrap_servers=self.kafka_hosts) except Exception as e: print "Error, cannot connect kafka broker." sys.exit(1) else: for kafka_topic in kafka_topics: self.kafka_logsize[kafka_topic] = {} partitions = kafka_client.get_partition_ids_for_topic(kafka_topic) for partition in partitions: offset = kafka_consumer.get_partition_offsets(kafka_topic, partition, -1, 1)[0] self.kafka_logsize[kafka_topic][partition] = offset with open(self.log_file,'w') as f1, open(self.log_day_file,'a') as f2: for metric in self.result: logsize = self.kafka_logsize[metric['topic']][metric['partition']] metric['logsize'] = int(logsize) metric['lag'] = int(logsize) - int(metric['offset']) f1.write(json.dumps(metric,sort_keys=True) + '\n') f1.flush() f2.write(json.dumps(metric,sort_keys=True) + '\n') f2.flush() finally: kafka_consumer.close() return '' if __name__ == '__main__': check = spoorerClient(zookeeper_hosts=‘zookeeperIP地址:端口', zookeeper_url=‘znode節點', kafka_hosts=‘kafkaIP:PORT', log_dir='/tmp/log/spoorer', timeout=3) print check.spoorer()
格式:
kafka_topic_name: group_name1, group_name2, (group名字縮進4個空格,嚴格按照yaml格式)
{"datetime": "2016-03-18 11:36:02", "group": "group_name1", "lag": 73, "logsize": 28419259, "offset": 28419186, "owner": "消費partition線程", "partition": 3, "topic": "kafka_topic_name"}
monitor_kafka.sh腳本檢索spoorer.log文件,並配合zabbix監控
#!/bin/bash topic=$1 group=$2 #$3可取值lag、logsize、offset class=$3 case $3 in lag) echo "`cat /tmp/log/spoorer/spoorer.log | grep -w \\"${topic}\\" | grep -w \\"${group}\\" |awk -F'[ ,]' '{sum+=$9}'END'{print sum}'`" ;; logsize) echo "`cat /tmp/log/spoorer/spoorer.log | grep -w \\"${topic}\\" | grep -w \\"${group}\\" |awk -F'[ ,]' '{sum+=$12}'END'{print sum}'`" ;; offset) echo "`cat /tmp/log/spoorer/spoorer.log | grep -w \\"${topic}\\" | grep -w \\"${group}\\" |awk -F'[ ,]' '{sum+=$15}'END'{print sum}'`" ;; *) echo "Error input:" ;; esac exit 0
zabbix_agentd.conf擴展配置
UserParameter=kafka.lag[*],/usr/local/zabbix-2.4.5/script/monitor_kafka.sh $1 $2 lag UserParameter=kafka.offset[*],/usr/local/zabbix-2.4.5/script/monitor_kafka.sh $1 $2 offset UserParameter=kafka.logsize[*],/usr/local/zabbix-2.4.5/script/monitor_kafka.sh $1 $2 logsize
zabbix設置Key
kafka.lag[kafka_topic_name,group_name1] kafka.logsize[kafka_topic_name,group_name1] kafka.offset[kafka_topic_name,group_name1]
報警的Trigger觸發規則也是對lag的值作報警,具體閥值設置爲多少,仍是看你們各自業務需求了。
接收告警消息能夠選擇郵件和短信、網上教程也比較多,教程帖子:
http://www.iyunv.com/thread-22904-1-1.html 10 http://www.iyunv.com/thread-40998-1-1.html 12
若是以爲本身搞這些比較麻煩的話,也能夠試試 OneAlert 一鍵集成zabbix,短信、電話、微信、APP啥都能搞定,還免費,用着不錯。
http://www.onealert.com/activity/zabbix.html 37