目錄
- 一、消息不丟失
- 1.消息確認
- 2.消息確認業務封裝
- 2.1 發送確認消息測試
- 2.2 消息發送失敗,設置重發機制
一、消息不丟失
消息的不丟失,在MQ角度考慮,一般有三種途徑:
1,生產者不丟數據
2,MQ服務器不丟數據
3,消費者不丟數據
保證消息不丟失有兩種實現方式:
1,開啟事務模式
2,消息確認模式
說明:開啟事務會大幅降低消息發送及接收效率,使用的相對較少,因此我們生產環境一般都采取消息確認模式,以下我們只是講解消息確認模式
1.消息確認
消息持久化
如果希望RabbitMQ重啟之后消息不丟失,那么需要對以下3種實體均配置持久化
Exchange
聲明exchange時設置持久化(durable = true)并且不自動刪除(autoDelete = false)
Queue
聲明queue時設置持久化(durable = true)并且不自動刪除(autoDelete = false)
message
發送消息時通過設置deliveryMode=2持久化消息
處理消息隊列丟數據的情況,一般是開啟持久化磁盤的配置。這個持久化配置可以和confirm機制配合使用,你可以在消息持久化磁盤后,再給生產者發送一個Ack信號。這樣,如果消息持久化磁盤之前,rabbitMQ陣亡了,那么生產者收不到Ack信號,生產者會自動重發。那么如何持久化呢,其實也很容易,就下面兩步:
1、將queue的持久化標識durable設置為true,則代表是一個持久的隊列
2、發送消息的時候將deliveryMode=2
這樣設置以后,rabbitMQ就算掛了,重啟后也能恢復數據
發送確認
有時,業務處理成功,消息也發了,但是我們并不知道消息是否成功到達了rabbitmq,如果由于網絡等原因導致業務成功而消息發送失敗,那么發送方將出現不一致的問題,此時可以使用rabbitmq的發送確認功能,即要求rabbitmq顯式告知我們消息是否已成功發送。
手動消費確認
有時,消息被正確投遞到消費方,但是消費方處理失敗,那么便會出現消費方的不一致問題。比如:訂單已創建的消息發送到用戶積分子系統中用于增加用戶積分,但是積分消費方處理卻都失敗了,用戶就會問:我購買了東西為什么積分并沒有增加呢?
要解決這個問題,需要引入消費方確認,即只有消息被成功處理之后才告知rabbitmq以ack,否則告知rabbitmq以nack
2.消息確認業務封裝
service-mq修改配置
開啟rabbitmq消息確認配置,在common的配置文件中都已經配置好了!
spring:rabbitmq:host: 192.168.121.140port: 5672username: adminpassword: adminpublisher-confirms-type: correlated #交換機的確認publisher-returns: true #隊列的確認listener:simple:acknowledge-mode: manual #默認情況下消息消費者是自動確認消息的,如果要手動確認消息則需要修改確認模式為manualprefetch: 1 # 消費者每次從隊列獲取的消息數量。此屬性當不設置時為:輪詢分發,設置為1為:公平分發
搭建rabbit-util模塊
由于消息隊列是公共模塊,我們把mq的相關業務封裝到該模塊,其他service微服務模塊都可能使用,因此我們把他封裝到一個單獨的模塊,需要使用mq的模塊直接引用該模塊即可
搭建方式如:
pom.xml
<dependencies><!--rabbitmq消息隊列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--rabbitmq 協議--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency></dependencies>
4.2.4 封裝發送端消息確認
/*** @Description 消息發送確認* <p>* ConfirmCallback 只確認消息是否正確到達 Exchange 中* ReturnCallback 消息沒有正確到達隊列時觸發回調,如果正確到達隊列不執行* <p>* 1. 如果消息沒有到exchange,則confirm回調,ack=false* 2. 如果消息到達exchange,則confirm回調,ack=true* 3. exchange到queue成功,則不回調return* 4. exchange到queue失敗,則回調return* */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 修飾一個非靜態的void()方法,在服務器加載Servlet的時候運行,并且只會被服務器執行一次在構造函數之后執行,init()方法之前執行。@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this); //指定 ReturnCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息發送成功:" + JSON.toJSONString(correlationData));} else {log.info("消息發送失敗:" + cause + " 數據:" + JSON.toJSONString(correlationData));}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化對象輸出System.out.println("消息主體: " + new String(message.getBody()));System.out.println("應答碼: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交換器 exchange : " + exchange);System.out.println("消息使用的路由鍵 routing : " + routingKey);}}
封裝消息發送
@Service
public class RabbitService {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送消息* @param exchange 交換機* @param routingKey 路由鍵* @param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}
2.1 發送確認消息測試
消息發送端
@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitService rabbitService;/*** 消息發送*///http://localhost:8282/mq/sendConfirm@GetMapping("sendConfirm")public Result sendConfirm() {rabbitService.sendMessage("exchange.confirm", "routing.confirm", "來人了,開始接客吧!");return Result.ok();}
}
消息接收端
@Component
public class ConfirmReceiver {@SneakyThrows
@RabbitListener(bindings=@QueueBinding(value = @Queue(value = "queue.confirm",autoDelete = "false"),exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),key = {"routing.confirm"}))
public void process(Message message, Channel channel){System.out.println("RabbitListener:"+new String(message.getBody()));// false 確認一個消息,true 批量確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
測試:http://localhost:8282/mq/sendConfirm
2.2 消息發送失敗,設置重發機制
實現思路:借助redis來實現重發機制
模塊中添加依賴
<!-- redis -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- spring2.X集成redis所需common-pool2-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>
自定義一個實體類來接收消息
@Data
public class GmallCorrelationData extends CorrelationData {// 消息主體private Object message;// 交換機private String exchange;// 路由鍵private String routingKey;// 重試次數private int retryCount = 0;// 消息類型 是否是延遲消息private boolean isDelay = false;// 延遲時間private int delayTime = 10;
}
修改發送方法
// 封裝一個發送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){// 將發送的消息 賦值到 自定義的實體類GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();// 聲明一個correlationId的變量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);// 發送消息的時候,將這個gmallCorrelationData 對象放入緩存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);// 調用發送消息方法//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);// 默認返回truereturn true;
}發送失敗調用重發方法 MQProducerAckConfig 類中修改
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {// ack = true 說明消息正確發送到了交換機if (ack){System.out.println("哥們你來了.");log.info("消息發送到了交換機");}else {// 消息沒有到交換機log.info("消息沒發送到交換機");// 調用重試發送方法this.retrySendMsg(correlationData);}
}@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println("消息主體: " + new String(message.getBody()));System.out.println("應答碼: " + code);System.out.println("描述:" + codeText);System.out.println("消息使用的交換器 exchange : " + exchange);System.out.println("消息使用的路由鍵 routing : " + routingKey);// 獲取這個CorrelationData對象的Id spring_returned_message_correlationString correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");// 因為在發送消息的時候,已經將數據存儲到緩存,通過 correlationDataId 來獲取緩存的數據String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);// 消息沒有到隊列的時候,則會調用重試發送方法GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);// 調用方法 gmallCorrelationData 這對象中,至少的有,交換機,路由鍵,消息等內容.this.retrySendMsg(gmallCorrelationData);
}/*** 重試發送方法* @param correlationData 父類對象 它下面還有個子類對象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {// 數據類型轉換 統一轉換為子類處理GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;// 獲取到重試次數 初始值 0int retryCount = gmallCorrelationData.getRetryCount();// 判斷if (retryCount>=3){// 不需要重試了log.error("重試次數已到,發送消息失敗:"+JSON.toJSONString(gmallCorrelationData));} else {// 變量更新retryCount+=1;// 重新賦值重試次數 第一次重試 0->1 1->2 2->3gmallCorrelationData.setRetryCount(retryCount);System.out.println("重試次數:\t"+retryCount);// 更新緩存中的數據this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);// 調用發送消息方法 表示發送普通消息 發送消息的時候,不能調用 new RabbitService().sendMsg() 這個方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}
測試:只需修改(錯誤信息)