RabbitMQ 的監控

技術博客:github.com/yongxinz/te…html

同時,也歡迎關注個人微信公衆號 AlwaysBeta,更多精彩內容等你來。 前端

上兩篇文章介紹了:node

接下來講說監控的相關內容。python

監控仍是很是重要的,特別是在生產環境。磁盤滿了,隊列積壓嚴重,若是咱們還不知道,老闆確定會懷疑,莫不是這傢伙要跑路?git

並且我如今就遇到了這樣的狀況,主要是隊列積壓的問題。因爲量不是很大,因此磁盤空間倒不是很擔憂,但有時程序執行會報錯,致使隊列一直消費不下去,這就很讓人尷尬了。程序員

查了一些資料,總結了一下。想要了解 RabbitMQ 的運行狀態,主要有三種途徑:Management UI,rabbitmqctl 命令和 REST API。github

Management UI

management-ui.png

RabbitMQ 給咱們提供了豐富的 Web 管理功能,經過頁面,咱們能看到 RabbitMQ 的總體運行情況,交換機和隊列的狀態等,還能夠進行人員管理和權限配置,至關全面。json

但若是想經過頁面來監控,那出不出問題只能靠緣分。看到出問題了,是運氣好,看不到出問題,那是必然。後端

這也是我當前的現狀,因此爲了不出現大問題,得趕忙改變一下。api

備註:經過 http://127.0.0.1:15672 來訪問 Web 頁面,默認狀況下用戶名和密碼都是 guest,但生產環境下都應該改掉的。

rabbitmqctl 命令

與前端頁面對應的就是後端的命令行命令了,一樣很是豐富。平時本身測試,或者臨時查看一些狀態時,也能用得上。但就我我的使用感受來講,用的並非不少。

我總結一些還算經常使用的,列在下面,你們各取所需:

# 啓動服務
rabbitmq-server

# 中止服務
rabbitmqctl stop

# vhost 增刪查
rabbitmqctl add_vhost
rabbitmqctl delete_vhost
rabbitmqctl list_vhosts

# 查詢交換機
rabbitmqctl list_exchanges

# 查詢隊列
rabbitmqctl list_queues

# 查看消費者信息
rabbitmqctl list_consumers

# user 增刪查
rabbitmqctl add_user
rabbitmqctl delete_user
rabbitmqctl list_users
複製代碼

REST API

終於來到重點了,對於程序員來講,看到有現成的 API 能夠調用,那真是太幸福了。

自動化監控和一些須要批量的操做,經過調用 API 來實現是最好的方式。好比有一些須要初始化的用戶和權限,就能夠經過腳原本一鍵完成,而不是經過頁面逐個添加,簡單又快捷。

下面是一些經常使用的 API:

# 歸納信息
curl -i -u guest:guest http://localhost:15672/api/overview

# vhost 列表
curl -i -u guest:guest http://localhost:15672/api/vhosts

# channel 列表
curl -i -u guest:guest http://localhost:15672/api/channels
      
# 節點信息
curl -i -u guest:guest http://localhost:15672/api/nodes
      
# 交換機信息
curl -i -u guest:guest http://localhost:15672/api/exchanges
      
# 隊列信息
curl -i -u guest:guest http://localhost:15672/api/queues
複製代碼

就我如今遇到的狀況來講,overviewqueues 這兩個 API 就能夠知足個人需求,你們也能夠根據本身項目的實際狀況來選擇。

API 返回內容是 json,並且字段仍是挺多的,剛開始看會感受一臉懵,具體含義對照官網的解釋和實際狀況來慢慢琢磨,弄懂也不是很困難。

下面代碼包含了 API 請求以及返回結果的解析,能夠在測試環境下執行,稍加更改就能夠應用到生產環境。

import json
import logging
import optparse

import requests

logging.basicConfig(
    format='%(asctime)s - %(pathname)s[%(lineno)d] - %(levelname)s: %(message)s',
    level=logging.INFO)
logger = logging.getLogger(__name__)


class RabbitMQMoniter(object):
    """ RabbitMQ Management API """
    def __init__(self, host='', port=15672, username='guest', password='guest'):
        self.host = host
        self.port = port
        self.username = username
        self.password = password

    def call_api(self, path):
        logger.info('call rabbit api to get data on ' + path)

        headers = {'content-type': 'application/json'}
        url = '{0}://{1}:{2}/api/{3}'.format('http', self.host, self.port, path)
        res = requests.get(url, headers=headers, auth=(self.username, self.password))

        return res.json()

    def list_queues(self):
        """ curl -i -u guest:guest http://localhost:15672/api/queues return: list """
        queues = []
        for queue in self.call_api('queues'):
            element = {
                'vhost': queue['vhost'],
                'queue': queue['name']
            }
            queues.append(element)
            logger.info('get queue ' + queue['vhost'] + '/' + queue['name'])
        return queues

    def list_nodes(self):
        """ curl -i -u guest:guest http://localhost:15672/api/nodes return: list """
        nodes = []
        for node in self.call_api('nodes'):
            name = node['name'].split('@')[1]
            element = {
                'node': name,
                'node_type': node['type']
            }
            nodes.append(element)
            logger.info('get nodes ' + name + '/' + node['type'])
        return nodes

    def check_queue(self):
        """ check queue """
        for queue in self.call_api('queues'):
            self._get_queue_data(queue)
        return True

    def _get_queue_data(self, queue):
        """ get queue data """
        for item in ['memory', 'messages', 'messages_ready', 'messages_unacknowledged', 'consumers']:
            key = 'rabbitmq.queues[{0},queue_{1},{2}]'.format(queue['vhost'], item, queue['name'])
            value = queue.get(item, 0)
            logger.info('queue data: - %s %s' % (key, value))

        for item in ['deliver_get', 'publish']:
            key = 'rabbitmq.queues[{0},queue_message_stats_{1},{2}]'.format(queue['vhost'], item, queue['name'])
            value = queue.get('message_stats', {}).get(item, 0)
            logger.info('queue data: - %s %s' % (key, value))

    def check_aliveness(self):
        """ check alive """
        return self.call_api('aliveness-test/%2f')['status']

    def check_overview(self, item):
        """ check overview """
        if item in ['channels', 'connections', 'consumers', 'exchanges', 'queues']:
            return self.call_api('overview').get('object_totals').get(item, 0)
        elif item in ['messages', 'messages_ready', 'messages_unacknowledged']:
            return self.call_api('overview').get('queue_totals').get(item, 0)
        elif item == 'message_stats_deliver_get':
            return self.call_api('overview').get('message_stats', {}).get('deliver_get', 0)
        elif item == 'message_stats_publish':
            return self.call_api('overview').get('message_stats', {}).get('publish', 0)
        elif item == 'message_stats_ack':
            return self.call_api('overview').get('message_stats', {}).get('ack', 0)
        elif item == 'message_stats_redeliver':
            return self.call_api('overview').get('message_stats', {}).get('redeliver', 0)
        elif item == 'rabbitmq_version':
            return self.call_api('overview').get('rabbitmq_version', 'None')

    def check_server(self, item, node_name):
        """ check server """
        node_name = node_name.split('.')[0]
        for nodeData in self.call_api('nodes'):
            if node_name in nodeData['name']:
                return nodeData.get(item, 0)
        return 'Not Found'


def main():
    """ Command-line """
    choices = ['list_queues', 'list_nodes', 'queues', 'check_aliveness', 'overview', 'server']

    parser = optparse.OptionParser()
    parser.add_option('--username', help='RabbitMQ API username', default='guest')
    parser.add_option('--password', help='RabbitMQ API password', default='guest')
    parser.add_option('--host', help='RabbitMQ API host', default='127.0.0.1')
    parser.add_option('--port', help='RabbitMQ API port', type='int', default=15672)
    parser.add_option('--check', type='choice', choices=choices, help='Type of check')
    parser.add_option('--metric', help='Which metric to evaluate', default='')
    parser.add_option('--node', help='Which node to check (valid for --check=server)')
    (options, args) = parser.parse_args()

    if not options.check:
        parser.error('At least one check should be specified')

    logger.info('start running ...')

    api = RabbitMQMoniter(username=options.username, password=options.password, host=options.host, port=options.port)

    if options.check == 'list_queues':
        logger.info(json.dumps({'data': api.list_queues()}, indent=4, separators=(',', ':')))
    elif options.check == 'list_nodes':
        logger.info(json.dumps({'data': api.list_nodes()}, indent=4, separators=(',', ':')))
    elif options.check == 'queues':
        logger.info(api.check_queue())
    elif options.check == 'check_aliveness':
        logger.info(api.check_aliveness())
    elif options.check == 'overview':
        if not options.metric:
            parser.error('Missing required parameter: "metric"')
        else:
            if options.node:
                logger.info(api.check_overview(options.metric))
            else:
                logger.info(api.check_overview(options.metric))
    elif options.check == 'server':
        if not options.metric:
            parser.error('Missing required parameter: "metric"')
        else:
            if options.node:
                logger.info(api.check_server(options.metric, options.node))
            else:
                logger.info(api.check_server(options.metric, api.host))


if __name__ == '__main__':
    main()
複製代碼

調用及返回:

python3 rabbitmq_status.py --check list_queues

# 2020-04-12 14:33:15,298 - rabbitmq_status.py[142] - INFO: start running ...
# 2020-04-12 14:33:15,298 - rabbitmq_status.py[26] - INFO: call rabbit api to get data on queues
# 2020-04-12 14:33:15,312 - rabbitmq_status.py[46] - INFO: get queue //task_queue
# 2020-04-12 14:33:15,312 - rabbitmq_status.py[147] - INFO: {
# "data":[
# {
# "vhost":"/",
# "queue":"task_queue"
# }
# ]
# }
複製代碼

經過對返回結果進行解析,就能夠判斷 RabbitMQ 的總體運行狀態,若是發生超閾值的狀況,能夠發送告警或郵件,來達到監控的效果。

針對隊列積壓狀況的監控判斷,有兩種方式:一是設置隊列積壓長度閾值,若是超過閾值即告警;二是保存最近五次的積壓長度,若是積壓逐漸增加並超閾值,即告警。

第二種方式更好,判斷更加精準,誤告可能性小,但實現起來也更復雜。

這裏只是提一個思路,等後續再把實踐結果和代碼分享出來。或者你們有哪些更好的方法嗎?歡迎留言交流。

源碼地址:

github.com/yongxinz/te…

參考文檔:

www.rabbitmq.com/monitoring.…

blog.51cto.com/john88wang/…

相關文章
相關標籤/搜索