#RabbitMQ 监控(二)
通过测试RabbitMQ是否能够接收新的请求和构造AMQP信道,可以用来验证RabbitMQ服务器是否健康。接下来,我们将检测消息通信的整个过程,向RabbitMQ发布消息然后消费该消息,来验证消息被正确地路由了。
虽然可以通过简单的扩展AMQP健康检测程序来对路由过程进行完整的测试,但是检测程序会因此增添额外的复杂性。因为它需要创建队列,并确保如果健康检测程序没有完成的话消息不会建立。这里有其他选择,随RabbitMQ Management插件一同发布的REST API的特性之一,就是一个可以内部检测RabbitMQ服务器健康状态的API。aliveness-test,使用三个步骤来验证RabbitMQ服务器是否健康:
* 创建一个队列来接收测试消息
* 用队列名称作为消息路由键,将消息发往默认交换器
* 当消息到达队列的时候就消费该消息,否则就报错
由于检测程序(aliveness-test)运行在Erlang虚拟机内部,因此它不会受到网络问题的影响。如果在虚拟机外部的话,网络问题可能会阻止你连接到RabbitMQ的端口(5672)。因此,最好是结合RabbitMQ 监控(一)构建的AMQP健康检测和基于API的健康检测两种方式,可以确保对RabbitMQ服务器的全方位监控。特别需要注意的是aliveness-test API检测的过程不会删除创建的队列,这意味着你的健康检测程序可以以非常短的周期重复运行。
那么如何使用aliveness-test API来编写健康检测程序呢?RabbitMQ自带的Management Plugin提供了一些REST API,在RabbitMQ Management页面可以看到latest HTTP API documentation here的链接,点击可以查看这些API。目前最新的是RabbitMQ Management HTTP API。
在上面的API页面可以看到关于aliveness-testAPI的描述:
GET request
Path:/api/aliveness-test/vhost
Description:
Declares a test queue, then publishes and consumes a message. Intended for use by monitoring tools. If everything is working correctly, will return HTTP status 200 with body:
{"status":"ok"}
Note: the test queue will not be deleted (to to prevent queue churn
if this is repeatedly pinged).
使用curl测试一下该API,这里的/%2F代表默认的vhost(/)
curl -u guest:guest http://127.0.0.1:15672/api/aliveness-test/%2F
response:{"status":"ok"}
经测试API可用,因此我们现在要做的就是封装RabbitMQ Management HTTP API的方法。在这个DEMO中,我使用的是Jersey Client。需要看完整代码的请点击我的github。
###清单2.1 针对RabbitMQ的基于REST API的健康检测程序
1.定义需要调用的接口 RMQResource.java
@Path("api")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public interface RMQResource {
[@GET](https://my.oschina.net/get)
@Path("aliveness-test/{vhost}")
Response testAliveness(@PathParam("vhost") String vhost);
}
2.aliveness-test的response CheckResp.java
/**
* aliveness-test接口的返回值
*/
public class CheckResp {
private String status;
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public String toString() {
return "CheckResp{" +
"status='" + status + '\'' +
'}';
}
}
3.封装RabbitMQ的API RMQAPI.java
/**
* RabbitMQ的REST API
*/
public class RMQApi {
private final static Logger log = LoggerFactory.getLogger(RMQApi.class);
private static Map<Class<?>, Object> restInstance = new HashMap<>();
static {
RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
String rmqUrl = config.getRmqUrl();
String username = config.getUsername();
String password = config.getPassword();
//调用RabbitMQ Management HTTP API需要带上验证信息
String authorization = "Basic " + Base64Util.base64Encode((username + ":" + password).getBytes());
log.info("RabbitMQ monitor REST url {}", rmqUrl);
ClientConfig clientConfig = new ClientConfig();
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(10);
connectionManager.setDefaultMaxPerRoute(10);
connectionManager.setValidateAfterInactivity(60000);
clientConfig.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
WebTarget target = ClientBuilder.newClient(clientConfig)
.register(JacksonFeature.class)
.register(new JacksonJsonProvider(objectMapper))
.target(rmqUrl);
restInstance.put(RMQResource.class, bindService(RMQResource.class, target, authorization));
}
private static <T> T bindService(Class<T> clazz, WebTarget target, String authorization) {
MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
headers.put("Authorization", Arrays.asList((Object) authorization));
return WebResourceFactory.newResource(clazz, target, false, headers, new ArrayList<Cookie>(), new Form());
}
@SuppressWarnings("unchecked")
public static <T> T getService(Class<T> tClass) {
return (T) restInstance.get(tClass);
}
}
4.检测RabbitMQ状态 APIPingCheck.java
/**
* 基于RabbitMQ的REST API的检测
*/
public class APIPingCheck {
private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class);
private final static Logger log = LoggerFactory.getLogger(APIPingCheck.class);
public static void checkAPIPing(String vhost) {
RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
String host = config.getHost();
Response response = null;
try {
response = rmqResource.testAliveness(vhost);
} catch (Exception e) {
log.error("CRITICAL: Could not connect to {}, cause {}", host, e.getMessage());
ExitUtil.exit(ExitType.CRITICAL.getValue());
}
if (response == null || response.getStatus() > 299) {
log.error("CRITICAL: Broker not alive : {}", response);
ExitUtil.exit(ExitType.CRITICAL.getValue());
} else {
log.info("OK: Broker alive: {}", response.readEntity(CheckResp.class));
ExitUtil.exit(ExitType.OK.getValue());
}
}
}
5.运行检测程序
@Test
public void alivenessTest() {
String vhost = "/";
System.out.println(rmqResource.testAliveness(vhost));
}
可以看到监控程序正常运行
10:43:54.516 [main] INFO com.lanxiang.rabbitmqmonitor.check.APIPingCheck - OK: Broker alive: CheckResp{status='ok'}
10:43:54.517 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK
现在尝试将RabbitMQ关掉,再运行监控程序
rabbitmqctl stop_app
可以看到程序报错,无法连接到RabbitMQ
14:48:06.105 [main] ERROR com.lanxiang.rabbitmqmonitor.check.APIPingCheck - CRITICAL: Could not connect to 127.0.0.1, cause java.net.ConnectException: Connection refused
14:48:06.106 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is CRITICAL
现在你已经可以监控RabbitMQ是否能接收连接,同时也能检测它是否能成功路由消息。但是如果有人将队列的持久化属性修改为非持久化,导致消息更容易丢失的话,该如何保护RabbitMQ配置免遭危险的修改?
##下一章将编写一个监控队列(或者交换器)配置的健康检测程序。