消息確認機制
簡單介紹
RabbitMQ Broker 發送消息給消費者后,消費者處理該消息時可能會發生異常,導致消費失敗。
如果 Broker 在發送消息后就直接刪了,就會導致消息的丟失。
為了保證消息可靠到達消費者并且成功處理了該消息,RabbitMQ 提供了消息確認機制。
消費者在訂閱隊列時,可以指定 autoAck
參數,該參數指定是否自動確認消息。
autoAck=true
:消費者接收到消息后,自動確認消息,RabbitMQ Broker 立即刪除該消息。autoAck=false
:消費者接收到消息后,不自動確認消息,需要消費者調用channel.basicAck()
方法確認消息。如果消費者處理消息時發生異常,則可以調用channel.basicNack()
方法,表示不確認該消息的接收。
Spring AMQP 提供了三種模式的消息確認
AcknowledgeMode.NONE
:消息一經發送,就不管它了,不管消費者是否處理成功,都直接確認消息。AcknowledgeMode.AUTO
(默認):自動確認,消息接收后,消費者處理成功時自動確認該消息,如果處理時發送異常,則不會確認消息。AcknowledgeMode.MANUAL
:手動確認,消息接收后,消費者處理成功時,需要調用channel.basicAck()
方法確認消息,如果處理時發送異常,則需要調用channel.basicNack()
方法,表示不確認該消息的接收。
代碼示例
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: manual
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("ackExchange")public DirectExchange ackExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}
// @Bean("binding")
// public Binding binding(Exchange exchange, Queue queue) {
// return BindingBuilder.bind(queue)
// .to(exchange)
// .with("ack")
// .noargs();
// }@Bean("binding1")public Binding binding1(@Qualifier("ackExchange") DirectExchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消費者消息確認喵~");return "發送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模擬處理業務邏輯");int a = 3 / 0;System.out.println("模擬處理業務完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}
持久性機制
簡單介紹
前面講了消費端處理消息時,消息如何不丟失,但是如何保證 RabbitMQ
服務停掉以后,生產者發送的消息不丟失呢。默認情況下, RabbitMQ
退出或者由于某種原因崩潰時,會忽視隊列和消息。
為了保證消息持久化,RabbitMQ 提供了持久化機制,分別是:交換機持久化、隊列持久化和消息持久化。
- 交換機持久化:使用
ExchangeBuilder.durable(true)
方法創建的交換機,RabbitMQ 會將交換機信息持久化到磁盤,重啟 RabbitMQ 后可以自動恢復。 - 隊列持久化:使用
QueueBuilder.durable(true)
方法創建的隊列,RabbitMQ 會將隊列信息持久化到磁盤,重啟 RabbitMQ 后可以自動恢復。 - 消息持久化:消息持久化可以保證消息不丟失,即使 RabbitMQ 重啟或者崩潰,消息也不會丟失。
將所有的消息都設置為持久化,會嚴重影響 RabbitMQ 的性能,這是因為寫入磁盤的速度相比于寫入內存的速度還是很慢的,對于可靠性不是那么高的消息,可以不采用持久化處理以提高整體的吞吐量。
在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡。
盡管設置了持久化,也不能保證就一定可以持久化。這是因為在將這些持久化信息寫入磁盤時也是需要時間的,如果 RabbitMQ 在這段時間內崩潰,那么這些信息也會丟失。
代碼示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("persistQueue")public Queue persistQueue() {return QueueBuilder.nonDurable(Constants.PERSIST_QUEUE).build();}@Bean("persistExchange")public DirectExchange persistExchange() {return ExchangeBuilder.directExchange(Constants.PERSIST_EXCHANGE).durable(false).build();}@Bean("binding2")public Binding binding2(@Qualifier("persistExchange") Exchange exchange, @Qualifier("persistQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("persist").noargs();}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/persist")public String persist() {Message message = new Message("消費者消息確認喵~".getBytes(StandardCharsets.UTF_8), new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PERSIST_EXCHANGE, "persist", message);return "發送成功";}
}
發送方確認機制
簡單介紹
在發送方將消息發送至 RabbitMQ Broker 時,也有可能出現消息丟失的情況。
為了保證消息可靠到達 Broker,RabbitMQ 提供了發送方確認機制。
發送方確認機制是指,在消息發送到 Broker
后,發送方會等待 Broker
回應,如果發送方收到消息,則發送方認為消息發送成功,如果發送方未收到消息,則發送方認為消息發送失敗,可以重新發送。
RabbitMQ 提供了兩種方式保證發送方發送的消息的可靠傳輸
confirm 確認模式
:發送方在發送消息后,對發送方設置一個ConfirmCallback
的監聽,無論消息是否抵達Exchange
,這個監聽都會被執行,如果消息抵達了Exchange
,則ACK
為true
,如果消息沒有抵達Exchange
,則ACK
為false
。returns 退回模式
:盡管確認消息發送至Exchange
后,也依然不能完全保證消息的可靠傳輸。在Exchange
和Queue
會有一個Routing Key(Binding Key)
的綁定關系,如果消息沒有匹配到任何一個Queue
,則通過returns
模式則會退回到發送方。
代碼示例
confirm 確認模式
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息發送確認機制publisher-confirm-type: correlated
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("binding3")public Binding binding3(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("confirm");}
}
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 設置確認消息機制rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack) {System.out.printf("接收到消息,消息ID:%s\n",correlationData == null ? null : correlationData.getId());} else {System.out.printf("未接收到消息,消息ID:%s;原因:%s\n",correlationData == null ? null : correlationData.getId(), cause);}}});return rabbitTemplate;}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {// ! 直接使用 setConfirmCallback 會影響其他接口的調用// ! 且只能設置一個確認回調,多次發起請求會報錯
// rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
// @Override
// public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// System.out.println("執行了confirm方法");
// if (ack) {
// System.out.printf("接收到消息,消息ID:%s\n",
// correlationData == null ? null : correlationData.getId());
// } else {
// System.out.printf("未接收到消息,消息ID:%s\n;原因:%s",
// correlationData == null ? null : correlationData.getId(), cause);
// }
// }
// });CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);return "消息發送成功";}
}
returns 退回模式
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息發送退回機制publisher-returns: true
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 設置消息退回機制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回:" + returned);}});return rabbitTemplate;}
}
總結:如何確保消息的可靠性傳輸
- 發送方 => 服務端:通過發送方確認機制,confirm 確認模式 和 returns 退回模式,確保消息可靠到達。
- 服務端:通過持久化機制,保證消息不丟失。
- 服務端 => 接收方:通過消息確認機制,確保消息被消費者正確消費。
重試機制
簡單介紹
消息在處理失敗后,重新發送,重新處理,這便是消息重試機制。
RabbitMQ 提供了消息重試機制,可以設置消息最大重試次數,超過最大重試次數還未成功消費,則消息會被丟棄。
代碼示例
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:
# 消息接收確認機制# acknowledge-mode: manual # 手動確認時,重發機制無效acknowledge-mode: autoretry:enabled: true # 開啟重試機制initial-interval: 5000ms # 重發時間間隔max-attempts: 5 # 最大重試次數
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}@Bean("binding4")public Binding binding4(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("retry");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/retry")public String retry() {rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息發送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void process(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.RETRY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3 / 0;System.out.println("業務處理完成");}
}
TTL 機制
簡單介紹
TTL(Time To Live)機制,可以設置消息的存活時間,超過存活時間還未消費,則消息會被丟棄。
RabbitMQ 提供了 TTL 機制,可以設置隊列和消息的 TTL 值,超過 TTL 值還未消費,則消息會被丟棄。
兩者區別:
- 設置隊列 TTL 值,一旦消息過期,就會從隊列中刪除。設置隊列過期時間,隊列中已過期的消息肯定在隊列頭部,RabbitMQ 只要定期掃描對頭的消息是否過期即可。
- 設置消息 TTL 值,即使消息過期,也不會馬上刪除,只有在發送至消費者時才會檢測其是否已經過期,如果過期才會刪除。設置消息過期時間,每個消息的過期時間都可能不盡相同,所以需要掃描整個隊列的消息才可確定是否過期,為了確保性能,所以采取類似于
懶漢模式
的方式。
將隊列 TTL 設置為 30s,第一個消息的 TTL 設置為 30s,第二個消息的 TTL 設置為 10s。
理論上說,在 10s 后,第二個消息應該被丟棄。但由于設置了隊列 TTL 值的機制,只會掃描隊頭的消息是否過期,所以在第一個消息過期之前,第二個消息不會被刪除。
代碼示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20_000) // ? 設置隊列的 TTL 值.build();}@Bean("ttlExchange")public DirectExchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("binding5")public Binding binding5(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// ? 設置消息的 TTL 值message.getMessageProperties().setExpiration("10000");return message;}};rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",messagePostProcessor);return "消息發送成功";}
}
死信隊列
簡單介紹
死信(Dead Letter),就是因為某種原因,導致消費者消費失敗的消息,稱之為死信。
死信隊列,當消息在一個隊列中變成死信時,它就被重新被發送到另一個交換機,該交換機就是死信交換機(Dead Letter Exchange)。
該死信交換機綁定死信隊列,當消息被重新發送到死信交換機時,它就被重新投遞到死信隊列。
消息變成死信會有如下幾種原因:
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue 參數設置為 false。
- 消息過期。
- 隊列達到最大長度。
代碼示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DlConfig {@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) // ? 配置該隊列的死信交換機.deadLetterRoutingKey("dl") // ? 死信交換機綁定死信隊列的 Routing Key.ttl(10_000).maxLength(10L) // ? 設置隊列最大長度.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") DirectExchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void processNormal(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.NORMAL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void processDl(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "normal test...");return "消息發送成功:" + new Date();}
}
延遲隊列
簡單介紹
延遲隊列(Delay Queue),即消息被發送以后,并不想讓消費者立刻消費該消息,而是等待一段時間后再消費。
延遲隊列的使用場景有很多,比如:
- 智能家居:智能設備產生的事件,如開關、溫度變化等,可以先存放在延遲隊列中,等待一段時間后再消費。
- 日常管理:預定會議,需要在會議開始前 15 分鐘通知參會人員。
- 訂單處理:訂單創建后,需要 30 分鐘后才會發貨。
RabbitMQ 本身沒有提供延遲隊列的功能,但是基于消息過期后會變成死信的特性,可以通過設置 TTL 和死信隊列來實現延遲隊列的功能。
代碼示例
@RequestMapping("/delay")
public String delay() {//發送帶ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s過期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s過期return messagePostProcessor;});return "發送成功!";
}
由于 RabbitMQ 檢查消息是否過期的機制,如果 20s 的消息先到隊列,那么 10s 的消息只會在 20s 后才會被檢查到過期。
延遲隊列插件
RabbitMQ 官方提供了延遲隊列插件,可以實現延遲隊列的功能。
延遲隊列插件
安裝隊列插件
延遲隊列插件下載地址
下載插件后,需要將插件放到 RabbitMQ 的插件目錄(/usr/lib/rabbitmq/plugins
)下,然后重啟 RabbitMQ 服務。
代碼示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).durable(true).delayed() // ? 設置隊列為延遲隊列.build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void processDelay(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DELAY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(20_000L); // ? 設置消息的延遲發送時間return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(10_000L); // ? 設置消息的延遲發送時間return message;});return "消息發送成功:" + new Date();}
}
事務機制
簡單介紹
RabbitMQ
是基于 AMQP
協議實現的,該協議實現了事務機制,因此RabbitMQ也支持事務機制事務。
Spring AMQP
也提供了對事務相關的操作。RabbitMQ
事務允許開發者確保消息的發送和接收是原子性的,要么全部成功,要么全部失敗。
代碼示例
配置事務管理器:
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;
}@Bean
public RabbitTransactionManager rabbitTransactionManager (ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}
@Bean("transQueue")
public Queue transQueue() {return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
@Transactional
@RequestMapping("/trans")
public String trans() {String msg = "trans test...";System.out.println("發送第一條消息:" + msg + 1);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);int a = 3 / 0;System.out.println("發送第二條消息:" + msg + 2);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);return "消息發送成功:";
}
消息分發
簡單介紹
一個隊列可以給多個消費者消費,默認情況下,RabbitMQ 是以輪詢的方式將消息分發給這些消費者,不管該消費是否已經消費并且確認。
這種情況是不太合理的,如果每個消費者消費的能力都不同,有的消費者消費快,有的慢,這會極大降低整體系統的吞吐量和處理速度。
我們可以使用 channel.basicQos(int prefetchCount)
來限制當前信
道上的消費者所能保持的最大未確認消息的數量。
當該消費者達到最大的 prefetchCount
限制時,RabbitMQ 會停止向該消費者分發消息,直到該消費者的未確認消息數量小于 prefetchCount
時。
代碼示例
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手動確認prefetch: 5 # 隊列最大接收五條消息
@Bean("qosQueue")
public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public DirectExchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") DirectExchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("qos");
}
@RequestMapping("/qos")
public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test...");}return "消息發送成功:" + new Date();
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模擬處理業務邏輯");System.out.println("模擬處理業務完成");// channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}
}
應用場景
- 限流:可以根據消費者的處理能力,設置
prefetchCount
限制每個消費者所能接收的消息數量,從而達到限流的目的。 - 負載均衡:通過將
prefetchCount
設置為1
,通過設 prefetch = 1 的方式,告訴 RabbitMQ 一次只給一個消費者一條消息,也就是說,在處理并確認前一條消息之前,不要向該消費者發送新消息。相反,它會將它分派給下一個不忙的消費者。