如何監聽RabbitMQ隊列
簡單代碼實現RabbitMQ消息監聽
需要的依賴
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>x.x.x</version></dependency>
消息監聽示例
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class RabbitMQConfig {private String addresses = "localhost:5672";private String username = "xxx";private String password = "xxx";@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(this.addresses);connectionFactory.setUsername(this.username);connectionFactory.setPassword(this.password);return connectionFactory;}@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("queue1", "queue2");//要監聽的隊列名稱// 設置消息監聽器container.setMessageListener((message) -> {log.info("Received message: " + new String(message.getBody()));});return container;}}
手動ACK
前面的示例是默認的自動ACK,自動確認消息投遞成功。但是業務場景需要執行成功后,才確認這條消息被消費,此時我們需要設置手動ACK,只有當執行成功,才確認這條消息被消費。
@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("queue1", "queue2");//要監聽的隊列名稱// 設置消息監聽器container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手動ACK,不設置默認是AUTOcontainer.setMessageListener(messageListener());return container;}@Beanpublic ChannelAwareMessageListener messageListener() {return (message, channel) -> {try {String msg = new String(message.getBody());log.info("Received: " + msg);// 業務邏輯處理boolean success = processMessage(msg);if (success) {// 處理成功,手動ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {// 處理失敗,手動NACK(不重新入隊)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}} catch (Exception e) {// 異常情況,NACK并重新入隊(可選)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}};}/*** 處理消息* @param msg* @return* @throws Exception*/private boolean processMessage(String msg) throws Exception {boolean success = false;int i = RandomUtil.randomInt();if (i % 3 == 0) {success = true;} else if (i % 3 == 1) {success = false; //模擬失敗} else {log.error("處理消息:" + msg + ", 模擬發生故障");throw new Exception("模擬發生故障");//模擬異常}log.info("處理消息:" + msg + ", flag:" + success);return success;}