Mybatis框架相關問題
RabbitMQ相關問題
- 一、RabbitMQ的核心組件和工作原理?
- 二、如何保證消息可靠投遞不丟失的?
- 三、RabbitMQ如何保證消息的冪等性?
- 四、什么是死信隊列?死信隊列是如何導致的?
- 五、RabbitMQ死信隊列是如何導致的?
- 六、什么是延遲隊列?RabbitMQ 如何實現延遲隊列?
- 七、RabbitMQ的高可用機制有了解嘛?
- 八、如果有百萬消息堆積在MQ中,如何解決?
- 九、如何解決RabbitMQ中因為消息堆積而導致的消息過期失效的問題?
一、RabbitMQ的核心組件和工作原理?
交換機有四類:
1、Fanout Exchange
投遞到所有綁定的隊列,不需要規則,不需要匹配,相當于廣播、群發;
2、Direct Exchange
根據路由鍵精確匹配進行路由消息隊列;
3、Topic Exchange
通配符匹配,相當于模糊匹配;
# 匹配多個單詞,* 匹配一個單詞,用 . 隔開的為一個單詞:
4、Headers Exchange
基于消息內容中的headers
屬性進行匹配;
二、如何保證消息可靠投遞不丟失的?
要確保一下四個步驟全部成功;
① 代表消息從生產者發送到Exchange
;
② 代表消息從Exchange
路由到Queue
;
③ 代表消息在Queue
中存儲;
④ 代表消費者監聽Queue
并消費消息;
-
① 代表消息從生產者發送到Exchange;
- 方案:開啟
Confirm
確認模式; - 消息未投遞成功,采取補償措施,或是記錄日志,或是發送通知讓負責人知道;
// 實現接口 implements RabbitTemplate.ConfirmCallback// 重寫方法 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause)
- 方案:開啟
-
② 代表消息從Exchange路由到Queue;
- 方案:開啟
Return
返回模式; - 消息未投遞成功,采取補償措施,或是記錄日志,或是發送通知讓負責人知道;
// 實現接口 implements RabbitTemplate.ReturnsCallback/*** 消息從 交換機 --> 到 --> 隊列,如果失敗了,就會回調該方法* (失敗了才觸發該方法,成功是不會觸發該方法的)** 比如說磁盤滿** @param returned*/ @Override public void returnedMessage(ReturnedMessage returned)
- 方案:開啟
-
③ 代表消息在Queue中存儲;
- 方案:
1、交換機持久化
2、隊列持久化/*** 聲明創建一個FanoutExchange交換機** @return*/@Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true) // 持久化.build();}
3、消息持久化/*** 聲明創建一個隊列** @return*/@Beanpublic Queue directQueue() {return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();}
4、集群高可用部署//消息體Message message = MessageBuilder.withBody(json.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化.build();
- 方案:
-
④ 代表消費者監聽Queue并消費消息;
- 方案:手動確認消息
@Slf4j @Component public class MyRabbitListener {@RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE_NAME})public void onMessage(String msg, @Headers Map<String, Object> header, Message message, Channel channel) {try {System.out.println("[RabbitListener]接收到的消息: " + msg);//處理業務//業務處理成功,手動確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//業務處理失效,不確認消息,并且重新入隊,這樣又可以重新消費// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒絕消息// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();}} }
三、RabbitMQ如何保證消息的冪等性?
冪等性:多次操作,重復操作,對系統不會造成影響; 比如:消息重復發送兩次或多次,消息重復消費兩次或多次;
保證MQ的冪等性,只要保證消費者不會重復消費相同的消息即可;
- 方案:
- 生產者發送消息時,為每條消息設置一個全局唯一的id;
- 消費者消費消息時,使用 redis 的
setnx
命令:SET id 1 NX
,若返回OK
,說明該消息之前沒有消費過,正常消費;若返回nil
,說明該消息之前已消費過,那就不用處理;
四、什么是死信隊列?死信隊列是如何導致的?
死信隊列即DLX,全稱為Dead-Letter-Exchange
,翻譯為:死信交換機。當一個消息在隊列中變成死信 (dead message) 之后,它能被重新發送到另外一個交換機中,這個交換機就是DLX,綁定到DLX的隊列就稱為死信隊列;
死信隊列本身也是一個普通的消息隊列,可以通過設置一些參數將其設置為死信隊列;
死信隊列是一個用于存放無法被消費的消息的隊列,這些消息被稱為死信,死信隊列可以避免消息一直被消費卻無法消費成功的情況;
五、RabbitMQ死信隊列是如何導致的?
RabbitMQ導致死信的幾種原因:
1、消息被拒
- Basic.Nack 且 requeue = false
- Basic.Reject 且 requeue = false
2、消息 TTL 過期;
3、隊列達到最大長度,即隊列滿了;
六、什么是延遲隊列?RabbitMQ 如何實現延遲隊列?
- 延遲隊列是用來存放“延遲消息”的隊列;
- 所謂“延遲消息”是指消息被發送到隊列以后,并不想讓消費者立刻拿到消息,而是等待指定的時間后,消費者才能拿到這個消息;
RabbitMQ 本身是沒有直接可以使用的延遲隊列;要實現延遲隊列,一般有兩種方式:
-
使用TTL消息過期 + DLX死信隊列來實現;
-
使用使用
rabbitmq-delayed-message-exchange
延遲插件來實現;
七、RabbitMQ的高可用機制有了解嘛?
RabbitMQ有三種模式:
1)單機模式
2)普通集群模式
3)鏡像集群模式 (高可用)
八、如果有百萬消息堆積在MQ中,如何解決?
- 原因:生產和消費失衡;
- 解決:
1、提前預防;
- 流量預估,預估每秒產生的消息數量;
- 做好壓測,壓測系統的消費能力;
- 做好預案,準備好可能發生的超出預估的突發情況;
2、應急處理;
- 消費端
- 臨時增加消費者數量:增加更多的消費者實例、增加消費者線程數量;
- 臨時增加消費者把消息寫入其他設備,后續處理;
- 生產端
- 可以適當限流,降低消息發送速度;
3、事后優化;
- 優化消費端業務處理邏輯,提升業務執行效率,減少io操作,減少數據庫操作,減少網絡連接,業務異步處理等;
九、如何解決RabbitMQ中因為消息堆積而導致的消息過期失效的問題?
消息堆積,導致消息大量存儲在消息隊列中得不到消費,而消息又設置了TTL過期時間,當到達過期時間時,消息被過期丟棄;
解決:
1、讓消息不要被堆積;
2、不設置TTL過期時間或者增加過期時間時長;
3、設置死信隊列
4、編寫臨時程序補發消息;