#RabbitMQ 監控(二)html
經過測試RabbitMQ是否可以接收新的請求和構造AMQP信道,能夠用來驗證RabbitMQ服務器是否健康。接下來,咱們將檢測消息通訊的整個過程,向RabbitMQ發佈消息而後消費該消息,來驗證消息被正確地路由了。
雖然能夠經過簡單的擴展AMQP健康檢測程序來對路由過程進行完整的測試,可是檢測程序會所以增添額外的複雜性。由於它須要建立隊列,並確保若是健康檢測程序沒有完成的話消息不會創建。這裏有其餘選擇,隨RabbitMQ Management插件一同發佈的REST API的特性之一,就是一個能夠內部檢測RabbitMQ服務器健康狀態的API。aliveness-test,使用三個步驟來驗證RabbitMQ服務器是否健康:
* 建立一個隊列來接收測試消息java
* 用隊列名稱做爲消息路由鍵,將消息發往默認交換器git
* 當消息到達隊列的時候就消費該消息,不然就報錯github
因爲檢測程序(aliveness-test)運行在Erlang虛擬機內部,所以它不會受到網絡問題的影響。若是在虛擬機外部的話,網絡問題可能會阻止你鏈接到RabbitMQ的端口(5672)。所以,最好是結合RabbitMQ 監控(一)構建的AMQP健康檢測和基於API的健康檢測兩種方式,能夠確保對RabbitMQ服務器的全方位監控。特別須要注意的是aliveness-test API檢測的過程不會刪除建立的隊列,這意味着你的健康檢測程序能夠以很是短的週期重複運行。
那麼如何使用aliveness-test API來編寫健康檢測程序呢?RabbitMQ自帶的Management Plugin提供了一些REST API,在RabbitMQ Management頁面能夠看到latest HTTP API documentation here的連接,點擊能夠查看這些API。目前最新的是RabbitMQ Management HTTP API。
在上面的API頁面能夠看到關於aliveness-testAPI的描述:spring
GET request Path:/api/aliveness-test/vhost Description: Declares a test queue, then publishes and consumes a message. Intended for use by monitoring tools. If everything is working correctly, will return HTTP status 200 with body: {"status":"ok"} Note: the test queue will not be deleted (to to prevent queue churn if this is repeatedly pinged).
使用curl測試一下該API,這裏的/%2F表明默認的vhost(/)api
curl -u guest:guest http://127.0.0.1:15672/api/aliveness-test/%2F response:{"status":"ok"}
經測試API可用,所以咱們如今要作的就是封裝RabbitMQ Management HTTP API的方法。在這個DEMO中,我使用的是Jersey Client。須要看完整代碼的請點擊個人github。服務器
###清單2.1 針對RabbitMQ的基於REST API的健康檢測程序網絡
1.定義須要調用的接口 RMQResource.javamybatis
@Path("api") @Consumes({MediaType.APPLICATION_JSON}) @Produces({MediaType.APPLICATION_JSON}) public interface RMQResource { [@GET](https://my.oschina.net/get) @Path("aliveness-test/{vhost}") Response testAliveness(@PathParam("vhost") String vhost); }
2.aliveness-test的response CheckResp.javaapp
/** * aliveness-test接口的返回值 */ public class CheckResp { private String status; public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } @Override public String toString() { return "CheckResp{" + "status='" + status + '\'' + '}'; } }
3.封裝RabbitMQ的API RMQAPI.java
/** * RabbitMQ的REST API */ public class RMQApi { private final static Logger log = LoggerFactory.getLogger(RMQApi.class); private static Map<Class<?>, Object> restInstance = new HashMap<>(); static { RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig(); String rmqUrl = config.getRmqUrl(); String username = config.getUsername(); String password = config.getPassword(); //調用RabbitMQ Management HTTP API須要帶上驗證信息 String authorization = "Basic " + Base64Util.base64Encode((username + ":" + password).getBytes()); log.info("RabbitMQ monitor REST url {}", rmqUrl); ClientConfig clientConfig = new ClientConfig(); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(10); connectionManager.setDefaultMaxPerRoute(10); connectionManager.setValidateAfterInactivity(60000); clientConfig.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); WebTarget target = ClientBuilder.newClient(clientConfig) .register(JacksonFeature.class) .register(new JacksonJsonProvider(objectMapper)) .target(rmqUrl); restInstance.put(RMQResource.class, bindService(RMQResource.class, target, authorization)); } private static <T> T bindService(Class<T> clazz, WebTarget target, String authorization) { MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>(); headers.put("Authorization", Arrays.asList((Object) authorization)); return WebResourceFactory.newResource(clazz, target, false, headers, new ArrayList<Cookie>(), new Form()); } @SuppressWarnings("unchecked") public static <T> T getService(Class<T> tClass) { return (T) restInstance.get(tClass); } }
4.檢測RabbitMQ狀態 APIPingCheck.java
/** * 基於RabbitMQ的REST API的檢測 */ public class APIPingCheck { private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class); private final static Logger log = LoggerFactory.getLogger(APIPingCheck.class); public static void checkAPIPing(String vhost) { RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig(); String host = config.getHost(); Response response = null; try { response = rmqResource.testAliveness(vhost); } catch (Exception e) { log.error("CRITICAL: Could not connect to {}, cause {}", host, e.getMessage()); ExitUtil.exit(ExitType.CRITICAL.getValue()); } if (response == null || response.getStatus() > 299) { log.error("CRITICAL: Broker not alive : {}", response); ExitUtil.exit(ExitType.CRITICAL.getValue()); } else { log.info("OK: Broker alive: {}", response.readEntity(CheckResp.class)); ExitUtil.exit(ExitType.OK.getValue()); } } }
5.運行檢測程序
@Test public void alivenessTest() { String vhost = "/"; System.out.println(rmqResource.testAliveness(vhost)); }
能夠看到監控程序正常運行
10:43:54.516 [main] INFO com.lanxiang.rabbitmqmonitor.check.APIPingCheck - OK: Broker alive: CheckResp{status='ok'}
10:43:54.517 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK
如今嘗試將RabbitMQ關掉,再運行監控程序
rabbitmqctl stop_app
能夠看到程序報錯,沒法鏈接到RabbitMQ
14:48:06.105 [main] ERROR com.lanxiang.rabbitmqmonitor.check.APIPingCheck - CRITICAL: Could not connect to 127.0.0.1, cause java.net.ConnectException: Connection refused
14:48:06.106 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is CRITICAL
如今你已經能夠監控RabbitMQ是否能接收鏈接,同時也能檢測它是否能成功路由消息。可是若是有人將隊列的持久化屬性修改成非持久化,致使消息更容易丟失的話,該如何保護RabbitMQ配置免遭危險的修改?
##下一章將編寫一個監控隊列(或者交換器)配置的健康檢測程序。