一、初識 MQ
傳統的單體架構,分布式架構的同步調用里,無論是方法調用,還是 OpenFeign 難免會有以下問題:
- 擴展性差(高耦合,需要依賴對應的服務,同樣的事件,不斷有新需求,這個事件的業務代碼會越來越臃腫,這些子業務都寫在一起)
- 性能下降(等待響應,最終整個業務的響應時長就是每次遠程調用的執行時長之和)
- 級聯失敗(如果是一個事務,一個服務失敗就會導致全部回滾,若是分布式事務就更加麻煩了,但其實一些行為的失敗不應該導致整體回滾)
- 服務宕機(如果服務調用者未考慮服務提供者的性能,導致提供者因為過度請求而宕機)
但如果不是很要求同步調用,其實也可以用異步調用,如果是單體架構,你可能很快能想到一個解決方案,就是阻塞隊列實現消息通知:
但是在分布式架構下,可能就需要一個中間件級別的阻塞隊列,這就是我們要學習的 Message Queue 消息隊列,簡稱 MQ,而現在流行的 MQ 還不少,在實現其基本的消息通知功能外,還有一些不錯的擴展
以 RabbitMQ 和 Kafka 為例:
RabbitMQ | Kafka | |
---|---|---|
公司/社區 | Rabbit | Apache |
開發語言 | Erlang | Scala & Java |
協議支持 | AMQP,XMPP,SMTP,STOMP | 自定義協議 |
可用性 | 高 | 高 |
單機吞吐量 | 一般 | 非常高(Kafka 亮點) |
消息延遲 | 微秒級 | 毫秒以內 |
消息可靠性 | 高 | 一般 |
消息延遲指的是,消息到隊列,并在隊列中“就緒”的時間與預期時間的差距,其實就是數據在中間件中流動的耗時,預期時間可以是現在、幾毫秒后、幾秒后、幾天后…
據統計,目前國內消息隊列使用最多的還是 RabbitMQ,再加上其各方面都比較均衡,穩定性也好,因此我們課堂上選擇 RabbitMQ 來學習。
二、RabbitMQ 安裝
Docker 安裝 RabbitMQ:
mkdir /root/mq
cd /root/mqdocker rm mq-server -f
docker rmi rabbitmq:3.8-management -f
docker volume rm mq-plugins -fdocker pull rabbitmq:3.8-management# 插件數據卷最好還是直接掛載 volume,而不是掛載我們的目錄
docker run \
--name mq-server \
-e RABBITMQ_DEFAULT_USER=xxx \
-e RABBITMQ_DEFAULT_PASS=xxx \
--hostname mq1 \
-v mq-plugins:/plugins \
-p 15672:15672 \
-p 5672:5672 \
-d rabbitmq:3.8-management
三、RabbitMQ 基本知識
(1)架構
15672:RabbitMQ 提供的管理控制臺的端口
5672:RabbitMQ 的消息發送處理接口
用戶名密碼就是安裝時,啟動容器時指定的用戶名密碼
MQ 對應的就是這里的消息代理 Broker:
RabbitMQ 詳細架構圖:
其中包含幾個概念:
publisher
:生產者,也就是發送消息的一方consumer
:消費者,也就是消費消息的一方queue
:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理exchange
:交換機,負責消息路由。生產者發送的消息由交換機決定投遞到哪個隊列。virtual host
:虛擬主機,起到數據隔離的作用。每個虛擬主機相互獨立,有各自的 exchange、queue
現在你可能只認識生產者、消費者、隊列,其他是什么呢?
其實你可以理解為 MQ 也是存儲東西的,存儲的就是消息,virtual host 就是數據庫,queue 就是表,消息就是一行數據,而 MQ 有特殊的機制,消息先通過 exchange 再決定前往哪個 queue
管理控制臺的使用就不多說了
(2)五大模式
這只是最常見的五種模式:
- 簡單模式
- 工作模式
- 發布訂閱模式
關聯交換機的隊列都能收到一份消息,廣播
- 路由模式
關聯交換機時,提供 routing key(可以是多個,隊列之間可以重復),發布消息時提供一個 routing key,由此發送給指定的隊列
值得注意的是,簡單模式和工作模式,其實也是有交換機的,任何隊列都會綁定一個默認交換機
""
,類型是 direct,routing key 為隊列的名稱
- 主題模式
路由模式的基礎上,隊列關聯交換機時 routing key 可以是帶通配符的
routing key 的單詞通過
.
分割,#
匹配 n 個單詞(n ≥ 0),*
只匹配一個單詞例如 #.red:
- 可以匹配的 routing key:p1.red、red、p2.p1.red
在發布消息時,要使用具體的 routing key,交換機發送給匹配的隊列
(3)數據隔離
- 隔離 virtual host
- 隔離用戶(賦予訪問權限)
四、RabbitMQ 基本使用 Spring AMQP
引入 RabbitMQ 相關的 SDK,可以通過創建連接 Connection、創建通道 Channel,用 Channel 進行操作,接受消息也差不多,不過多演示:
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立連接ConnectionFactory factory = new ConnectionFactory();// 1.1.設置連接參數,分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("xx.xx.xx.xx");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("xxx");factory.setPassword("xxx");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創建通道ChannelChannel channel = connection.createChannel();// 3.創建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.發送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("發送消息成功:【" + message + "】");// 5.關閉通道和連接channel.close();connection.close();}
}
但比較麻煩,Spring AMQP 框架可以自動裝配 RabbitMQ 的操作對象 RabbitTemplate,這樣我們就可以更方便的操作 MQ,并充分發揮其特性
<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
默認包含 RabbitMQ 的實現,如果你想對接其他 AMQP 協議的 MQ,得自己實現其抽象封裝的接口
(1)發送消息
注意,下面是 Spring3 的寫法,所以會有點不一樣,可能看不懂,稍后解釋!
消息發送器封裝:
@Repository
@RequiredArgsConstructor
@Slf4j
public class RabbitMQSender {private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);}private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("處理 ack 回執失敗, {}", ex.getMessage());return null;};private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即執行// setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};};private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32());}/*** @param exchange 交換機* @param routingKey routing key* @param msg 消息* @param delay 延遲時間(如果是延遲交換機,delay 才有效)* @param maxRetries 最大重試機會* @param <T> 消息的對象類型*/private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info("準備發送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData = newCorrelationData();MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數器@Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c -> {if(c.isAck()) {log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason());} else {log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason());if(retryCount >= maxRetries) {log.error("次數到達上限 {}", maxRetries);return;}retryCount++;log.warn("開始第 {} 次重試", retryCount);CorrelationData cd = newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);}public void sendMessage(String exchange, String routingKey, Object msg) {send(exchange, routingKey, msg, 0, 0);}public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){send(exchange, routingKey, msg, delay, 0);}public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {send(exchange, routingKey, msg, 0, maxReties);}public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {send(exchange, routingKey, msg, delay, maxReties);}}
(2)接受消息
監聽器:
- RabbitTemplate 是可以主動獲取消息的,也可以不實時監聽,但是一般情況都是監聽,有消息就執行
- 監聽的是 queue,若 queue 不存在,就會根據注解創建一遍
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "xxx"),exchange = @Exchange(name = "xxx", delayed = "true"),key = {"xxx"}
))
public void xxx(X x) {}
(3)聲明交換機與隊列
可以通過 @Bean 創建 Bean 對象的方式去聲明,可以自行搜索,我更喜歡監聽器注解的形式,而且 Bean 的方式,可能會因為配置不完全一樣,導致其他配置類的交換機隊列無法聲明(現象如此,底層為啥我不知道)
(4)消息轉換器
消息是一個字符串,但為了滿足更多需求,需要將一個對象序列化成一個字符串,但默認的序列化實現貌似用的是 java 對象的序列化,這種方式可能得同一個程序的 java 類才能反序列化成功,所以我們應該選擇分布式的序列化方式,比如 json
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageConverterConfig {@Beanpublic MessageConverter messageConverter(){// 1. 定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);// 2. 配置自動創建消息 id,用于識別不同消息jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);return jackson2JsonMessageConverter;}
}
這里的 JsonUtil.OBJECT_MAPPER,就是框架的或者自己實現的 ObjectMapper
(5)配置文件
spring:rabbitmq:host: ${xxx.mq.host} # rabbitMQ 的 ip 地址port: ${xxx.mq.port} # 端口username: ${xxx.mq.username}password: ${xxx.mq.password}virtual-host: ${xxx.mq.virtual-host}publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true # 若是 false 則直接丟棄了,并不會發送者回執listener:simple:prefetch: 1 # 預取為一個(消費完才能拿下一個)concurrency: 2 # 消費者最少 2 個線程max-concurrency: 10 # 消費者最多 10 個線程auto-startup: true # 為 false 監聽者不會實時創建和監聽,為 true 監聽的過程中,若 queue 不存在,會再根據注解進行創建,創建后只監聽 queue,declare = "false" 才是不自動聲明default-requeue-rejected: false # 拒絕后不 requeue(成為死信,若沒有綁定死信交換機,就真的丟了)acknowledge-mode: auto # 消費者執行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack)retry: # 這個屬于 spring amqp 的 retry 機制enabled: false # 不開啟失敗重試
# initial-interval: 1000
# multiplier: 2
# max-attempts: 3
# stateless: true # true 代表沒有狀態,若有消費者包含事務,這里改為 false
五、常見問題
(1)RabbitMQ 如何保證消息可靠性
保證消息可靠性、不丟失。主要從三個層面考慮
如果報錯可以先記錄到日志中,再去修復數據(保底)
1、生產者確認機制
生產者確認機制,確保生產者的消息能到達隊列
- publisher-confirm,針對的是消息從發送者到交換機的可靠性,成功則進行下一步,失敗返回 NACK
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;@PostConstruct
public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);
}private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("處理 ack 回執失敗, {}", ex.getMessage());return null;
};private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即執行// setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};
};private CorrelationData newCorrelationData() {return new CorrelationData(UUIDUtil.uuid32());
}/*** @param exchange 交換機* @param routingKey routing key* @param msg 消息* @param delay 延遲時間(如果是延遲交換機,delay 才有效)* @param maxRetries 最大重試機會* @param <T> 消息的對象類型*/
private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){log.info("準備發送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);CorrelationData correlationData = newCorrelationData();MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {private int retryCount = 0; // 一次 send 從始至終都用的是一個 Consumer 對象,所以作用的都是同一個計數器@Overridepublic void accept(CorrelationData.Confirm confirm) {Optional.ofNullable(confirm).ifPresent(c -> {if(c.isAck()) {log.info("ACK {} 消息成功到達,{}", correlationData.getId(), c.getReason());} else {log.warn("NACK {} 消息未能到達,{}", correlationData.getId(), c.getReason());if(retryCount >= maxRetries) {log.error("次數到達上限 {}", maxRetries);return;}retryCount++;log.warn("開始第 {} 次重試", retryCount);CorrelationData cd = newCorrelationData();cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);}});}}, EXECUTOR);rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
}
Spring3 的 RabbitMQ Confirm,需要配置為 correlated,發送消息時提供 CorrelationData,也就是與消息關聯的數據,包括發送者確認時的回調方法
要想提供 Confirm 的回調辦法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 對象(新的 JUC 工具類,可以查一查如何使用)
配置后,在未來根據回調函數進行處理(當然也可以直接設置在 RabbitTemplate 對象的 ConfirmCallBack)
還可以自己實現消息的發送者重試:
- publisher-returns,針對的是消息從交換機到隊列的可靠性,成功則返回 ACK,失敗觸發 returns 的回調方法
@Component
@RequiredArgsConstructor
@Slf4j
public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {// 不存在 routing key 對應的隊列,那在我看來轉發到零個是合理的現象,但在這里也認為是路由失敗(MQ 認為消息一定至少要進入一個隊列,之后才能被處理,這就是可靠性)(反正就是回執了,你愛咋處理是你自己的事情)@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 可能一些版本的 mq 會因為是延時交換機,導致發送者回執,只要沒有 NACK 這種情況其實并不是不可靠(其實我也不知道有沒有版本會忽略)// 但是其實不忽略也不錯,畢竟者本來就是特殊情況,一般交換機是不存儲的,但是這個臨時存儲消息// 但這樣也就代表了,延時后消息路由失敗是沒法再次處理的(因為我們交給延時交換機后就不管了,可靠性有 mq 自己保持)MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties();// 這里的 message 并不是原本的 message,是額外的封裝,x-delay 在 publish-returns 里面封裝到 receiveDelay 里了Integer delay = messageProperties.getReceivedDelay();// 如果不是延時交換機,卻設置了 delay 大于 0,是不會延時的,所以是其他原因導致的(以防萬一把消息記錄到日志里)if(Objects.nonNull(delay) && delay.compareTo(0) > 0) {log.info("交換機 {}, 路由鍵 {} 消息 {} 延遲 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));return;}log.warn("publisher-returns 發送者回執(應答碼{},應答內容{})(消息 {} 成功到達交換機 {},但路由失敗,路由鍵為 {})",returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());}
}
RabbitMQSender:
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");private final RabbitTemplate rabbitTemplate;private final PublisherReturnsCallBack publisherReturnsCallBack;@PostConstruct
public void init() {rabbitTemplate.setTaskExecutor(EXECUTOR);// 設置統一的 publisher-returns(confirm 也可以設置統一的,但最好還是在發送時設置在 future 里)// rabbitTemplate 的 publisher-returns 同一時間只能存在一個// 因為 publisher confirm 后,其實 exchange 有沒有轉發成功,publisher 沒必要每次發送都關注這個 exchange 的內部職責,更多的是“系統與 MQ 去約定”rabbitTemplate.setReturnsCallback(publisherReturnsCallBack);
}
同理你也可以按照自己的想法進行重試…
在測試練習階段里,這個過程是異步回調的,如果是單元測試,發送完消息進程就結束了,可能就沒回調,程序就結束了,自然就看不到回調時的日志
如果既沒有 ACK 也沒有 NACK,也沒有發布者回執,那就相當于這個消息銷聲匿跡了,沒有任何的回應,那么就會拋出異常,我們可以處理這個異常,比如打印日志、重發之類的…
private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {log.error("處理 ack 回執失敗, {}", ex.getMessage());return null;
};
2、持久化
消息隊列的數據持久化,確保消息未消費前在隊列中不會丟失,其中的交換機、隊列、和消息都要做持久化
默認都是持久化的
3、消費者確認
隊列的消息出隊列,并不會立即刪除,而是等待消費者返回 ACK 或者 NACK
消費者要什么時候發送 ACK 呢?
- 1)RabbitMQ投遞消息給消費者
- 2)消費者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費者宕機,消息尚未處理
如果出現這種場景,就是不可靠的,所以應該是消息處理后,再發送 ACK
Spring AMQP 有三種消費者確認模式:
- manual,手段 ack,自己用 rabbitTemplate 去發送 ACK/NACK(這個比較麻煩,不用 RabbitListener 接受消息才必須用這個)
- auto,配合 RabbitListener 注解,代碼若出現異常,NACK,成功則 ACK
- none,獲得消息后直接 ACK,無論是否執行成功
出現 NACK 后要如何處理(此過程還在我們的服務器):
- 拒絕(默認)
- 重新入隊列
- 返回 ACK,消費者重新發布消息指定的交換機
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageRecovererConfig {@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成為死信(默認)
// return new ImmediateRequeueMessageRecoverer(); // nack、requeue
// return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、發送給指定的交換機,confirm 機制需要設置到 rabbitTemplate 里}}
Spring 提供的 retry 機制,在消費者出現異常時利用本地重試,而不是無限制的requeue到mq隊列。
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 消費者執行成功 ack、異常 nack(manual 為手動、none 代表無論如何都是 ack)retry: # 這個屬于 spring amqp 的 retry 機制enabled: false # 不開啟失敗重試initial-interval: 1000 # 第一次重試時間間隔multiplier: 3 # 每次重試間隔的倍數max-attempts: 4 # 最大接受次數stateless: true # true 代表沒有狀態,若有消費者包含事務,這里改為 false
解釋:第一次失敗,一秒后重試、第二次失敗,三秒后重試,第三次失敗,九秒后重試,第四次失敗就沒機會了(SpringAMQP會拋出異常AmqpRejectAndDontRequeueException)
失敗之后根據對應的處理策略進行處理
(2)死信交換機
消息過期、消息執行失敗并且不重試也不重新入隊列,堆積過多等情況,消息會成為死信,若隊列綁定死信交換機,則轉發給死信交換機,若沒有則直接丟棄
隊列1 -> 死信交換機 -> 隊列2,這個過程是消息隊列內部保證的可靠性,消息也沒有包含原發送者的信息,甚至連接已經斷開了,所以沒有 publisher-confirm 也沒有 publisher-returns
這個機制和 republish 有點像,但是有本質的區別,republish 是消費者重發,而這里是隊列將死信轉發給死信交換機
死信的情況:
- nack && requeue == false
- 超時未消費
- 隊列滿了,由于隊列的特性,隊列頭會先成為死信
(3)延遲功能如何實現
剛才提到死信的誕生可能是超時未消費,那么其實這個點也可以簡單的實現一個延遲隊列:
隊列為一個不被監聽的專門用來延遲消息發送的緩沖帶,其死信交換機才是目標交換機,
message.getMessageProperties().setExpiration("1000");
設置的是過期時間,其本意并不是延遲,是可以實現延遲~
另外,隊列本身也能設置 ttl 過期時間,但并不是隊列的過期時間(顯然不合理,截止后無論啥都丟了,冤不冤啊,至少我想不到這種場景),而是隊列中的消息存活的最大時間,消息的過期時間和這個取一個最小值才是真實的過期時間
值得注意的是,雖然能實現延時消息的功能,但是
- 實現復雜
- 延遲可能不準確,因為隊列的特性,如果隊列頭未出隊列,哪怕其后者出現死信,也只能乖乖等前面的先出去之后才能前往死信交換機(例如消息的 ttl 分別為 9s、3s、1s,最終三個消息會被同時轉發,因為“最長壽的”排在了前面)
這種方式的順序優先級大于時間優先級
而 RabbitMQ 也提供了一個插件,叫 DelayExchange 延時交換機,專門用來實現延時功能
Scheduling Messages with RabbitMQ | RabbitMQ
- 請自行上網下載
延時交換機的聲明:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延遲消息:{}", msg);
}
延時消息的發送:
private MessagePostProcessor delayMessagePostProcessor(long delay) {return message -> {// 小于 0 也是立即執行message.getMessageProperties().setDelay((int) Math.max(delay, 0));return message;};
};
這里設置的是 Delay,不是過期時間,哪怕超過了時間也不叫做死信
期間一直存在延時交換機的硬存里,延遲消息插件內部會維護一個本地數據庫表,同時使用 Elang Timers 功能實現計時。如果消息的延遲時間設置較長,可能會導致堆積的延遲消息非常多,會帶來較大的CPU開銷,同時延遲消息的時間會存在誤差。
(4)消息堆積如何解決
死信的成因還可能是堆疊過多
我在實際的開發中,沒遇到過這種情況,不過,如果發生了堆積的問題,解決方案也所有很多的
- 提高消費者的消費能力 ,可以使用多線程消費任務
- 增加更多消費者,提高消費速度,使用工作隊列模式, 設置多個消費者消費消費同一個隊列中的消息
- 擴大隊列容積,提高堆積上限
但是,RabbitMQ 隊列占的是內存,間接性的落盤,提高上限最終的結果很有可能就是反復落庫,特別不穩定,且并沒有解決消息堆積過多的問題
我們可以使用 RabbitMQ 惰性隊列,惰性隊列的好處主要是
- 接收到消息后直接存入磁盤而非內存,雖然慢,但沒有間歇性的 page-out,性能比較穩定
- 消費者要消費消息時才會從磁盤中讀取并加載到內存,正常消費后就刪除了
- 基于磁盤存儲,消息上限高,支持數百萬條的消息存儲
聲明方式:
而要設置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可。可以通過命令行將一個運行中的隊列修改為惰性隊列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一個策略Lazy
:策略名稱,可以自定義"^lazy-queue$"
:用正則表達式匹配隊列的名字'{"queue-mode":"lazy"}'
:設置隊列模式為lazy模式--apply-to queues
:策略的作用對象,是所有的隊列
- x-queue-mode 參數的值為 lazy
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "xxx"),value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")),key = "xxx"
))
交換機、隊列擴展屬性叫參數,消息的拓展屬性叫頭部,擴展屬性一般都以 x- 開頭(extra)
消息堆積問題的解決方案?
- 隊列上綁定多個消費者,提高消費速度
- 使用惰性隊列,可以再mq中保存更多消息
惰性隊列的優點有哪些?
- 基于磁盤存儲,消息上限高
- 沒有間歇性的 page-out,性能比較穩定
惰性隊列的缺點有哪些?
- 基于磁盤存儲,消息時效性會降低
- 性能受限于磁盤的IO
(5)高可用如何保證
RabbitMQ 在服務大規模項目時,一般情況下不會像數據庫那樣存儲的瓶頸,用惰性隊列已經是很頂天了的,其特性和用途不會有太極端的存儲壓力
更多的是在并發情況下,處理消息的能力有瓶頸,可能出現節點宕機的情況,而避免單節點宕機,數據丟失、無法提供服務等問題需要解決,也就是需要保證高可用性
Erlang 是一種面向并發的語言,天然支持集群模式,RabbitMQ 的集群有兩種模式:
- 普通集群:是一種分布式集群,將隊列分散到集群的各個節點,從而提高整個集群的并發能力
- 鏡像集群:是一種主從集群,在普通集群的基礎上,添加了主從備份的功能,提高集群的數據可用性
鏡像集群雖然支持主從,但主從同步并不是強一致的,某些情況下可能有數據丟失的風險(雖然重啟能解決,但那不是強一致,而是最終一致),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁隊列來代替鏡像集群,底層采用 Raft 協議確保主從的數據一致性
1、普通集群
各個節點之間,實時同步 MQ 元數據(一些靜態的共享的數據):
- 交換機的信息
- 隊列的信息
但不包括隊列中的消息(動態的數據不同步)
監聽隊列的時候,如果監聽的節點不存在該隊列(只是知道元數據),當前節點會訪問隊列所在的節點,該節點返回數據到當前節點并返回給監聽者
隊列所在節點宕機,隊列中的消息就會“丟失”(是在重啟之前,這個消息就消失無法被處理的意思)
如何部署,上網搜搜就行
2、鏡像集群
各個節點之間,實時同步 MQ 元數據(一些靜態的共享的數據):
- 交換機的信息
- 隊列的信息
本質是主從模式,創建隊列的節點為主節點,其他節點為鏡像節點,隊列中的消息會從主節點備份到鏡像節點中
注意
- 像 Redis 那樣的主從集群,同步都是全部同步來著
- 但 RabbitMQ 集群的主從模式比較特別,他的粒度是隊列,而不是全部
也就是說,一個隊列的主節點,可能是另一個隊列的鏡像節點,所以分析某個場景的時候,要確認是哪個隊列,單獨進行觀察分析討論
- 不同隊列之間只有交互,不會相互影響數據同步
針對某一個隊列,所有寫操作都在主節點完成,然后同步給鏡像節點,讀操作任何一個都 ok
主節點宕機,鏡像節成為新的主節點
鏡像集群有三種模式:
- exactly 準確模式,指定副本數 count = 主節點數 1 + 鏡像節點數,集群會盡可能的維護這個數值,如果鏡像節點出現故障,就在另一個節點上創建鏡像,比較建議這種模式,可以設置為 N/2 + 1
- all 全部模式,count = N,主節點外全部都是鏡像節點
- nodes 模式,指定鏡像節點名稱列表,隨機一個作為主節點,如果列表里的節點都不存在或不可用,則創建隊列時的節點作為主節點,之后訪問集群,列表中的節點若存在才會創建鏡像節點
沒有鏡像節點其實就相當于普通模式了
如何配置上網搜搜就行,比較麻煩,需要設置策略,以及匹配的隊列(不同隊列分開來討論,可以設置不同的策略)
3、仲裁隊列
RabbitMQ 3.8 以后,推出了新的功能仲裁隊列來
- 代替鏡像集群,都是主從模式,支持主從數據同步,默認是 exactly count = 5
- 約定大于配置,使用非常簡單沒有復雜的配置,隊列的類型選擇 Quorum 即可
- 底層采用 Raft 協議確保主從的數據強一致性
Spring Boot 配置:
仲裁隊列聲明:
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "xxx"),value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")),key = "xxx"
))
隊列不聲明默認就是普通集群,這里聲明的仲裁隊列也只是針對一個隊列
(6)消息重復消費問題
在保證MQ消息不重復的情況下,MQ 的一條消息被消費者消費了多次
消費者消費消息成功后,在給MQ發送消息確認的時候出現了網絡異常或者是服務宕機,MQ 遲遲沒有接收到 ACK 也沒有 NACK,此時 MQ 不會將發送的消息刪除,按兵不動,消費者重新監聽或者有其他消費者的時候,交由它消費,而這條消息如果在之前就消費過了的話,則會導致重復消費
解決方案:
- 消息消費的業務本身具有冪等性,再次處理相同消息時不會產生副作用,一些時候可能需要用到分布式鎖去維護冪等性
- 比如一個訂單的狀態設置為結束,那重復消費的結果一致
- 記錄消息的唯一標識,如果消費過了的,則不再消費
- 消費成功將 id 緩存起來,消費時查詢緩存里是否有這條消息
- 設置允許的緩存時間時,你不必想得太極端,一般很快就有消費者繼續監聽拿到消息,哪怕真有那個情況,這里帶來的損失大概率可以忽略不記了,一切要結合實際情況!
有時候兩種方案沒有嚴格的界定