目錄
1.重試配置
2.配置交換機&隊列
3.發送消息
4.消費消息
5. 運行程序觀察結果
6. 手動確認
?注意:
在消息傳遞過程中, 可能會遇到各種問題, 如網絡故障, 服務不可用, 資源不足等, 這些問題可能導致消息處理失敗. 為了解決這些問題, RabbitMQ 提供了重試機制, 允許消息在處理失敗后重新發送.
但如果是程序邏輯引起的錯誤, 那么多次重試也是沒有用的, 可以設置重試次數
1.重試配置
spring:rabbitmq:addresses: amqp://study:study@你的服務器IP:15673/你的虛擬機名listener:simple:acknowledge-mode: auto #消息接收確認retry:enabled: true # 開啟消費者失敗重試initial-interval: 5000ms # 初始失敗等待時?為5秒max-attempts: 5 # 最?重試次數(包括自身消費的一次)
2.配置交換機&隊列
//重試機制
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";/重試機制 發布訂閱模式
//1. 交換機
@Bean("retryExchange")
public Exchange retryExchange() {return
ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
//2. 隊列
@Bean("retryQueue")
public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
//3. 隊列和交換機綁定 Binding
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange
exchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange);
}
3.發送消息
@RequestMapping("/retry")
public String retry(){rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry
test...");return "發送成功!";
}
4.消費消息
@Component
public class RetryQueueListener {//指定監聽隊列的名稱@RabbitListener(queues = Constant.RETRY_QUEUE)public void ListenerQueue(Message message) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//模擬處理失敗try {int num = 3/0;System.out.println("處理完成");
}catch (Exception e){System.out.println("處理失敗");
}}
}
5. 運行程序觀察結果
接收到消息: consumer ack test..., deliveryTag: 1
處理失敗
6. 手動確認
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());//模擬處理失敗int num = 3/0;System.out.println("處理完成");//3. ?動簽收channel.basicAck(deliveryTag, true);}catch (Exception e){//4. 異常了就拒絕簽收Thread.sleep(1000);//第三個參數requeue, 是否重新發送, 如果為true, 則會重新發送,,若為false, 則直接
丟棄channel.basicNack(deliveryTag, true,true);}
}
運行結果:
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 2
接收到消息: retry test..., deliveryTag: 3
接收到消息: retry test..., deliveryTag: 4
接收到消息: retry test..., deliveryTag: 5
接收到消息: retry test..., deliveryTag: 6
接收到消息: retry test..., deliveryTag: 7
接收到消息: retry test..., deliveryTag: 8
接收到消息: retry test..., deliveryTag: 9
接收到消息: retry test..., deliveryTag: 10
接收到消息: retry test..., deliveryTag: 11
可以看到, 手動確認模式時, 重試次數的限制不會像在自動確認模式下那樣直接生效, 因為是否重試以及何時重試更多地取決于應用程序的邏輯和消費者的實現.
自動確認模式下, RabbitMQ 會在消息被投遞給消費者后自動確認消息. 如果消費者處理消息時拋出異常, RabbitMQ 根據配置的重試參數自動將消息重新?隊, 從而實現重試. 重試次數和重試間隔等參數可以直接在RabbitMQ的配置中設定,并且RabbitMQ會負責執行這些重試策略.
手動確認模式下, 消費者需要顯式地對消息進行確認. 如果消費者在處理消息時遇到異常, 可以選擇不確認消息使消息可以重新?隊. 重試的控制權在于應用程序本身, 而不是RabbitMQ的內部機制. 應用程序可以通過自己的邏輯和利用RabbitMQ的高級特性來實現有效的重試策略
?注意:
1. 自 動確認模式下: 程序邏輯異常, 多次重試還是失敗, 消息就會被自動確認, 那么消息就丟失
了
2. 手 動確認模式下: 程序邏輯異常, 多次重試消息依然處理失敗, 無法被確認, 就?直是
unacked的狀態, 導致消息積壓