1. 為什么要消息分發
當 broker 擁有多個消費者時,就會將消息分發給不同的消費者,消費者之間的消息不會重復,RabbitMQ 默認的消息分發機制是輪詢,但會無論消費者是否發送了 ack,broker 都會繼續發送消息至消費者,這就會造成消費者壓力增大。于是,可以限制消費者每一次接收到的消息的數量,當消息達到該數量時,broker 就不會繼續給這個消費者發送消息,而是會給其他的消費者派送消息。
所消費者消費了一條消息,并且給 broker 發送了 ack,那么此時消費者所未消耗的消息就沒有達到最大消息數量,于是 broker 就會繼續給消費者分配消息,直到消費者未消費的消息數量達到上限。
對于這種特性,有下面兩個應用場景:
1.1 限流
在某些秒殺場景中,每一秒消費者接收到的消息都會非常大,那么就會造成消費者壓力過大。于是我們就可以限制消費者所能接收的最大消息數量。
配置代碼如下:
spring:rabbitmq:listener:simple:acknowledge-mode: manualprefetch: 5 #每個隊列最多接收五條消息
在配置中,prefetch 表示隊列中的消息數量上限為 5 條,若隊列中未確認的消息數量達到 5 條,此時 broker?就不會繼續給該隊列分配消息,而是給其它的未達到上限的隊列分配消息。
并且此處要設置為手動確認,若使用 auto 或 none,可能業務邏輯還沒有開始消息就已經被簽收,這就無法發揮限流的作用。
隊列、交換機聲明代碼如下:
@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qoeExchange")public DirectExchange qoeExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBind")public Binding qosBind(@Qualifier("qoeExchange") DirectExchange directExchange,@Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.QOS_ROUTINGKEY);}
生產者代碼如下:
@RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {String messageInfo = "qos... " + i;rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, Constants.QOS_ROUTINGKEY, messageInfo);}return "消息發送成功";}
消費者代碼如下:
@Component
@Slf4j
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);}}
此處,消費者沒有給 broker 發送 ack,那么隊列中的消息就會一直存在。
代碼運行結果如下:
這里我們可以看到,一共有 5 條未確認的消息, 已經達到了上限,于是就不會繼續向消費者發送消息。
1.2 負載均衡
使用消息分發,也可以實現負載均衡。
現有兩個消費者 A、B,A 處理消息的速度慢,B處理消息的速度快。若不設置負載均衡,那么就會出現 A 積壓的消息過多,而 B 幾乎沒有什么消息擠壓,這就沒有充分地利用資源。
于是我們可以將?prefetch 設置為 1,那么每次消費者只會接收到一條消息,當 A、B 接收到消息后,在 B 處理完成時 A 還沒有處理完,于是 broker 就會給 B 繼續推送消息,直到 A 處理完成后才會繼續給 A 推送消息。
消費者代碼如下:
@Component
@Slf4j
public class QosListener {/*** 消費者1* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消費者 1 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(2000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 消費者2* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener2(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消費者 2 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(1000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
上述代碼,將消費者 2 的處理速度是消費者 1 的兩倍,代碼運行結果如下:
可以看出,消費者? 2 每消耗 2 條數據,消費者 1 才消耗 1 條數據。,也就達到了負載均衡的作用。