#RabbitMQ 監控(三)
驗證RabbitMQ健康運行只是確保消息通信架構可靠性的一部分,同時,你也需要確保消息通信結構配置沒有遭受意外修改,從而避免應用消息丟失。
RabbitMQ Management HTTP API提供了一個方法允許你查看任何vhost上的任何隊列:/api/queues//。你不僅可以查看配置詳情,還可以查看隊列的數據統計,例如隊列消耗的內存,或者隊列的平均消息吞吐量。使用curl測試一下該API,這里的/%2F還是代表默認的vhost(/)。
curl -u guest:guest http://127.0.0.1:15672/api/queues/%2F/springrabbitexercise
response
{
"consumer_details": [
{
"channel_details": {
"peer_host": "127.0.0.1",
"peer_port": 62679,
"connection_name": "127.0.0.1:62679 -> 127.0.0.1:5672",
"user": "guest",
"number": 2,
"node": "rabbit@localhost",
"name": "127.0.0.1:62679 -> 127.0.0.1:5672 (2)"
},
"arguments": [],
"prefetch_count": 1,
"ack_required": true,
"exclusive": false,
"consumer_tag": "amq.ctag-YImeU8Fm_VahDpxv8EAw2Q",
"queue": {
"vhost": "/",
"name": "springrabbitexercise"
}
}
],
"messages_details": {
"rate": 7357
},
"messages": 232517,
"messages_unacknowledged_details": {
"rate": 0.2
},
"messages_unacknowledged": 5,
"messages_ready_details": {
"rate": 7356.8
},
"messages_ready": 232512,
"reductions_details": {
"rate": 1861021.8
},
"reductions": 58754154,
...
"auto_delete": false,
"durable": true,
"vhost": "/",
"name": "springrabbitexercise",
"message_bytes_persistent": 2220250,
"message_bytes_ram": 2220250,
"message_bytes_unacknowledged": 40,
"message_bytes_ready": 2220210,
"message_bytes": 2220250,
"messages_persistent": 232517,
"messages_unacknowledged_ram": 5,
"messages_ready_ram": 232512,
"messages_ram": 232517,
"garbage_collection": {
"minor_gcs": 0,
"fullsweep_after": 65535,
"min_heap_size": 233,
"min_bin_vheap_size": 46422,
"max_heap_size": 0
},
"state": "running"
}
為了方便閱讀,去掉了部分返回值,但是還是可以看到隊列的很多信息。例如可以看到一個consumer的信息、消息占用的內存、隊列的durable、auto_delete屬性等。利用這些配置信息,新的健康監控程序可以通過API方法的輸出來輕松監控隊列的屬性,并在發生變更時通知你。
就像之前編寫健康檢測程序那樣,除了服務器、端口、vhost、用戶名和密碼之外,還需要知道:
* 隊列的名稱,以便監控其配置
* 該隊列是否將durable和auto_delete選項打開
###清單3.1 檢測隊列配置
完整代碼在我的github,下面代碼中的@Data和@Slf4j都是插件lombok中的注解,想要了解的可自行百度。
1.定義查看隊列信息的接口 RMQResource.java
@Path("api")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public interface RMQResource {
/**
* Return a queue`s info
*
* @param vhost
* @param name
* @return {@link QueueInfo}
*/
@GET
@Path("queues/{vhost}/{name}")
Response getQueueInfo(@PathParam("vhost") String vhost, @PathParam("name") String name);
}
2.定義查看隊列接口的返回值 QueueInfo.java
@Data
public class QueueInfo {
private ConsumerDetails[] consumer_details;
/**
* unknown class
*/
@JsonIgnore
private Object[] incoming;
/**
* unknown class
*/
@JsonIgnore
private Object[] deliveries;
/**
* unknown class
*/
@JsonIgnore
private Object arguments;
private Boolean exclusive;
//...
private Boolean auto_delete;
private Boolean durable;
private String vhost;
private String name;
/**
* unknown class
*/
@JsonIgnore
private Object head_message_timestamp;
/**
* unknown class
*/
@JsonIgnore
private Object recoverable_slaves;
private Long memory;
private Double consumer_utilisation;
private Integer consumers;
/**
* unknown class
*/
@JsonIgnore
private Object exclusive_consumer_tag;
/**
* unknown class
*/
@JsonIgnore
private Object policy;
@JsonFormat(pattern = "yyyy-MM-dd hh:mm:ss")
private Date idle_since;
}
3.檢測隊列配置 QueueConfigCheck.java
/**
* 檢測隊列配置
*/
@Slf4j
public class QueueConfigCheck {
private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class);
public static void checkQueueConfig(String vhost, CheckQueue queue) {
RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
String host = config.getHost();
Response response = null;
try {
response = rmqResource.getQueueInfo(vhost, queue.getQueue_name());
} catch (Exception e) {
log.error("UNKNOWN: Could not connect to {}, cause {}", host, e.getMessage());
ExitUtil.exit(ExitType.UNKNOWN.getValue());
}
if (response == null || response.getStatus() == 404) {
log.error("CRITICAL: Queue {} does not exist.", queue.getQueue_name());
ExitUtil.exit(ExitType.CRITICAL.getValue());
} else if (response.getStatus() > 299) {
log.error("UNKNOWN: Unexpected API error : {}", response);
ExitUtil.exit(ExitType.UNKNOWN.getValue());
} else {
QueueInfo info = response.readEntity(QueueInfo.class);
if (!info.getAuto_delete().equals(queue.getAuto_delete())) {
log.warn("WARN: Queue {} - auto_delete flag is NOT {}", queue.getQueue_name(), info.getAuto_delete());
ExitUtil.exit(ExitType.WARN.getValue());
}
if (!info.getDurable().equals(queue.getDurable())) {
log.warn("WARN: Queue {} - durable flag is NOT {}", queue.getQueue_name(), info.getDurable());
ExitUtil.exit(ExitType.WARN.getValue());
}
}
log.info("OK: Queue {} configured correctly.", queue.getQueue_name());
ExitUtil.exit(ExitType.OK.getValue());
}
}
4.檢測隊列配置的方法參數 CheckQueue.java
@Data
public class CheckQueue {
private final String queue_name;
private final Boolean auto_delete;
private final Boolean durable;
public CheckQueue(String queue_name, Boolean auto_delete, Boolean durable) {
this.queue_name = queue_name;
this.auto_delete = auto_delete;
this.durable = durable;
}
}
5.運行檢測程序
@Test
public void testQueueConfig() {
String queue_name = "springrabbitexercise";
Boolean auto_delete = false;
Boolean durable = true;
String vhost = "/";
CheckQueue queue = new CheckQueue(queue_name, auto_delete, durable);
QueueConfigCheck.checkQueueConfig(vhost, queue);
}
可以看到監控正常運行:
11:38:23.286 [main] INFO com.lanxiang.rabbitmqmonitor.check.QueueConfigCheck - OK: Queue springrabbitexercise configured correctly.
11:38:23.289 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK
這段RabbitMQ隊列檢測的程序有一處修改,如果健康檢測程序無法連接到API服務器的話,會返回EXIT_UNKNOWN。前一章的API ping健康檢測要么成功要么失敗,故障代碼之間沒有區別,但是隊列檢測API方法在失敗時通過HTTP狀態碼提供了更多信息。如果HTTP狀態碼是404就代表嘗試驗證的隊列不存在,檢測失敗并返回EXIT_CRITICAL。對于其他大于299的HTTP狀態碼,退出代碼為EXIT_UNKNOWN。
在獲取到RabbitMQ API的response之后,使用JSON進行解碼,并且把得到的durable和auto_delete參數與期望的參數進行比較,如果參數和預期不相符的話,返回EXIT_WARNING或者EXIT_CRITICAL狀態碼。如果隊列所有的配置都正確的話,那么就正確退出。
在了解我們對RabbitMQ做監控的原理之后,可以根據RabbitMQ Management HTTP API定制更多的監控,例如:
* /api/nodes,可以獲取集群中每個節點的數據
* /api/queues//,可以獲取隊列的詳細情況,例如消息處理的速率、積壓的消息數量等。
除此之外還有許多其他API,我們要做的就是根據自身的業務邏輯和這些API來設計合理的監控腳本。RabbitMQ監控系列就到此結束啦,還是很可惜沒有實戰的機會吧,因為最近在工作變動期間,看了一下RabbitMQ實戰這本書,興起想寫一下博客試試。
畢業快一年了,想養成寫博客的習慣。正好最近也在工作變動中,能有閑暇時間嘗試一下,博客寫的比較水,多多包涵。