1. 安裝和配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
1.2 yml 配置
### 生產端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虛擬主機username: guestpassword: guestpublisher-returns: true #確認消息已經發送到隊列,生產上無需開啟# simple:同步等待confirm結果,直到超時#開啟消息確認 :correlated:異步回調,MQ返回結果時會回調這個ComfirmCallbackpublisher-confirm-type: correlated #確認消息已發送到交換機
## 生產端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虛擬主機username: guestpassword: guestpublisher-returns: true #確認消息已經發送到隊列,生產上無需開啟# simple:同步等待confirm結果,直到超時#開啟消息確認 :correlated:異步回調,MQ返回結果時會回調這個ComfirmCallbackpublisher-confirm-type: correlated #確認消息已發送到交換機
2.生產端的消息確認發送代碼
/*** (1) RabbitTemplate.ConfirmCallback 這個接口是用來確定消息是否到達交換器的* (2) RabbitTemplate.ReturnsCallback 這個則是用來確定消息是否到達隊列的,未到達隊列時會被調用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一個參數表示交換機,第二個參數表示 routing key,第三個參數即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意輸入一個不存在的交換機rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意輸入一個不存在的隊列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--發送結束");}/*** 需要給ConfirmCallback賦值 不然不會走回調方法,默認是null* //將當前類的實例設置為 RabbitMQ 的確認回調處理器,跟下面的confirm方法聯合使用,* // 還需要打開配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
// rabbitTemplate.setConfirmCallback(this);}/** 此方法用于監聽消息是否發送到交換機* 回調*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 監聽消息成功發送到交換機--回調id = {}", correlationData);} else {log.info("confirm -- 消息沒有發送到交換機回調id= {},消息發送失敗:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到達隊列 --- returnedMessage= " + returnedMessage);}
}
2.2 生產端的截圖

3.消費端代碼
@Component
@Slf4j
public class RabbitConfirmConsumer {// 交換機public static final String confirm_exchange_name = "confirm_exchange";// 隊列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 聲明交換機@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 聲明隊列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 綁定隊列到交換機@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功處理消息,RabbitMQ從隊列中刪除該消息* nack:消息處理失敗,RabbitMQ需要再次投遞消息* reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//獲取消息的唯一標記long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息為:{},消息的唯一標記={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//獲取消息的內容byte[] body = message.getBody();//basicAck:表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。channel.basicAck(deliveryTag,false);//false 表示僅確認當前消息消費成功log.info("接收的消息為:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消費數據");}}}
3.2消費端截圖
4 消費端重試機制
@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//隊列與交換機進行綁定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count = 0;//測試重試,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重試次數 = {},重試接收數據為:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}
4.2 重試機制截圖

5. 限流設置--消費端
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 開啟手動確認模式prefetch: 5 #控制消費者從隊列中預取(prefetch)消息的數量,以此來實現流控制
5.1 生產端--發送19條信息
@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流測試--" + i);rabbitMqProducer.xianliuTest(map);}return "限流測試發送成功";}/**** 限流消息的發送測試*/public void xianliuTest(Map<String, String> map) {// 第一個參數表示交換機,第二個參數表示 routing key,第三個參數即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}
5.2 消費端
/*** ack:成功處理消息,RabbitMQ從隊列中刪除該消息* nack:消息處理失敗,RabbitMQ需要再次投遞消息* reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//獲取消息的唯一標記long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息為:{},消息的唯一標記={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。//channel.basicAck(deliveryTag,false);//false 表示僅確認當前消息消費成功log.info("接收的消息為:{}", map);}else{//否定確認//channel.basicNack(deliverTag,false,true);//requeue為false,則變成死信隊列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消費數據");}}
5.3 注釋掉channel.basicAck--堵塞了

5.4 注釋掉了?prefetch -- 19條全部被消費,即使沒有ack
