RabbitMQ監控(二):使用REST API來檢測

#RabbitMQ 監控(二)html

  經過測試RabbitMQ是否可以接收新的請求和構造AMQP信道,能夠用來驗證RabbitMQ服務器是否健康。接下來,咱們將檢測消息通訊的整個過程,向RabbitMQ發佈消息而後消費該消息,來驗證消息被正確地路由了。
  雖然能夠經過簡單的擴展AMQP健康檢測程序來對路由過程進行完整的測試,可是檢測程序會所以增添額外的複雜性。由於它須要建立隊列,並確保若是健康檢測程序沒有完成的話消息不會創建。這裏有其餘選擇,隨RabbitMQ Management插件一同發佈的REST API的特性之一,就是一個能夠內部檢測RabbitMQ服務器健康狀態的API。aliveness-test,使用三個步驟來驗證RabbitMQ服務器是否健康:
  
  * 建立一個隊列來接收測試消息java

  * 用隊列名稱做爲消息路由鍵,將消息發往默認交換器git

  * 當消息到達隊列的時候就消費該消息,不然就報錯github

  因爲檢測程序(aliveness-test)運行在Erlang虛擬機內部,所以它不會受到網絡問題的影響。若是在虛擬機外部的話,網絡問題可能會阻止你鏈接到RabbitMQ的端口(5672)。所以,最好是結合RabbitMQ 監控(一)構建的AMQP健康檢測和基於API的健康檢測兩種方式,能夠確保對RabbitMQ服務器的全方位監控。特別須要注意的是aliveness-test API檢測的過程不會刪除建立的隊列,這意味着你的健康檢測程序能夠以很是短的週期重複運行。
  那麼如何使用aliveness-test API來編寫健康檢測程序呢?RabbitMQ自帶的Management Plugin提供了一些REST API,在RabbitMQ Management頁面能夠看到latest HTTP API documentation here的連接,點擊能夠查看這些API。目前最新的是RabbitMQ Management HTTP API
  
  在上面的API頁面能夠看到關於aliveness-testAPI的描述:spring

GET request
Path:/api/aliveness-test/vhost
Description:
Declares a test queue, then publishes and consumes a message. Intended for use by monitoring tools. If everything is working correctly, will return HTTP status 200 with body:
{"status":"ok"}
Note: the test queue will not be deleted (to to prevent queue churn 
if this is repeatedly pinged).

  使用curl測試一下該API,這裏的/%2F表明默認的vhost(/)api

curl -u guest:guest http://127.0.0.1:15672/api/aliveness-test/%2F
response:{"status":"ok"}

  經測試API可用,所以咱們如今要作的就是封裝RabbitMQ Management HTTP API的方法。在這個DEMO中,我使用的是Jersey Client。須要看完整代碼的請點擊個人github服務器

###清單2.1 針對RabbitMQ的基於REST API的健康檢測程序網絡

1.定義須要調用的接口 RMQResource.javamybatis

@Path("api")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public interface RMQResource {

    [@GET](https://my.oschina.net/get)
    @Path("aliveness-test/{vhost}")
    Response testAliveness(@PathParam("vhost") String vhost);
    
}

2.aliveness-test的response CheckResp.javaapp

/**
 * aliveness-test接口的返回值
 */
public class CheckResp {

    private String status;

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "CheckResp{" +
                "status='" + status + '\'' +
                '}';
    }
}

3.封裝RabbitMQ的API RMQAPI.java

/**
 * RabbitMQ的REST API
 */
public class RMQApi {

    private final static Logger log = LoggerFactory.getLogger(RMQApi.class);

    private static Map<Class<?>, Object> restInstance = new HashMap<>();

    static {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String rmqUrl = config.getRmqUrl();
        String username = config.getUsername();
        String password = config.getPassword();
		
		//調用RabbitMQ Management HTTP API須要帶上驗證信息
        String authorization = "Basic " + Base64Util.base64Encode((username + ":" + password).getBytes());

        log.info("RabbitMQ monitor REST url {}", rmqUrl);

        ClientConfig clientConfig = new ClientConfig();
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(10);
        connectionManager.setDefaultMaxPerRoute(10);
        connectionManager.setValidateAfterInactivity(60000);

        clientConfig.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);



        WebTarget target = ClientBuilder.newClient(clientConfig)
                .register(JacksonFeature.class)
                .register(new JacksonJsonProvider(objectMapper))
                .target(rmqUrl);

        restInstance.put(RMQResource.class, bindService(RMQResource.class, target, authorization));
    }

    private static <T> T bindService(Class<T> clazz, WebTarget target, String authorization) {
        MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
        headers.put("Authorization", Arrays.asList((Object) authorization));
        return WebResourceFactory.newResource(clazz, target, false, headers, new ArrayList<Cookie>(), new Form());
    }

    @SuppressWarnings("unchecked")
    public static <T> T getService(Class<T> tClass) {
        return (T) restInstance.get(tClass);
    }
}

4.檢測RabbitMQ狀態 APIPingCheck.java

/**
 * 基於RabbitMQ的REST API的檢測
 */
public class APIPingCheck {

    private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class);

    private final static Logger log = LoggerFactory.getLogger(APIPingCheck.class);

    public static void checkAPIPing(String vhost) {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String host = config.getHost();
        Response response = null;
        try {
            response = rmqResource.testAliveness(vhost);
        } catch (Exception e) {
            log.error("CRITICAL: Could not connect to {}, cause {}", host, e.getMessage());
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        }
        if (response == null || response.getStatus() > 299) {
            log.error("CRITICAL: Broker not alive : {}", response);
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        } else {
            log.info("OK: Broker alive: {}", response.readEntity(CheckResp.class));
            ExitUtil.exit(ExitType.OK.getValue());
        }
    }
}

5.運行檢測程序

@Test
public void alivenessTest() {
    String vhost = "/";
    System.out.println(rmqResource.testAliveness(vhost));
}

能夠看到監控程序正常運行

10:43:54.516 [main] INFO com.lanxiang.rabbitmqmonitor.check.APIPingCheck - OK: Broker alive: CheckResp{status='ok'}
10:43:54.517 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK

如今嘗試將RabbitMQ關掉,再運行監控程序

rabbitmqctl stop_app

能夠看到程序報錯,沒法鏈接到RabbitMQ

14:48:06.105 [main] ERROR com.lanxiang.rabbitmqmonitor.check.APIPingCheck - CRITICAL: Could not connect to 127.0.0.1, cause java.net.ConnectException: Connection refused
14:48:06.106 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is CRITICAL

如今你已經能夠監控RabbitMQ是否能接收鏈接,同時也能檢測它是否能成功路由消息。可是若是有人將隊列的持久化屬性修改成非持久化,致使消息更容易丟失的話,該如何保護RabbitMQ配置免遭危險的修改?

##下一章將編寫一個監控隊列(或者交換器)配置的健康檢測程序。

相關文章
相關標籤/搜索