-
開啟消息確認機制:
在發布消息時,可以設置
deliveryMode
為2(持久化),以確保消息不會因為RabbitMQ的崩潰而丟失。 -
使隊列持久化:
通過設置
durable
為true
,可以確保隊列在RabbitMQ重啟后依然存在。 -
使消費者確認機制:
啟用手動確認模式,并在消費完消息后手動確認。
-
以下是使用Java和Spring AMQP的示例代碼:
@Bean
public Queue myQueue() {return QueueBuilder.durable("myQueue").build();
}@Bean
public DirectExchange myExchange() {return new DirectExchange("myExchange");
}@Bean
public Binding myBinding() {return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
}@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("myQueue");container.setMessageListener(listenerAdapter);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動確認模式return container;
}@Bean
public MessageListenerAdapter listenerAdapter(MyConsumer myConsumer) {return new MessageListenerAdapter(myConsumer, "handleMessage");
}public class MyConsumer {public void handleMessage(Message message) {// 處理消息// ...// 確認消息channel.basicAck(envelope.getDeliveryTag(), false);}
}
在發送消息時:
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 設置消息持久化return message;
});
確保你的消費者在處理完消息后調用basicAck
來確認消息,這樣即使消費者崩潰,未確認的消息也會被重新傳遞給另一個消費者。如果你希望在消費者異常時自動重新將消息放回隊列,可以在handleMessage
方法中捕獲異常,并在異常處理邏輯中調用basicNack
或basicReject
方法,并設置重回隊列的參數。