#RabbitMQ 监控(二)

  通过测试RabbitMQ是否能够接收新的请求和构造AMQP信道,可以用来验证RabbitMQ服务器是否健康。接下来,我们将检测消息通信的整个过程,向RabbitMQ发布消息然后消费该消息,来验证消息被正确地路由了。
  虽然可以通过简单的扩展AMQP健康检测程序来对路由过程进行完整的测试,但是检测程序会因此增添额外的复杂性。因为它需要创建队列,并确保如果健康检测程序没有完成的话消息不会建立。这里有其他选择,随RabbitMQ Management插件一同发布的REST API的特性之一,就是一个可以内部检测RabbitMQ服务器健康状态的API。aliveness-test,使用三个步骤来验证RabbitMQ服务器是否健康:
  
  * 创建一个队列来接收测试消息

  * 用队列名称作为消息路由键,将消息发往默认交换器

  * 当消息到达队列的时候就消费该消息,否则就报错

  由于检测程序(aliveness-test)运行在Erlang虚拟机内部,因此它不会受到网络问题的影响。如果在虚拟机外部的话,网络问题可能会阻止你连接到RabbitMQ的端口(5672)。因此,最好是结合RabbitMQ 监控(一)构建的AMQP健康检测和基于API的健康检测两种方式,可以确保对RabbitMQ服务器的全方位监控。特别需要注意的是aliveness-test API检测的过程不会删除创建的队列,这意味着你的健康检测程序可以以非常短的周期重复运行。
  那么如何使用aliveness-test API来编写健康检测程序呢?RabbitMQ自带的Management Plugin提供了一些REST API,在RabbitMQ Management页面可以看到latest HTTP API documentation here的链接,点击可以查看这些API。目前最新的是RabbitMQ Management HTTP API
  
  在上面的API页面可以看到关于aliveness-testAPI的描述:

GET request
Path:/api/aliveness-test/vhost
Description:
Declares a test queue, then publishes and consumes a message. Intended for use by monitoring tools. If everything is working correctly, will return HTTP status 200 with body:
{"status":"ok"}
Note: the test queue will not be deleted (to to prevent queue churn
if this is repeatedly pinged).

  使用curl测试一下该API,这里的/%2F代表默认的vhost(/)

curl -u guest:guest http://127.0.0.1:15672/api/aliveness-test/%2F
response:{"status":"ok"}

  经测试API可用,因此我们现在要做的就是封装RabbitMQ Management HTTP API的方法。在这个DEMO中,我使用的是Jersey Client。需要看完整代码的请点击我的github

###清单2.1 针对RabbitMQ的基于REST API的健康检测程序

1.定义需要调用的接口 RMQResource.java

@Path("api")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public interface RMQResource {

    [@GET](https://my.oschina.net/get)
    @Path("aliveness-test/{vhost}")
    Response testAliveness(@PathParam("vhost") String vhost);

}

2.aliveness-test的response CheckResp.java

/**
 * aliveness-test接口的返回值
 */
public class CheckResp {

    private String status;

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "CheckResp{" +
                "status='" + status + '\'' +
                '}';
    }
}

3.封装RabbitMQ的API RMQAPI.java

/**
 * RabbitMQ的REST API
 */
public class RMQApi {

    private final static Logger log = LoggerFactory.getLogger(RMQApi.class);

    private static Map<Class<?>, Object> restInstance = new HashMap<>();

    static {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String rmqUrl = config.getRmqUrl();
        String username = config.getUsername();
        String password = config.getPassword();

		//调用RabbitMQ Management HTTP API需要带上验证信息
        String authorization = "Basic " + Base64Util.base64Encode((username + ":" + password).getBytes());

        log.info("RabbitMQ monitor REST url {}", rmqUrl);

        ClientConfig clientConfig = new ClientConfig();
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(10);
        connectionManager.setDefaultMaxPerRoute(10);
        connectionManager.setValidateAfterInactivity(60000);

        clientConfig.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);



        WebTarget target = ClientBuilder.newClient(clientConfig)
                .register(JacksonFeature.class)
                .register(new JacksonJsonProvider(objectMapper))
                .target(rmqUrl);

        restInstance.put(RMQResource.class, bindService(RMQResource.class, target, authorization));
    }

    private static <T> T bindService(Class<T> clazz, WebTarget target, String authorization) {
        MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
        headers.put("Authorization", Arrays.asList((Object) authorization));
        return WebResourceFactory.newResource(clazz, target, false, headers, new ArrayList<Cookie>(), new Form());
    }

    @SuppressWarnings("unchecked")
    public static <T> T getService(Class<T> tClass) {
        return (T) restInstance.get(tClass);
    }
}

4.检测RabbitMQ状态 APIPingCheck.java

/**
 * 基于RabbitMQ的REST API的检测
 */
public class APIPingCheck {

    private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class);

    private final static Logger log = LoggerFactory.getLogger(APIPingCheck.class);

    public static void checkAPIPing(String vhost) {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String host = config.getHost();
        Response response = null;
        try {
            response = rmqResource.testAliveness(vhost);
        } catch (Exception e) {
            log.error("CRITICAL: Could not connect to {}, cause {}", host, e.getMessage());
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        }
        if (response == null || response.getStatus() > 299) {
            log.error("CRITICAL: Broker not alive : {}", response);
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        } else {
            log.info("OK: Broker alive: {}", response.readEntity(CheckResp.class));
            ExitUtil.exit(ExitType.OK.getValue());
        }
    }
}

5.运行检测程序

@Test
public void alivenessTest() {
    String vhost = "/";
    System.out.println(rmqResource.testAliveness(vhost));
}

可以看到监控程序正常运行

10:43:54.516 [main] INFO com.lanxiang.rabbitmqmonitor.check.APIPingCheck - OK: Broker alive: CheckResp{status='ok'}
10:43:54.517 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK

现在尝试将RabbitMQ关掉,再运行监控程序

rabbitmqctl stop_app

可以看到程序报错,无法连接到RabbitMQ

14:48:06.105 [main] ERROR com.lanxiang.rabbitmqmonitor.check.APIPingCheck - CRITICAL: Could not connect to 127.0.0.1, cause java.net.ConnectException: Connection refused
14:48:06.106 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is CRITICAL

现在你已经可以监控RabbitMQ是否能接收连接,同时也能检测它是否能成功路由消息。但是如果有人将队列的持久化属性修改为非持久化,导致消息更容易丢失的话,该如何保护RabbitMQ配置免遭危险的修改?

##下一章将编写一个监控队列(或者交换器)配置的健康检测程序。

03-17 15:15