技術博客:github.com/yongxinz/te…html
同時,也歡迎關注個人微信公衆號 AlwaysBeta,更多精彩內容等你來。 前端

上兩篇文章介紹了:node
接下來講說監控的相關內容。python
監控仍是很是重要的,特別是在生產環境。磁盤滿了,隊列積壓嚴重,若是咱們還不知道,老闆確定會懷疑,莫不是這傢伙要跑路?git
並且我如今就遇到了這樣的狀況,主要是隊列積壓的問題。因爲量不是很大,因此磁盤空間倒不是很擔憂,但有時程序執行會報錯,致使隊列一直消費不下去,這就很讓人尷尬了。程序員
查了一些資料,總結了一下。想要了解 RabbitMQ 的運行狀態,主要有三種途徑:Management UI,rabbitmqctl 命令和 REST API。github
Management UI

RabbitMQ 給咱們提供了豐富的 Web 管理功能,經過頁面,咱們能看到 RabbitMQ 的總體運行情況,交換機和隊列的狀態等,還能夠進行人員管理和權限配置,至關全面。json
但若是想經過頁面來監控,那出不出問題只能靠緣分。看到出問題了,是運氣好,看不到出問題,那是必然。後端
這也是我當前的現狀,因此爲了不出現大問題,得趕忙改變一下。api
備註:經過 http://127.0.0.1:15672 來訪問 Web 頁面,默認狀況下用戶名和密碼都是 guest,但生產環境下都應該改掉的。
rabbitmqctl 命令
與前端頁面對應的就是後端的命令行命令了,一樣很是豐富。平時本身測試,或者臨時查看一些狀態時,也能用得上。但就我我的使用感受來講,用的並非不少。
我總結一些還算經常使用的,列在下面,你們各取所需:
# 啓動服務 rabbitmq-server # 中止服務 rabbitmqctl stop # vhost 增刪查 rabbitmqctl add_vhost rabbitmqctl delete_vhost rabbitmqctl list_vhosts # 查詢交換機 rabbitmqctl list_exchanges # 查詢隊列 rabbitmqctl list_queues # 查看消費者信息 rabbitmqctl list_consumers # user 增刪查 rabbitmqctl add_user rabbitmqctl delete_user rabbitmqctl list_users 複製代碼
REST API
終於來到重點了,對於程序員來講,看到有現成的 API 能夠調用,那真是太幸福了。
自動化監控和一些須要批量的操做,經過調用 API 來實現是最好的方式。好比有一些須要初始化的用戶和權限,就能夠經過腳原本一鍵完成,而不是經過頁面逐個添加,簡單又快捷。
下面是一些經常使用的 API:
# 歸納信息 curl -i -u guest:guest http://localhost:15672/api/overview # vhost 列表 curl -i -u guest:guest http://localhost:15672/api/vhosts # channel 列表 curl -i -u guest:guest http://localhost:15672/api/channels # 節點信息 curl -i -u guest:guest http://localhost:15672/api/nodes # 交換機信息 curl -i -u guest:guest http://localhost:15672/api/exchanges # 隊列信息 curl -i -u guest:guest http://localhost:15672/api/queues 複製代碼
就我如今遇到的狀況來講,overview
和 queues
這兩個 API 就能夠知足個人需求,你們也能夠根據本身項目的實際狀況來選擇。
API 返回內容是 json,並且字段仍是挺多的,剛開始看會感受一臉懵,具體含義對照官網的解釋和實際狀況來慢慢琢磨,弄懂也不是很困難。
下面代碼包含了 API 請求以及返回結果的解析,能夠在測試環境下執行,稍加更改就能夠應用到生產環境。
import json import logging import optparse import requests logging.basicConfig( format='%(asctime)s - %(pathname)s[%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) class RabbitMQMoniter(object): """ RabbitMQ Management API """ def __init__(self, host='', port=15672, username='guest', password='guest'): self.host = host self.port = port self.username = username self.password = password def call_api(self, path): logger.info('call rabbit api to get data on ' + path) headers = {'content-type': 'application/json'} url = '{0}://{1}:{2}/api/{3}'.format('http', self.host, self.port, path) res = requests.get(url, headers=headers, auth=(self.username, self.password)) return res.json() def list_queues(self): """ curl -i -u guest:guest http://localhost:15672/api/queues return: list """ queues = [] for queue in self.call_api('queues'): element = { 'vhost': queue['vhost'], 'queue': queue['name'] } queues.append(element) logger.info('get queue ' + queue['vhost'] + '/' + queue['name']) return queues def list_nodes(self): """ curl -i -u guest:guest http://localhost:15672/api/nodes return: list """ nodes = [] for node in self.call_api('nodes'): name = node['name'].split('@')[1] element = { 'node': name, 'node_type': node['type'] } nodes.append(element) logger.info('get nodes ' + name + '/' + node['type']) return nodes def check_queue(self): """ check queue """ for queue in self.call_api('queues'): self._get_queue_data(queue) return True def _get_queue_data(self, queue): """ get queue data """ for item in ['memory', 'messages', 'messages_ready', 'messages_unacknowledged', 'consumers']: key = 'rabbitmq.queues[{0},queue_{1},{2}]'.format(queue['vhost'], item, queue['name']) value = queue.get(item, 0) logger.info('queue data: - %s %s' % (key, value)) for item in ['deliver_get', 'publish']: key = 'rabbitmq.queues[{0},queue_message_stats_{1},{2}]'.format(queue['vhost'], item, queue['name']) value = queue.get('message_stats', {}).get(item, 0) logger.info('queue data: - %s %s' % (key, value)) def check_aliveness(self): """ check alive """ return self.call_api('aliveness-test/%2f')['status'] def check_overview(self, item): """ check overview """ if item in ['channels', 'connections', 'consumers', 'exchanges', 'queues']: return self.call_api('overview').get('object_totals').get(item, 0) elif item in ['messages', 'messages_ready', 'messages_unacknowledged']: return self.call_api('overview').get('queue_totals').get(item, 0) elif item == 'message_stats_deliver_get': return self.call_api('overview').get('message_stats', {}).get('deliver_get', 0) elif item == 'message_stats_publish': return self.call_api('overview').get('message_stats', {}).get('publish', 0) elif item == 'message_stats_ack': return self.call_api('overview').get('message_stats', {}).get('ack', 0) elif item == 'message_stats_redeliver': return self.call_api('overview').get('message_stats', {}).get('redeliver', 0) elif item == 'rabbitmq_version': return self.call_api('overview').get('rabbitmq_version', 'None') def check_server(self, item, node_name): """ check server """ node_name = node_name.split('.')[0] for nodeData in self.call_api('nodes'): if node_name in nodeData['name']: return nodeData.get(item, 0) return 'Not Found' def main(): """ Command-line """ choices = ['list_queues', 'list_nodes', 'queues', 'check_aliveness', 'overview', 'server'] parser = optparse.OptionParser() parser.add_option('--username', help='RabbitMQ API username', default='guest') parser.add_option('--password', help='RabbitMQ API password', default='guest') parser.add_option('--host', help='RabbitMQ API host', default='127.0.0.1') parser.add_option('--port', help='RabbitMQ API port', type='int', default=15672) parser.add_option('--check', type='choice', choices=choices, help='Type of check') parser.add_option('--metric', help='Which metric to evaluate', default='') parser.add_option('--node', help='Which node to check (valid for --check=server)') (options, args) = parser.parse_args() if not options.check: parser.error('At least one check should be specified') logger.info('start running ...') api = RabbitMQMoniter(username=options.username, password=options.password, host=options.host, port=options.port) if options.check == 'list_queues': logger.info(json.dumps({'data': api.list_queues()}, indent=4, separators=(',', ':'))) elif options.check == 'list_nodes': logger.info(json.dumps({'data': api.list_nodes()}, indent=4, separators=(',', ':'))) elif options.check == 'queues': logger.info(api.check_queue()) elif options.check == 'check_aliveness': logger.info(api.check_aliveness()) elif options.check == 'overview': if not options.metric: parser.error('Missing required parameter: "metric"') else: if options.node: logger.info(api.check_overview(options.metric)) else: logger.info(api.check_overview(options.metric)) elif options.check == 'server': if not options.metric: parser.error('Missing required parameter: "metric"') else: if options.node: logger.info(api.check_server(options.metric, options.node)) else: logger.info(api.check_server(options.metric, api.host)) if __name__ == '__main__': main() 複製代碼
調用及返回:
python3 rabbitmq_status.py --check list_queues # 2020-04-12 14:33:15,298 - rabbitmq_status.py[142] - INFO: start running ... # 2020-04-12 14:33:15,298 - rabbitmq_status.py[26] - INFO: call rabbit api to get data on queues # 2020-04-12 14:33:15,312 - rabbitmq_status.py[46] - INFO: get queue //task_queue # 2020-04-12 14:33:15,312 - rabbitmq_status.py[147] - INFO: { # "data":[ # { # "vhost":"/", # "queue":"task_queue" # } # ] # } 複製代碼
經過對返回結果進行解析,就能夠判斷 RabbitMQ 的總體運行狀態,若是發生超閾值的狀況,能夠發送告警或郵件,來達到監控的效果。
針對隊列積壓狀況的監控判斷,有兩種方式:一是設置隊列積壓長度閾值,若是超過閾值即告警;二是保存最近五次的積壓長度,若是積壓逐漸增加並超閾值,即告警。
第二種方式更好,判斷更加精準,誤告可能性小,但實現起來也更復雜。
這裏只是提一個思路,等後續再把實踐結果和代碼分享出來。或者你們有哪些更好的方法嗎?歡迎留言交流。
源碼地址:
參考文檔: