RabbitMQ ④-持久化 || 死信隊列 || 延遲隊列 || 事務

在這里插入圖片描述

消息確認機制

簡單介紹

RabbitMQ Broker 發送消息給消費者后,消費者處理該消息時可能會發生異常,導致消費失敗。

如果 Broker 在發送消息后就直接刪了,就會導致消息的丟失。

為了保證消息可靠到達消費者并且成功處理了該消息,RabbitMQ 提供了消息確認機制。

消費者在訂閱隊列時,可以指定 autoAck 參數,該參數指定是否自動確認消息。

  • autoAck=true:消費者接收到消息后,自動確認消息,RabbitMQ Broker 立即刪除該消息。
  • autoAck=false:消費者接收到消息后,不自動確認消息,需要消費者調用 channel.basicAck() 方法確認消息。如果消費者處理消息時發生異常,則可以調用 channel.basicNack() 方法,表示不確認該消息的接收。

Spring AMQP 提供了三種模式的消息確認

  • AcknowledgeMode.NONE:消息一經發送,就不管它了,不管消費者是否處理成功,都直接確認消息。
  • AcknowledgeMode.AUTO(默認):自動確認,消息接收后,消費者處理成功時自動確認該消息,如果處理時發送異常,則不會確認消息。
  • AcknowledgeMode.MANUAL:手動確認,消息接收后,消費者處理成功時,需要調用 channel.basicAck() 方法確認消息,如果處理時發送異常,則需要調用 channel.basicNack() 方法,表示不確認該消息的接收。

代碼示例

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: manual
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("ackExchange")public DirectExchange ackExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}
//    @Bean("binding")
//    public Binding binding(Exchange exchange, Queue queue) {
//        return BindingBuilder.bind(queue)
//                .to(exchange)
//                .with("ack")
//                .noargs();
//    }@Bean("binding1")public Binding binding1(@Qualifier("ackExchange") DirectExchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消費者消息確認喵~");return "發送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模擬處理業務邏輯");int a = 3 / 0;System.out.println("模擬處理業務完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

持久性機制

簡單介紹

前面講了消費端處理消息時,消息如何不丟失,但是如何保證 RabbitMQ 服務停掉以后,生產者發送的消息不丟失呢。默認情況下, RabbitMQ 退出或者由于某種原因崩潰時,會忽視隊列和消息。

為了保證消息持久化,RabbitMQ 提供了持久化機制,分別是:交換機持久化、隊列持久化和消息持久化。

  • 交換機持久化:使用 ExchangeBuilder.durable(true) 方法創建的交換機,RabbitMQ 會將交換機信息持久化到磁盤,重啟 RabbitMQ 后可以自動恢復。
  • 隊列持久化:使用 QueueBuilder.durable(true) 方法創建的隊列,RabbitMQ 會將隊列信息持久化到磁盤,重啟 RabbitMQ 后可以自動恢復。
  • 消息持久化:消息持久化可以保證消息不丟失,即使 RabbitMQ 重啟或者崩潰,消息也不會丟失。

將所有的消息都設置為持久化,會嚴重影響 RabbitMQ 的性能,這是因為寫入磁盤的速度相比于寫入內存的速度還是很慢的,對于可靠性不是那么高的消息,可以不采用持久化處理以提高整體的吞吐量。

在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡。

盡管設置了持久化,也不能保證就一定可以持久化。這是因為在將這些持久化信息寫入磁盤時也是需要時間的,如果 RabbitMQ 在這段時間內崩潰,那么這些信息也會丟失。

代碼示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("persistQueue")public Queue persistQueue() {return QueueBuilder.nonDurable(Constants.PERSIST_QUEUE).build();}@Bean("persistExchange")public DirectExchange persistExchange() {return ExchangeBuilder.directExchange(Constants.PERSIST_EXCHANGE).durable(false).build();}@Bean("binding2")public Binding binding2(@Qualifier("persistExchange") Exchange exchange, @Qualifier("persistQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("persist").noargs();}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/persist")public String persist() {Message message = new Message("消費者消息確認喵~".getBytes(StandardCharsets.UTF_8), new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PERSIST_EXCHANGE, "persist", message);return "發送成功";}
}

發送方確認機制

簡單介紹

在發送方將消息發送至 RabbitMQ Broker 時,也有可能出現消息丟失的情況。

為了保證消息可靠到達 Broker,RabbitMQ 提供了發送方確認機制。

發送方確認機制是指,在消息發送到 Broker 后,發送方會等待 Broker 回應,如果發送方收到消息,則發送方認為消息發送成功,如果發送方未收到消息,則發送方認為消息發送失敗,可以重新發送。

RabbitMQ 提供了兩種方式保證發送方發送的消息的可靠傳輸

  • confirm 確認模式:發送方在發送消息后,對發送方設置一個 ConfirmCallback 的監聽,無論消息是否抵達 Exchange,這個監聽都會被執行,如果消息抵達了 Exchange,則 ACKtrue,如果消息沒有抵達 Exchange,則 ACKfalse
  • returns 退回模式:盡管確認消息發送至 Exchange 后,也依然不能完全保證消息的可靠傳輸。在 ExchangeQueue 會有一個 Routing Key(Binding Key) 的綁定關系,如果消息沒有匹配到任何一個 Queue,則通過 returns 模式則會退回到發送方。

代碼示例

confirm 確認模式

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息發送確認機制publisher-confirm-type: correlated
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("binding3")public Binding binding3(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("confirm");}
}
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 設置確認消息機制rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack) {System.out.printf("接收到消息,消息ID:%s\n",correlationData == null ? null : correlationData.getId());} else {System.out.printf("未接收到消息,消息ID:%s;原因:%s\n",correlationData == null ? null : correlationData.getId(), cause);}}});return rabbitTemplate;}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {// ! 直接使用 setConfirmCallback 會影響其他接口的調用// ! 且只能設置一個確認回調,多次發起請求會報錯
//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                System.out.println("執行了confirm方法");
//                if (ack) {
//                    System.out.printf("接收到消息,消息ID:%s\n",
//                            correlationData == null ? null : correlationData.getId());
//                } else {
//                    System.out.printf("未接收到消息,消息ID:%s\n;原因:%s",
//                            correlationData == null ? null : correlationData.getId(), cause);
//                }
//            }
//        });CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);return "消息發送成功";}
}

returns 退回模式

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息發送退回機制publisher-returns: true
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 設置消息退回機制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回:" + returned);}});return rabbitTemplate;}
}

總結:如何確保消息的可靠性傳輸

  • 發送方 => 服務端:通過發送方確認機制confirm 確認模式returns 退回模式,確保消息可靠到達。
  • 服務端:通過持久化機制,保證消息不丟失。
  • 服務端 => 接收方:通過消息確認機制,確保消息被消費者正確消費。

重試機制

簡單介紹

消息在處理失敗后,重新發送,重新處理,這便是消息重試機制。

RabbitMQ 提供了消息重試機制,可以設置消息最大重試次數,超過最大重試次數還未成功消費,則消息會被丟棄。

代碼示例

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:
#         消息接收確認機制# acknowledge-mode: manual # 手動確認時,重發機制無效acknowledge-mode: autoretry:enabled: true # 開啟重試機制initial-interval: 5000ms # 重發時間間隔max-attempts: 5 # 最大重試次數
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}@Bean("binding4")public Binding binding4(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("retry");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/retry")public String retry() {rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息發送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void process(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.RETRY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3 / 0;System.out.println("業務處理完成");}
}

TTL 機制

簡單介紹

TTL(Time To Live)機制,可以設置消息的存活時間,超過存活時間還未消費,則消息會被丟棄。

RabbitMQ 提供了 TTL 機制,可以設置隊列和消息的 TTL 值,超過 TTL 值還未消費,則消息會被丟棄。

兩者區別:

  • 設置隊列 TTL 值,一旦消息過期,就會從隊列中刪除。設置隊列過期時間,隊列中已過期的消息肯定在隊列頭部,RabbitMQ 只要定期掃描對頭的消息是否過期即可。
  • 設置消息 TTL 值,即使消息過期,也不會馬上刪除,只有在發送至消費者時才會檢測其是否已經過期,如果過期才會刪除。設置消息過期時間,每個消息的過期時間都可能不盡相同,所以需要掃描整個隊列的消息才可確定是否過期,為了確保性能,所以采取類似于 懶漢模式 的方式。

將隊列 TTL 設置為 30s,第一個消息的 TTL 設置為 30s,第二個消息的 TTL 設置為 10s。

理論上說,在 10s 后,第二個消息應該被丟棄。但由于設置了隊列 TTL 值的機制,只會掃描隊頭的消息是否過期,所以在第一個消息過期之前,第二個消息不會被刪除。

代碼示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20_000) // ? 設置隊列的 TTL 值.build();}@Bean("ttlExchange")public DirectExchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("binding5")public Binding binding5(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// ? 設置消息的 TTL 值message.getMessageProperties().setExpiration("10000");return message;}};rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",messagePostProcessor);return "消息發送成功";}
}

死信隊列

簡單介紹

死信(Dead Letter),就是因為某種原因,導致消費者消費失敗的消息,稱之為死信。

死信隊列,當消息在一個隊列中變成死信時,它就被重新被發送到另一個交換機,該交換機就是死信交換機(Dead Letter Exchange)。

該死信交換機綁定死信隊列,當消息被重新發送到死信交換機時,它就被重新投遞到死信隊列。

消息變成死信會有如下幾種原因:

  • 消息被拒絕(basic.reject 或 basic.nack)并且 requeue 參數設置為 false。
  • 消息過期。
  • 隊列達到最大長度。

代碼示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DlConfig {@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) // ? 配置該隊列的死信交換機.deadLetterRoutingKey("dl") // ? 死信交換機綁定死信隊列的 Routing Key.ttl(10_000).maxLength(10L) // ? 設置隊列最大長度.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") DirectExchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void processNormal(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.NORMAL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void processDl(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "normal test...");return "消息發送成功:" + new Date();}
}

延遲隊列

簡單介紹

延遲隊列(Delay Queue),即消息被發送以后,并不想讓消費者立刻消費該消息,而是等待一段時間后再消費。

延遲隊列的使用場景有很多,比如:

  • 智能家居:智能設備產生的事件,如開關、溫度變化等,可以先存放在延遲隊列中,等待一段時間后再消費。
  • 日常管理:預定會議,需要在會議開始前 15 分鐘通知參會人員。
  • 訂單處理:訂單創建后,需要 30 分鐘后才會發貨。

RabbitMQ 本身沒有提供延遲隊列的功能,但是基于消息過期后會變成死信的特性,可以通過設置 TTL 和死信隊列來實現延遲隊列的功能。

代碼示例

@RequestMapping("/delay")
public String delay() {//發送帶ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s過期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s過期return messagePostProcessor;});return "發送成功!";
}

由于 RabbitMQ 檢查消息是否過期的機制,如果 20s 的消息先到隊列,那么 10s 的消息只會在 20s 后才會被檢查到過期。

延遲隊列插件

RabbitMQ 官方提供了延遲隊列插件,可以實現延遲隊列的功能。

延遲隊列插件

安裝隊列插件

延遲隊列插件下載地址

下載插件后,需要將插件放到 RabbitMQ 的插件目錄(/usr/lib/rabbitmq/plugins)下,然后重啟 RabbitMQ 服務。

代碼示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).durable(true).delayed() // ? 設置隊列為延遲隊列.build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void processDelay(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DELAY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(20_000L); // ? 設置消息的延遲發送時間return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(10_000L); // ? 設置消息的延遲發送時間return message;});return "消息發送成功:" + new Date();}
}

事務機制

簡單介紹

RabbitMQ 是基于 AMQP 協議實現的,該協議實現了事務機制,因此RabbitMQ也支持事務機制事務。

Spring AMQP 也提供了對事務相關的操作。RabbitMQ 事務允許開發者確保消息的發送和接收是原子性的,要么全部成功,要么全部失敗。

代碼示例

配置事務管理器:

@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;
}@Bean
public RabbitTransactionManager rabbitTransactionManager (ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}
@Bean("transQueue")
public Queue transQueue() {return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
@Transactional
@RequestMapping("/trans")
public String trans() {String msg = "trans test...";System.out.println("發送第一條消息:" + msg + 1);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);int a = 3 / 0;System.out.println("發送第二條消息:" + msg + 2);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);return "消息發送成功:";
}

消息分發

簡單介紹

一個隊列可以給多個消費者消費,默認情況下,RabbitMQ 是以輪詢的方式將消息分發給這些消費者,不管該消費是否已經消費并且確認。

這種情況是不太合理的,如果每個消費者消費的能力都不同,有的消費者消費快,有的慢,這會極大降低整體系統的吞吐量和處理速度。

我們可以使用 channel.basicQos(int prefetchCount) 來限制當前信
道上的消費者所能保持的最大未確認消息的數量。

當該消費者達到最大的 prefetchCount 限制時,RabbitMQ 會停止向該消費者分發消息,直到該消費者的未確認消息數量小于 prefetchCount 時。

代碼示例

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手動確認prefetch: 5 # 隊列最大接收五條消息
@Bean("qosQueue")
public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public DirectExchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") DirectExchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("qos");
}
@RequestMapping("/qos")
public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test...");}return "消息發送成功:" + new Date();
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模擬處理業務邏輯");System.out.println("模擬處理業務完成");//            channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}
}

應用場景

  • 限流:可以根據消費者的處理能力,設置 prefetchCount 限制每個消費者所能接收的消息數量,從而達到限流的目的。
  • 負載均衡:通過將 prefetchCount 設置為 1,通過設 prefetch = 1 的方式,告訴 RabbitMQ 一次只給一個消費者一條消息,也就是說,在處理并確認前一條消息之前,不要向該消費者發送新消息。相反,它會將它分派給下一個不忙的消費者。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/80683.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/80683.shtml
英文地址,請注明出處:http://en.pswp.cn/web/80683.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python打卡訓練營打卡記錄day31

知識點回顧 規范的文件命名規范的文件夾管理機器學習項目的拆分編碼格式和類型注解 作業&#xff1a;嘗試針對之前的心臟病項目ipynb&#xff0c;將他按照今天的示例項目整理成規范的形式&#xff0c;思考下哪些部分可以未來復用。 心臟病項目目錄 目錄結構:heart/ ├── conf…

mac .zshrc:1: command not found: 0 解決方案

nano ~/.zshrc 使用自帶的nano命令打開文件&#xff0c;修改后 Ctrl X 然后輸入y 然后回車即可保存成功 一般情況下&#xff0c;不是常用這個命令&#xff0c;除非是遇到有問題的文件&#xff0c;才用&#xff0c; 例如 遇到下面的問題 /Users/xxli/.zshrc:1: command no…

uniapp生成的app,關于跟其他設備通信的支持和限制

以下內容通過AI生成&#xff0c;這里做一下記錄。 藍牙 移動應用&#xff08;App&#xff09;通過藍牙與其他設備通信&#xff0c;是通過分層協作實現的。 一、通信架構分層 應用層&#xff08;App&#xff09; 調用操作系統提供的藍牙API&#xff08;如Android的BluetoothA…

第50天-使用Python+Qt+DeepSeek開發AI運勢測算

1. 環境準備 bash 復制 下載 pip install pyside6 requests python-dotenv 2. 獲取DeepSeek API密鑰 訪問DeepSeek官網注冊賬號 進入控制臺創建API密鑰 在項目根目錄創建.env文件: env 復制 下載 DEEPSEEK_API_KEY=your_api_key_here 3. 創建主應用框架 python 復制…

上位機與Hid設備通信

前置知識 什么是HID&#xff1f; HID&#xff08;Human Interface Device&#xff09;是?直接與人交互的電子設備?&#xff0c;通過標準化協議實現用戶與計算機或其他設備的通信&#xff0c;典型代表包括鍵盤、鼠標、游戲手柄等。? 為什么HID要與qt進行通信&#xff1f; …

JVM 工具實戰指南(jmap / jstack / Arthas / MAT)

&#x1f50d; 從診斷到定位&#xff1a;掌握生產級 JVM 排查工具鏈 &#x1f4d6; 前言&#xff1a;系統故障時&#xff0c;如何快速定位&#xff1f; 無論 JVM 理論多么扎實&#xff0c;當線上服務出現 CPU 飆高、響應超時、內存泄漏或頻繁 Full GC 時&#xff0c;僅靠猜測…

mac上安裝 Rust 開發環境

1.你可以按照提示在終端中執行以下命令&#xff08;安全、官方支持&#xff09;&#xff1a; curl --proto https --tlsv1.2 -sSf https://sh.rustup.rs | sh然后按提示繼續安裝即可。 注意&#xff1a;安裝過程中建議選擇默認配置&#xff08;按 1 即可&#xff09;。 如果遇…

C++(5)switch語句 循環while

這是一個電影評分的程序 default 就是 如果上述的都沒有執行 就統一的執行default的內容。 然后記得break ___________________________________ 循環 &#xff08;while&#xff09; while的使用方式 輸出 0-9的while循環

[Linux] Linux線程信號的原理與應用

Linux線程信號的原理與應用 文章目錄 Linux線程信號的原理與應用**關鍵詞****第一章 理論綜述****第二章 研究方法**1. **實驗設計**1.1 構建多線程測試環境1.2 信號掩碼策略對比實驗 2. **數據來源**2.1 內核源碼分析2.2 用戶態API調用日志與性能監控 **第三章 Linux信號的用法…

25.5.20學習總結

做題思路 數列分段 Section IIhttps://www.luogu.com.cn/problem/P1182正如題目所說&#xff0c;我們需要得到一個最小的最大段的值&#xff0c;可能有人將注意力放在分段上&#xff0c;事實上&#xff0c;我們更多的應該關注結果。這是一道二分答案的題&#xff0c;你可以先確…

Python爬蟲-爬取百度指數之人群興趣分布數據,進行數據分析

前言 本文是該專欄的第56篇,后面會持續分享python爬蟲干貨知識,記得關注。 在本專欄之前的文章《Python爬蟲-爬取百度指數之需求圖譜近一年數據》中,筆者有詳細介紹過爬取需求圖譜的數據教程。 而本文,筆者將再以百度指數為例子,基于Python爬蟲獲取指定關鍵詞的人群“興…

【工具使用】STM32CubeMX-USB配置-實現U盤功能

一、概述 無論是新手還是大佬&#xff0c;基于STM32單片機的開發&#xff0c;使用STM32CubeMX都是可以極大提升開發效率的&#xff0c;并且其界面化的開發&#xff0c;也大大降低了新手對STM32單片機的開發門檻。 ????本文主要講述STM32芯片USB功能的配置及其相關知識。 二…

從ISO17025合規到信創適配 解密質檢lims系統實驗室的 AI 質檢全鏈路實踐

在北京某國家級質檢中心的 CMA 復評審現場&#xff0c;審核專家通過系統后臺調取近半年的檢測記錄&#xff0c;從樣品登記時的電子簽名到報告簽發的 CA 簽章&#xff0c;178 項合規指標全部自動校驗通過 —— 這是白碼質檢 LIMS 系統創造的合規奇跡。 一、智能合規引擎&#xf…

【操作系統】進程同步問題——生產者-消費者問題

問題描述 生產者進程負責生產產品&#xff0c;并將產品存入緩沖池&#xff0c;消費者進程則從緩沖池中取出產品進行消費。為實現生產者和消費者的并發執行&#xff0c;系統在兩者之間設置了一個包含n個緩沖區的緩沖池。生產者將產品放入緩沖區&#xff0c;消費者則從緩沖區中取…

SpringBoot-6-在IDEA中配置SpringBoot的Web開發測試環境

文章目錄 1 環境配置1.1 JDK1.2 Maven安裝配置1.2.1 安裝1.2.2 配置1.3 Tomcat1.4 IDEA項目配置1.4.1 配置maven1.4.2 配置File Encodings1.4.3 配置Java Compiler1.4.4 配置Tomcat插件2 Web開發環境2.1 項目的POM文件2.2 項目的主啟動類2.3 打包為jar或war2.4 訪問測試3 附錄3…

Vue3 父子組件傳值, 跨組件傳值,傳函數

目錄 1.父組件向子組件傳值 1.1 步驟 1.2 格式 2. 子組件向父組件傳值 1.1 步驟 1.2 格式 3. 跨組件傳值 運行 4. 跨組件傳函數 ?5. 總結 1. 父傳子 2. 子傳父 3. 跨組件傳值(函數) 1.父組件向子組件傳值 1.1 步驟 在父組件中引入子組件 在子組件標簽中自定義屬…

嵌入式學習筆記 - STM32 U(S)ART 模塊HAL 庫函數總結

一 串口發送方式&#xff1a; ①輪訓方式發送&#xff0c;也就是主動發送&#xff0c;這個容易理解&#xff0c;使用如下函數&#xff1a; HAL_UART_Transmit(UART_HandleTypeDef *huart, const uint8_t *pData, uint16_t Size, uint32_t Timeout); ②中斷方式發送&#xff…

AI無法解決的Bug系列(一)跨時區日期過濾問題

跨時區開發中&#xff0c;React Native如何處理新西蘭的日期過濾問題 有些Bug&#xff0c;不是你寫錯代碼&#xff0c;而是現實太魔幻。 比如我最近給新西蘭客戶開發一個React Native應用&#xff0c;功能非常樸素&#xff1a;用戶選一個日期范圍&#xff0c;系統返回該范圍內…

基于天貓 API 的高效商品詳情頁實時數據接入方法解析

一、引言 在電商大數據分析、競品監控及智能選品等場景中&#xff0c;實時獲取天貓商品詳情頁數據是關鍵需求。本文將詳細解析通過天貓開放平臺 API 高效接入商品詳情數據的技術方案&#xff0c;涵蓋接口申請、數據獲取邏輯及代碼實現&#xff0c;幫助開發者快速構建實時數據采…

系分論文《論遺產系統演化》

系統分析師論文范文系列 摘要 2022年6月,某金融機構啟動核心業務系統的技術升級項目,旨在對其運行超過十年的遺留系統進行演化改造。該系統承擔著賬戶管理、支付結算等關鍵業務功能,但其技術架構陳舊、擴展性不足,難以適應數字化轉型與業務快速增長的需求。作為系統分析師,…