rabbitMQ在發送消息時,會出現交換機不存在(交換機名字寫錯等消息),這種情況如何會退給生產者重新處理?【交換機層】
生產者發送消息時,消息未送達到指定的隊列,如何消息回退?
核心:對類RabbitTemplate.ConfirmCallback 和RabbitTemplate.ReturnCallback的重寫。
RabbitTemplate.ConfirmCallback:交換機在收到消息或者沒收到消息時會被觸發
RabbitTemplate.ReturnCallback:消息進入交換機,不能達到指定目的地時被出發。
開啟交換機確認
開啟消息不可達回退
配置文件不開啟 這兩項
spring:rabbitmq:
# 交換機進行確認消息publisher-confirm-type: correlated
# 交換機不可以路由消息時 消息回退publisher-returns: true
配置類聲明
package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 發布確認**/
@Configuration
public class ConfirmConfig {//交換機public static final String CONFIRM_EXCHANGE = "confirm.exchange";//隊列public static final String CONFIRM_QUEUE = "confirm.queue";//routing-keypublic static final String CONFIRM_ROUTING_KEY = "key1";//聲明 交換機@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE);}//聲明 隊列@Bean("confrimQueue")public Queue confrimQueue(){return QueueBuilder.durable(CONFIRM_QUEUE).build();}//綁定@Beanpublic Binding queueBindingExchange(@Qualifier("confrimQueue") Queue confrimQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){return BindingBuilder.bind(confrimQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}
消費者:
package com.esint.controller;import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;//發消息@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message){//普通發送模式 無是否發送成功回調CorrelationData correlationData = new CorrelationData("101");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message);log.info("發送消息為:{}",message);}}
消費者:
package com.esint.consumer;import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)public void receiveConfrimMessage(Message message){log.info("接收到的消息為:" + new String(message.getBody()));}
}
核心修改的重寫的類:
package com.esint.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{/*** 注入:本類為實現了RabbitTemplate的內部類,所以在RabbitTemplate發送消息的時候不會調用到我們自己的實現,所以需要把這個類在注入到RabbitTemplate中。*/@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** RabbitTemplate.ConfirmCallback 是在【生產者】發送【交換機】 交換機的感知回應調去方法** 交換機確認回調方法* 1.交換機接收消息成功* 參數1 correlationData保存了回調消息ID和相關信息* 參數2 交換機收到消息 true* 參數3 失敗原因 為 null* 2.交換機接受消息失敗* 參數1 correlationData保存了回調消息ID和相關信息* 參數2 交換機收到消息 false* 參數3 失敗原因* @param correlationData 來源于生產者 所以在發消息時 需要帶有這個屬性* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if(ack){log.info("交換機確認收到 ID:{}" ,id);}else {log.info("交換機未收到ID:{}的消息,原因:{}",id,cause);//這里實現發送交換機失敗的存儲邏輯}}/*** 回退消息* 在消息傳遞過程不可達目標地時 返還給生產者 只有消息不可達,才會執行這個方法** @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("消息{} 被交換機{} 退回,原因:{} 路由:{}",new String(message.getBody()),exchange,replyText,routingKey);//這里實現發送消息不到達的邏輯 發送消息無法被邏輯 默認就會被交換機丟掉 這里重寫后 可以在這里處理存儲}
}
故意發送一個錯誤路由時:
消息能發出 交換機有確認 消息可以被回退