#RabbitMQ 監控(三)html
驗證RabbitMQ健康運行只是確保消息通訊架構可靠性的一部分,同時,你也須要確保消息通訊結構配置沒有遭受意外修改,從而避免應用消息丟失。
RabbitMQ Management HTTP API提供了一個方法容許你查看任何vhost上的任何隊列:/api/queues/<vhost>/<queue>。你不只能夠查看配置詳情,還能夠查看隊列的數據統計,例如隊列消耗的內存,或者隊列的平均消息吞吐量。使用curl測試一下該API,這裏的/%2F仍是表明默認的vhost(/)。java
curl -u guest:guest http://127.0.0.1:15672/api/queues/%2F/springrabbitexercise response { "consumer_details": [ { "channel_details": { "peer_host": "127.0.0.1", "peer_port": 62679, "connection_name": "127.0.0.1:62679 -> 127.0.0.1:5672", "user": "guest", "number": 2, "node": "rabbit@localhost", "name": "127.0.0.1:62679 -> 127.0.0.1:5672 (2)" }, "arguments": [], "prefetch_count": 1, "ack_required": true, "exclusive": false, "consumer_tag": "amq.ctag-YImeU8Fm_VahDpxv8EAw2Q", "queue": { "vhost": "/", "name": "springrabbitexercise" } } ], "messages_details": { "rate": 7357 }, "messages": 232517, "messages_unacknowledged_details": { "rate": 0.2 }, "messages_unacknowledged": 5, "messages_ready_details": { "rate": 7356.8 }, "messages_ready": 232512, "reductions_details": { "rate": 1861021.8 }, "reductions": 58754154, ... "auto_delete": false, "durable": true, "vhost": "/", "name": "springrabbitexercise", "message_bytes_persistent": 2220250, "message_bytes_ram": 2220250, "message_bytes_unacknowledged": 40, "message_bytes_ready": 2220210, "message_bytes": 2220250, "messages_persistent": 232517, "messages_unacknowledged_ram": 5, "messages_ready_ram": 232512, "messages_ram": 232517, "garbage_collection": { "minor_gcs": 0, "fullsweep_after": 65535, "min_heap_size": 233, "min_bin_vheap_size": 46422, "max_heap_size": 0 }, "state": "running" }
爲了方便閱讀,去掉了部分返回值,可是仍是能夠看到隊列的不少信息。例如能夠看到一個consumer的信息、消息佔用的內存、隊列的durable、auto_delete屬性等。利用這些配置信息,新的健康監控程序能夠經過API方法的輸出來輕鬆監控隊列的屬性,並在發生變動時通知你。
就像以前編寫健康檢測程序那樣,除了服務器、端口、vhost、用戶名和密碼以外,還須要知道:node
* 隊列的名稱,以便監控其配置git
* 該隊列是否將durable和auto_delete選項打開github
###清單3.1 檢測隊列配置spring
完整代碼在個人github,下面代碼中的@Data和@Slf4j都是插件lombok中的註解,想要了解的可自行百度。api
1.定義查看隊列信息的接口 RMQResource.java服務器
@Path("api") @Consumes({MediaType.APPLICATION_JSON}) @Produces({MediaType.APPLICATION_JSON}) public interface RMQResource { /** * Return a queue`s info * * @param vhost * @param name * @return {@link QueueInfo} */ @GET @Path("queues/{vhost}/{name}") Response getQueueInfo(@PathParam("vhost") String vhost, @PathParam("name") String name); }
2.定義查看隊列接口的返回值 QueueInfo.javamybatis
@Data public class QueueInfo { private ConsumerDetails[] consumer_details; /** * unknown class */ @JsonIgnore private Object[] incoming; /** * unknown class */ @JsonIgnore private Object[] deliveries; /** * unknown class */ @JsonIgnore private Object arguments; private Boolean exclusive; //... private Boolean auto_delete; private Boolean durable; private String vhost; private String name; /** * unknown class */ @JsonIgnore private Object head_message_timestamp; /** * unknown class */ @JsonIgnore private Object recoverable_slaves; private Long memory; private Double consumer_utilisation; private Integer consumers; /** * unknown class */ @JsonIgnore private Object exclusive_consumer_tag; /** * unknown class */ @JsonIgnore private Object policy; @JsonFormat(pattern = "yyyy-MM-dd hh:mm:ss") private Date idle_since; }
3.檢測隊列配置 QueueConfigCheck.java架構
/** * 檢測隊列配置 */ @Slf4j public class QueueConfigCheck { private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class); public static void checkQueueConfig(String vhost, CheckQueue queue) { RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig(); String host = config.getHost(); Response response = null; try { response = rmqResource.getQueueInfo(vhost, queue.getQueue_name()); } catch (Exception e) { log.error("UNKNOWN: Could not connect to {}, cause {}", host, e.getMessage()); ExitUtil.exit(ExitType.UNKNOWN.getValue()); } if (response == null || response.getStatus() == 404) { log.error("CRITICAL: Queue {} does not exist.", queue.getQueue_name()); ExitUtil.exit(ExitType.CRITICAL.getValue()); } else if (response.getStatus() > 299) { log.error("UNKNOWN: Unexpected API error : {}", response); ExitUtil.exit(ExitType.UNKNOWN.getValue()); } else { QueueInfo info = response.readEntity(QueueInfo.class); if (!info.getAuto_delete().equals(queue.getAuto_delete())) { log.warn("WARN: Queue {} - auto_delete flag is NOT {}", queue.getQueue_name(), info.getAuto_delete()); ExitUtil.exit(ExitType.WARN.getValue()); } if (!info.getDurable().equals(queue.getDurable())) { log.warn("WARN: Queue {} - durable flag is NOT {}", queue.getQueue_name(), info.getDurable()); ExitUtil.exit(ExitType.WARN.getValue()); } } log.info("OK: Queue {} configured correctly.", queue.getQueue_name()); ExitUtil.exit(ExitType.OK.getValue()); } }
4.檢測隊列配置的方法參數 CheckQueue.java @Data public class CheckQueue {
private final String queue_name; private final Boolean auto_delete; private final Boolean durable; public CheckQueue(String queue_name, Boolean auto_delete, Boolean durable) { this.queue_name = queue_name; this.auto_delete = auto_delete; this.durable = durable; } }
5.運行檢測程序
@Test public void testQueueConfig() { String queue_name = "springrabbitexercise"; Boolean auto_delete = false; Boolean durable = true; String vhost = "/"; CheckQueue queue = new CheckQueue(queue_name, auto_delete, durable); QueueConfigCheck.checkQueueConfig(vhost, queue); }
能夠看到監控正常運行:
11:38:23.286 [main] INFO com.lanxiang.rabbitmqmonitor.check.QueueConfigCheck - OK: Queue springrabbitexercise configured correctly.
11:38:23.289 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK
這段RabbitMQ隊列檢測的程序有一處修改,若是健康檢測程序沒法鏈接到API服務器的話,會返回EXIT_UNKNOWN。前一章的API ping健康檢測要麼成功要麼失敗,故障代碼之間沒有區別,可是隊列檢測API方法在失敗時經過HTTP狀態碼提供了更多信息。若是HTTP狀態碼是404就表明嘗試驗證的隊列不存在,檢測失敗並返回EXIT_CRITICAL。對於其餘大於299的HTTP狀態碼,退出代碼爲EXIT_UNKNOWN。
在獲取到RabbitMQ API的response以後,使用JSON進行解碼,而且把獲得的durable和auto_delete參數與指望的參數進行比較,若是參數和預期不相符的話,返回EXIT_WARNING或者EXIT_CRITICAL狀態碼。若是隊列全部的配置都正確的話,那麼就正確退出。
在瞭解咱們對RabbitMQ作監控的原理以後,能夠根據RabbitMQ Management HTTP API定製更多的監控,例如:
* /api/nodes,能夠獲取集羣中每一個節點的數據
* /api/queues/<vhost>/<queue>,能夠獲取隊列的詳細狀況,例如消息處理的速率、積壓的消息數量等。
除此以外還有許多其餘API,咱們要作的就是根據自身的業務邏輯和這些API來設計合理的監控腳本。RabbitMQ監控系列就到此結束啦,仍是很惋惜沒有實戰的機會吧,由於最近在工做變更期間,看了一下RabbitMQ實戰這本書,興起想寫一下博客試試。 畢業快一年了,想養成寫博客的習慣。正好最近也在工做變更中,能有閒暇時間嘗試一下,博客寫的比較水,多多包涵。