速學 RocketMQ

目錄

本地啟動&測試&可視化

核心概念

集群

主從 集群

Dledger 集群?

總結

客戶端消息確認機制

廣播模式

消息過濾機制

順序消息機制

延遲消息與批量消息

事務消息機制

ACL權限控制體系

RocketMQ客戶端注意事項

消息的 ID、Key、Tag

最佳實踐

消費者端進行冪等控制

關注錯誤消息重試

手動處理死信隊列

MQ如何保證消息不丟失

1、哪些環節可能丟消息

2、生產者發送消息如何保證不丟

3、Borker寫入數據如何保證不丟失

4、Broker主從同步如何保證不丟失

5、消費者消費消息如何不丟失

6、如果MQ服務全部掛了,如何保證不丟失

7、MQ消息零丟失方案總結

MQ如何保證消息的順序性

MQ如何保證消息冪等性

MQ如何快速處理積壓的消息


本地啟動&測試&可視化

官網文檔及下載地址:https://rocketmq.apache.org/zh/docs/

RocketMQ 也是Java開發的程序,所有需要有JDK環境

1、可以在 bin/runserver.sh 文件中修改初始堆內存、最大堆內存、年輕代初始內存大小,默認都是2g,太大了我的電腦吃不消,調小一點。注意jdk8及之前的是在圖片中的 if 修改,后面的版本要在 else 中修改

2、在bin/runbroker.sh中修改

3、配置RocketMQ環境變量:

# RocketMQ 相關環境變量配置,:$PATH注意這里要追加到現有的 PATH 而不是覆蓋,否則所有基礎命令都會失效
export ROCKETMQ_HOME=/home/app/rocketmq/rocketmq-5.3.1-bin
export PATH=$ROCKETMQ_HOME/bin:$PATH
export NAMESRV_ADDR='localhost:9876'

刷新?source /etc/profile

4、啟動NameServer,進入RocketMQ的bin目錄運行

?nohup bin/mqnamesrv &

cat nohup.out? ?查看輸出日志

出現The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876 表示成功

5、啟動Broker

在/conf/broker.conf后面添加:
brokerIP1 = 服務器ip
autoCreateTopicEnable=true
namesrvAdd=localhost:9876? #(NameServer的地址)

啟動命令:nohup mqbroker -c ../conf/broker.conf &

如果報錯:Unrecognized VM option 'UseBiasedLocking' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
這個是jdk版本已經不再支持UseBiasedLocking虛擬機,只需要在runbroker 將包含UseBiasedLocking這個一行注釋掉即可

查看命令:cat nohup.out 出現The broker[iZ7xvhygbzymeb78axyu9pZ, 172.18.226.151:10911] boot success. serializeType=JSON and name server is localhost:9876 表示成功

6、?發送測試消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

7、?消費測試消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

8、RocketMQ0-Dashboard 可視化頁面
docker run -d --name rocketmq-dashboard \
? --network docker-net \
? -p 8080:8080 \
? --restart=always \
? -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv_dj:9876" \
? apacherocketmq/rocketmq-dashboard

然后將在容器中的jar拷貝出來:
docker cp abc123:/rocketmq-dashboard.jar /home/app/rocketmq
啟動:
nohup java --add-opens java.base/java.nio=ALL-UNNAMED \
? ? ? ? ? ?--add-opens java.base/sun.nio.ch=ALL-UNNAMED \
? ? ? ? ? ?-Drocketmq.namesrv.addr=localhost:9876 \
? ? ? ? ? ?-jar rocketmq-dashboard.jar > rocketmq-dashboard.log 2>&1 &

訪問ip:8080 即可

核心概念

Topic:主題是消息的一級分類單元,生產者將消息發送到特定的主題,而消費者訂閱該主題以接收消息。一個Topic可以有多個MessageQueue。
MessageQueue(消息隊列):每個Topic可以包含一個或多個消息隊列。RocketMQ通過在不同的消息隊列之間分配消息來實現負載均衡和高吞吐量。MessageQueue專屬于某個Topic:一個MessageQueue僅存儲其所屬Topic的消息,不會跨Topic存儲消息
Consumer Group(消費者組):一組邏輯上相同的消費者構成一個消費者組。同一個消費者組內的所有消費者共同消費來自某個Topic的所有消息,但每條消息只會被該組中的一個消費者處理。
Consumer Instance(消費者實例):這是實際運行的消費者程序的一個實例(代碼中 new 出來就算一個實例)。一個消費者組可以包含多個消費者實例。


關系與工作機制
消息隊列與消費者的對應關系:
在RocketMQ中,對于任何一個給定的消費者組,它所訂閱的Topic下的每個MessageQueue最多只能由該組內的一個Consumer實例進行消費。這種一對一的關系確保了每條消息只會在消費者組內被處理一次,從而避免重復消費的問題。
負載均衡機制:
當有多個消費者實例存在于同一消費者組時,RocketMQ會自動在這幾個消費者實例之間平均分配Topic下的所有MessageQueue。
如果增加的消費者實例數量超過了Topic下MessageQueue的數量,則多余的消費者實例將處于空閑狀態,因為沒有額外的MessageQueue供它們消費。
實際應用場景示例
假設有一個名為OrderTopic的主題,它包含4個MessageQueue,并且有兩個消費者實例C1和C2屬于同一個消費者組G1:

RocketMQ會將這4個MessageQueue平均分配給C1和C2,例如,C1負責消費兩個MessageQueue,C2也負責另外兩個MessageQueue。
如果再啟動第三個消費者實例C3加入到消費者組G1,由于只有4個MessageQueue,因此其中一個消費者實例將會閑置,不會分配到任何MessageQueue。
反之,如果減少至只有一個消費者實例C1,那么這個實例將會負責消費所有的4個MessageQueue。

集群

主從 集群


修改配置就可以搭建了對應的集群了,slave負責給broker做備份

優點:性能快

缺點,當broker掛了之后 slave 不會自動升級為broker,可用性沒有Dledger集群高

如果業務對高可用要求沒那么高,對性能要求較高,可選用該種方式搭建集群。

Dledger 集群?

卻點:性能沒有上一種方式的快

優點,當broker(leader)掛了之后,其他的broker會自動選舉leader,高可用

如果業務對性能要求沒那么高,對高可用要求較高,可選用該種方式搭建集群。

總結

broker要有多數存活才能從新選舉為leader,例如有5個broker,但是只有1個存活,這樣就不能選舉,而nameserver只要有一個存活就可以正常工作

1、nameServer 命名服務:
nameServer不依賴于任何其他的服務,自己獨立就能啟動。并且,不管是broker還是客戶端,都需要明確指定nameServer的服務地址。以一臺電腦為例,nameServer可以理解為是整
個RocketMQ的CPU,整個RocketMQ集群都要在CPU的協調下才能正常工作。
2、broker 核心服務:
broker是RocketMQ中最為嬌貴的一個組件。RockeMQ提供了各種各樣的重要設計來保護broker的安全。同時broker也是RocketMQ中配置最為繁瑣的部分。同樣以電腦為例,broker就是整個
RocketMQ中的硬盤、顯卡這一類的核心硬件。RocketMQ最核心的消息存儲、傳遞、查詢等功能都要由broker提供。
3、client 客戶端
Client包括消息生產者和消息消費者。同樣以電腦為例,Client可以認為是RocketMQ中的鍵盤、鼠標、顯示器這類的輸入輸出設備。鼠標、鍵盤輸入的數據需要傳輸到硬盤、顯卡等硬件才能進行處理。但是鍵盤、鼠標是不能
直接將數據輸入到硬盤、顯卡的,這就需要CPU進行協調。通過CPU,鼠標、鍵盤就可以將輸入的數據最終傳輸到核心的硬件設備中。經過硬件設備處理完成后,再通過CPU協調,顯示器這樣的輸出設備就能最終從核心硬件設備中獲取到輸出的數據。

topic是邏輯結構,而真正的物理存儲結構是MessageQueue隊列?
最小位點是當前隊列中最開始的消息編號,如果刪掉了前面的消息,那么最小位點就往前移了
最大位點最小位點 + 消息數(前提是位點連續,消息被刪除或者由于消費失敗而進行重試時,可能會導致位點不連續

消息模型圖:

客戶端消息確認機制

消息確認機制:生產者向broker發送一個消息,broker會個生產者響應一個確認消息

單向發送:生產者只負責發送消息,不關心是否發送成功,也不等待服務器響應。這種方式適用于對消息可靠性要求不高的場景。

?同步發送確認
生產者發送消息后,?同步等待 Broker 的響應。若 Broker 返回?SEND_OK?狀態碼(如?SendStatus.SEND_OK),說明消息已持久化到 CommitLog 并被主從同步。若超時未收到響應或返回錯誤,生產者會觸發重試(默認重試 2 次)。

// 示例:生產者同步發送消息(網頁4)
SendResult sendResult = defaultMQProducer.send(message);
System.out.println("發送結果:" + sendResult.getSendStatus());

優點消息傳遞可靠性很強:適用于需嚴格保證消息送達的場景(如訂單創建)。

缺點:需要阻塞性的等待broker的響應,吞吐量受限,頻繁的同步等待限制并發性能。

適用場景:金融交易、關鍵業務通知(如支付結果)

?異步發送確認
生產者通過回調函數異步處理 Broker 的響應,適用于高吞吐場景。發送線程不阻塞,通過?SendCallback?接收成功或失敗通知:

producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) { /* 處理成功邏輯 */ }@Overridepublic void onException(Throwable e) { /* 處理失敗并重試 */ }
});

優點低延遲:響應時間短,提升系統整體性能,消息傳遞可靠性相對較強

缺點:?業務層處理可能的重復或丟失消息(如冪等設計),producer主線程需要一直開啟,比較消耗服務器資源

適用場景:日志采集、實時監控等允許少量數據丟失的高吞吐場景

廣播模式

廣播模式和集群模式是RocketMQ的消費者端處理消息最基本的兩種模式。
集群模式:一條消息只會被分配給同一個消費者組中的一個消費者實例進行處理。在此模式下,broker會在內部以消費者組的概念來維護一個消費者位。

廣播模式:每條消息都會被發送到每個訂閱了該主題的消費者實例。在此模式下,各自的Consumer內部(消費者客戶端本地電腦offset.json文件中,該文件目前無法在Windows上創建)自行維護消費者位點,如果該文件丟失會自動創建并從MessageQueue的最后一條開始消費;消費失敗無法重試

消息過濾機制

1、Tag過濾:生產者在發送消息的時候不僅可以指定Topic,還可以知道Tag,進一步細分消息類型,那么在消費者端就可以通過Topic + Tag 拿到指定的消息,但是這種過濾方式只能簡單的進行字符匹配,無法進行復雜匹配,例如匹配字符數字大于3的所有Tag。

2、SQL過濾:RocketMQ還支持基于SQL92表達式的高級消息過濾功能。通過這種方式,可以根據消息屬性進行更復雜的過濾操作,如數值比較、字符串匹配等。使用方法:在發送消息時,除了基本的消息體外,還可以添加一些自定義屬性(當消息非常多的時候,不建議使用自定義屬性來過濾)。訂閱消息時,使用SQL表達式來描述過濾條件。例如,

Message message = new Message("TopicTest", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
message.putUserProperty("a", "3"); // 添加自定義屬性
SendResult sendResult = producer.send(message);
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 3 and b = 'abc'"));

表示只消費那些屬性a大于3且屬性b等于"abc"的消息。**但是這種功能需要手動開啟:在broker.conf文件中添加 enablePropertyFilter=true 即可。

性能影響:雖然SQL92表達式過濾提供了更大的靈活性,但它可能會對消息隊列的性能產生一定影響,因為它需要更多的計算資源來進行條件判斷。相比之下,Tag過濾由于其實現較為簡單直接,性能開銷較小(甚至可以忽略不記)。用SQL92表達式過濾時,要注意合理設置過濾條件,避免過于復雜的查詢導致性能下降。

順序消息機制

保證局部有序(不保證全局有序),好比每一筆訂單中的每一個步驟都是有序的(局部有序),但是不保證所有的訂單都按照創建時間來依次處理(全局無序,因為全局有序是沒有意義的)

原理就是將一筆訂單中的所有步驟都發送到同一個massegeQueue,然后broker按照massegeQueue中的消息順序依次推送給消費者?就可以消息的順序了,因為在不同的massegeQueue去拉去消息會存在網絡因素導致順序不一致,所有所有步驟都發送到同一個massegeQueue就可以保證了。也就是說如果把所有的消息都存儲到用一個massegeQueue中就可以保證全局消息的順序性了,但是這樣隊列就會有性能問題。因此我們在存儲消息的時候盡可能大散在不同的隊列中。
費者端如果確實處理邏輯中出現問題,不建議拋出異常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作為替代,用于指示當前消息隊列暫停片刻再嘗試消費;例如一個隊列中的步驟1成功,步驟2失敗,那么就會將整個隊列暫時阻塞,一段時間(基于內部實現中的退避算法,通常是幾秒到幾十秒不等)后在從步驟2開始重試來保證順序

延遲消息與批量消息

延遲消息:

Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 設置延遲級別為3,即表示延遲10秒(根據默認的延遲級別表)
message.setDelayTimeLevel(3);
producer.send(message);

RocketMQ內置有一些延遲時間,可以直接使用,但是不支持在代碼中自定義延遲時間,如果想在代碼中設置延遲,可以考慮spring的Task等內部實現定時然后發送到topic;要自定義延遲消息的時間間隔,您需要編輯Broker的配置文件broker.conf,并調整messageDelayLevel參數。這個參數允許您定義一系列以逗號分隔的延遲級別,每個級別表示一個從消息發送到可消費的時間間隔。

messageDelayLevel=1s 5s 10s 30s 1m

批量消息:

每次發送消息都要進行一次網絡的io,所有可以批量發送消息減少網絡io,但是一次發送的消息量不宜過多,具體多少需要自行在客戶端寫邏輯判斷。批量消息也有限制,如不能做延遲消息,要求所有消息都是同一個topic等。

事務消息機制

** 事務控制是在生產者端的 **,rockermq的事務和我們常規的事務不一樣,rockermq的事務是保證某個事件與發送消息組成原子性

事務消息是RocketMQ非常有特色的一個高級功能。他的基礎訴求是通過RocketMQ的事務機制,來保證上下游的數據一致性。以電商為例,用戶支付訂單這一核心操作的同時會涉及到下游物流發貨、積分變更、購物車狀態清空等多個子系統的變更。這種場景,非常適合使用RocketMQ的解耦功能來進行串聯。

1.發送方向 MQ 服務端發送事務消息(就和正常的消息一樣,只不過消費者不可見,實現原理是將消息存儲到系統的另一個特殊的topic中);
2.MQ Server 將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功(回調),此時消息為半消息。如果ACK 失敗,那么客戶端就重試,當超過最大重試次數就可以做告警了,比如郵箱告警等。
3.發送方開始執行本地事務邏輯。
4.發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或?Rollback 或),MQ Server 收到 Commit 狀態則將半消息標記為可投遞(將消息轉存到正常的topic),訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。
5.在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間后 MQ Server 將對該消息發起消息回查。如果會查狀態是UNKNOWN(待檢測),那么broker就會隔一段時間重試,如果超過重試次數就會放到死信隊列中。
6.發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。?
7.發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 執行本地事務try {// 假設這里是您的本地事務邏輯System.out.println("Executing local transaction for message: " + new String(msg.getBody()));return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 檢查本地事務狀態System.out.println("Checking local transaction for message: " + new String(msg.getBody()));return RocketMQLocalTransactionState.UNKNOWN;}
}
@Service
public class TransactionService {private final RocketMQTemplate rocketMQTemplate;public TransactionService(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;}public void sendMessageInTransaction() {Message<String> message = MessageBuilder.withPayload("Hello RocketMQ").setHeader("tag", "TagA").build();rocketMQTemplate.sendMessageInTransaction("TopicTest:TagA", message, null);}
}

對于事務消息還有一個使用場景---充當定時任務:

訂單創建等待支付,比如訂單要在15分鐘內支付,否則關閉訂單,常規做法是使用Xxl-Job等類似的定時任務中間件,比如每個30秒一次查詢用戶是否支付。

更優雅的做法是使用 rockermq 事務消息充當定時任務:
當訂單創建時向 broker 發送一個?UNKNOWN 狀態的事務消息,broker 就會自動的向客戶端發送回調檢測狀態,這個就可以在回調方法里面查詢用戶15分鐘之內是否支付成功,如果支付失敗就將優惠卷狀態恢復等等并返回Rollback,成功就返回Commit,消費者就可以看到這個消息,就可以通知到后面的物流系統等;如果支付成功了,但是消費者執行失敗了(例如庫存扣減失敗),那么broker就會進行重試,會在某一次執行成功,達到最終一致性。

ACL權限控制體系

1、是否允許自動創建 topic :?autoCreateTopicEnable=true 表示允許,在生產環境一般不允許

2、topic權限:

perm 代表權限,對生產者和消費者做限權

  • 0 (0000):無權限(Neither Read nor Write)
  • 2 (0010):僅讀權限(Only Read)
  • 4 (0100):僅寫權限(Only Write)
  • 6 (0110):讀寫權限(Both Read and Write)

當消費者消費失敗會創建一個perm為6的重試隊列,當重試次數達到一定次數就會創建一個perm為2死信隊列。要處理死信隊列只能手動去修改權限后再處理。

也可以在 /conf/plain_acl.yml文件中配置其他權限,例如:

如果再這個配置中配置了賬號和密碼,那么在客戶端聲明RocketMQ實例的時候要去指明即可

RocketMQ 作為一個內部服務,并不需要對外,所有權限控制很少使用。

RocketMQ客戶端注意事項

消息的 ID、Key、Tag

這里有個小細節需要注意,producer生產者端發送的是Message對象,而Consumer消費端處理的卻是MessageExt對象。也就是說,雖然都是傳遞消息,但是Consumer端拿到的信息會比Producer端發送的消息更多,也就有幾個重點的參數需要理解。那就是Messageld,Key和Tag

Messageld是RocketMQ內部給每條消息分配的唯一索引:
Producer發送的Message對象是沒有msgld屬性的。Broker端接收到Producer發過來的消息后,會給每條消息單獨分配一個唯一的msgld。這個msgID可以作為消息的唯一主鍵來使用。但是需要注意,對于客戶端來說,畢竟是不知道這個msgld是如何產生的。實際上,在RocketMQ內部,也會針對批量消息、事務消息等特殊的消息機制,有特殊的msgld分配機制(當使用某些框架的時候,可能會導致Messageld不唯一)。因此,在復雜業務場景下,不建msgld來作為消息的唯一索引l,而建議采用下面的key屬性自行指定業務層面上的唯一索引。例如訂單消息就將訂單ID設置為key。

最佳實踐

一個應用盡可能用一個Topic,而消息子類型則可以用tags來標識。tags可以由應用自由設置,只有生產者在發送消息設置了tags,消費方在訂閱消息時才可以利用tags通過broker做消息過濾:message.setTags("tags")
Kafka的一大問題是Topic過多,會造成Partition文件過多,影響性能。而RocketMQ中的Topic完全不會對消息轉發性能有影響。但是Topic過多,還是會加大RocketMQ的元數據維護的性能消耗。所以,在使用時,還是需要對Topic進行合理的分配。使用Tag區分消息時,盡量直接使用Tag過濾,不要使用復雜的SQL過濾。因為消息過濾機制雖然可以減少網絡IO,但是畢竟會加大Broker端的消息處理壓力。所以,消息過濾的邏輯,還是越簡單越好。

消費者端進行冪等控制

官方回答中說道:RocketMQ確保所有消息至少傳遞一次。在大多數情況下,消息不會重復。

消息冪等的必要性:
在互聯網應用中,尤其在網絡不穩定的情況下,消息隊列RocketMQ的消息有可能會出現重復,這個重復簡單可以概括為以下情況:
1、發送時消息重復:
當一條消息已被成功發送到服務端并完成持久化,此時出現了網絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗。如果此時生產者意識到消息發送失敗并嘗試再次發送消息,消費者后續會收到兩條內容相同并且 MessageID 也相同的消息。
2、投遞時消息重復
消息消費的場景下,消息已投遞到消費者并完成業務處理,當客戶端給服務端反饋應答的時候網絡閃斷。為了保證消息至少被消費一次,消息隊列RocketMQ的服務端將在網絡恢復后再次嘗試投遞之前已被處理過的消息,消費者后續會收到兩條內容相同并且MessageID 也相同的消息。
3、負載均衡時消息重復(包括但不限于網絡抖動、Broker重啟以及訂閱方應用重啟)
當消息隊列RocketMQ的Broker或客戶端重啟、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重復消息。

處理方式
從上面的分析中知道,在RocketMQ中,是無法保證每個消息只被投遞一次的,所以要在業務上自行來保證消息消費的冪等性。而要處理這個問題,RocketMQ的每條消息都有一個唯一的Messageld,這個參數在多次投遞的過程中是不會改變的,所以業務上可以用這個Messageld來作為判斷冪等的關鍵依據。但是,這個Messageld是無法保證全局唯一的,也會有沖突的情況。所以在一些對冪等性要求嚴格的場景,最好是使用業務上唯一的一個標識比較靠譜。例如訂單ID。而這個業務標識可以使用Message的Key來進行傳遞。

關注錯誤消息重試

如果消費者返回的狀態是?RECONSUME_LATER(稍后重試),那么這個消息會被放到重試隊列中,以消費者組的規則和分配策略重新推送給消費者,這個重試隊列是系統自動創建的,一般來說,有了重試隊列就代表消費者處理有異常,那么我們可以監控是否有重試隊列以此來錯告警。

手動處理死信隊列

死信隊列的特征:
1、一個死信隊列對應一個ConsumGroup,而不是對應某個消費者實例。
2、如果一個ConsumeGroup沒有產生死信隊列,RocketMQ就不會為其創建相應的死信隊列。
3、一個死信隊列包含了這個ConsumeGroup里的所有死信消息,而不區分該消息屬于哪個Topic。
4、死信隊列中的消息不會再被消費者正常消費。
5、死信隊列的有效期跟正常消息相同。默認3天,對應broker.conf中的fileReservedTime屬性。超過這個最長時間的消息都會被刪除,而不管消息是否消費過。
注:默認創建出來的死信隊列,他里面的消息是無法讀取的,在控制臺和消費者中都無法讀取。這是因為這些默認的死信隊列,他們的權限perm被設置成了2:禁讀(這個權限有三種2:禁讀,4:禁寫,6:可讀可寫)。需要手動將死信隊列的權限配置成6,才能被消費(可以通過mqadmin指定或者web控制臺)。
?

MQ如何保證消息不丟失

1、哪些環節可能丟消息

其中,1,2,4三個場景都是跨網絡的,而跨網絡就肯定會有丟消息的可能。
然后關于3這個環節,通常MQ存盤時都會先寫入操作系統的緩存pagecache中,然后再由操作系統異步的將消息寫入硬盤。這個中間有個時間差,就可能會造成消息丟失。如果服務掛了,緩存中還沒有來得及寫入硬盤的消息就會丟失。

2、生產者發送消息如何保證不丟

生產者發送消息之所以可能會丟消息,都是因為網絡。因為網絡的不穩定性,容易造成請求丟失。怎么解決這樣的問題呢?其實一個統一的思路就是生產者確認。簡單來說,就是生產者發出消息后,給生產者一個確定的通知,這個消息在Broker端是否寫入完成了。就好比打電話,不確定電話通沒通,那就互相說個“喂”,具體確認一下。只不過基于這個同樣的思路,各個MQ產品有不同的實現方式。

1、通過RockerMQ客戶端的消息確認機制保證消息不丟失

2、通過發送事務消息來保證息不丟失

3、Borker寫入數據如何保證不丟失


broker接收到消息并不會馬上寫到磁盤上,而是先寫到操作系統的pagecache緩存頁中,過一段時間才才寫到磁盤沖。以Linux為例,用戶態的應用程序,不管是什么應用程序,想要寫入磁盤文件時,都只能調用操作系統提供的write系統調用,申請寫磁盤。至于消息如何經過PageCache再寫入到磁盤中,這個過程,這個過程是在內核態執行的,也就是操作系統自己執行的,應用程序無法干預。這個過程中,應用系統唯一能夠干預的,就是調用操作系統提供的sync系統調用,申請一次刷盤操作,主動將PageCache中的數據寫入到磁盤。

RocketMQ如何調用fsync的?
RocketMQ的Broker提供了一個很明確的配置項flushDiskType,可以選擇刷盤模式。有兩個可選項,SYNC_FLUSH同步刷盤ASYNC_FLUSH異步刷盤
所謂同步刷盤,是指broker每往日志文件中寫入一條消息,就調用一次刷盤操作。而異步刷盤,則是指broker每隔一個固定的時間,才去調用一次刷盤操作。異步刷盤性能更穩定,但是會有丟消息的可能。而同步刷盤的消息安全性就更高,但是操作系統的IO壓力就會非常大。
在RocketMQ中,就算是同步刷盤,其實也并不是真的寫一次消息就刷盤一次,這在海量消息的場景下,操作系統是撐不住的。所以,我們在之前梳理RocketMQ核心源碼的過程中看到,RocketMQ的同步刷盤的實現方式其實也是以10毫秒的間隔去調用刷盤操作。從理論上來說,也還是會有非正常斷電造成消息丟失的可能,甚至嚴格意義上來說,任何應用程序都不可能完全保證斷電消息不丟失。但是,RocketMQ的這一套同步刷盤機制,卻可以
通過絕大部分業務場景的驗證。這其實就是一種平衡。

4、Broker主從同步如何保證不丟失

在這種集群機制下,消息的安全性還是比較高的。但是有一種極端的情況需要考慮。因為消息需要從Master往Slave同步,這個過程是跨網絡的,因此也是有時間延遲的。所以,如果Master出現非正常崩潰,那么就有可能有一部分數據是已經寫入到了Master但是還來得及同步到Slave。這一部分未來得及同步的數據,在RocketMQ的這種集群機制下,就會一直記錄在Master節點上。等到Master重啟后,就可以繼續同步了。另外由于Slave并不會主動切換成Master,所以Master服務崩潰后,也不會有新的消息寫進來,因此也不會有消息沖突的問題。所以,只要Mater的磁盤沒有壞,那么在這種普通集群下,主從同步通常不會造成消息丟失。

他優先保證的是集群內的數據一致性,而并不是保證不丟失。在某些極端場景下,比如出現網絡分區情況時,也會丟失一些未經過集群內確認的消息。不過,基于RocketMQ的使用場景,這種丟失消息的可能性非常小。另外,這種服務端無法保證消息安全的問題,其實結合客戶端的生產者確認機制,是可以得到比較好的處理的。因此,在RocketMQ中使用Dledger集群的話,數據主從同步這個過程,數據安全性還是比較高的。基本可以認為不會造成消息丟失。

5、消費者消費消息如何不丟失

消費者消費消息的過程中,需要從Broker上拉取消息,這些消息也是跨網絡的,所以拉取消息的請求也可能丟失。這時,會不會有丟消息的可能呢?
幾乎所有的MQ產品都設置了消費狀態確認機制。也就是消費者處理完消息后,需要給Broker一個響應,表示消息被正常處理了。如果Broker端沒有拿到這個響應,不管是因為Consumer沒有拿到消息,還是Consumer處理完消息后沒有給出相應,Broker都會認為消息沒有處理成功。之后,Broker就會向Consumer重復投遞這些沒有處理成功的消息(如果超過重試次數機會放到死信隊列)。RocketMQ和Kafka是根據Offset機制重新投遞,而RabbitMQ的ClassicQueue經典對列,則是把消息重新入隊。因此,正常情況下,Consumer消費消息這個過程,是不會造成消息丟失的,相反,可能需要考慮下消息冪等的問題。

6、如果MQ服務全部掛了,如何保證不丟失

最后有一種小概率的極端情況,就是MQ的服務全部掛掉了,這時,要如何保證業務能夠繼續穩定進行,同時業務數據不會丟失呢?
通常的做法是設計一個降級緩存。Producer往MQ發消息失敗了,就往降級緩存中寫,然后,依然正常去進行后續的業務。此時,再啟動一個線程,不斷嘗試將降級緩存中的數據往MQ中發送。這樣,至少當MQ服務恢復過來后,這些消息可以盡快進入到MQ中,繼續往下游Conusmer推送,而不至于造成消息丟失。

7、MQ消息零丟失方案總結

最后要注意到,這里討論到的各種MQ消息防止丟失的方案,其實都是以增加集群負載,降低吞吐為代價的。這必然會造成集群效率下降。因此,這些保證消息安全的方案通常都需要根據業務場景進行靈活取舍,而不是一股腦的直接用上。
這些消息零丟失方案,其實是沒有最優解的。因為如果有最優解,那么這些MQ產品,就不需要保留各種各樣的設計了。這和很多面試八股文是有沖突的。面試八股文強調標準答案,而實際業務中,這個問題是沒有標準答案的,一切,都需要根據業務場景去調整。

MQ如何保證消息的順序性

這里首先需要明確的是,通常討論MQ的消息順序性,其實是在強調局部有序,而不是全局有序。就好比QQ和微信的聊天消息,通常只要保證同一個聊天窗口內的消息是嚴格有序的。至于不同窗口之間的消息,順序出了點偏差,其實是無所謂的。所謂全局有序,通常在業務上沒有太多的使用場景。在RocketMQ和Kafka中把Topic的分區數設置成1,這類強行保證消息全局有序的方案,純屬思維體操。

這個機制需要兩個方面的保障。

1、Producer將一組有序的消息寫入到同一個MessageQueue中。

2、Consumer每次集中從一個MessageQueue中拿取消息。

在Producer端,RocketMQ和Kafka都提供了分區計算機制,可以讓應用程序自己決定消息寫入到哪一個分區。所以這一塊,是由業務自己決定的。只要通過定制數據分片算法,把一組局部有序的消息發到同一個對列當中,就可以通過對列的FIFO特性,保證消息的處理順序。對于RabbitMQ,則可以通過維護Exchange與Queue之間的綁定關系,將這一組局部有序的消息轉發到同一個對列中,從而保證這一組有序的消息,在RabbitMQ內部保存時,是有序的。在Conusmer端RocketMQ是通過讓Consumer注入不同的消息監聽器來進行區分的。而具體的實現機制,核心是通過Consumer的消費線程進行并發控制,來保證消息的消費順序的。類比到Kafka呢。Kafka中并沒有這樣的并發控制。而實際上,Kafka的Consumer對某一個Partition拉取消息時,天生就是單線程的,所以,參照RocketMQ的順序消費模型,Kafka的Consumer天生就是能保證局部順序消費的。

至于RabbitMQ,以他的ClassicQueue經典對列為例,他的消息被一個消費者從隊列中拉取后,就直接從隊列中把消息刪除了。所以,基本不存在資源競爭的問題。那就簡單的是一個隊列只對應一

個Consumer,那就是能保證順序消費的。如果一個隊列對應了多個Consumer,同一批消息,可能會進入不同的Consumer處理,所以也就沒法保證消息的消費順序

MQ如何保證消息冪等性

通用解決方案

1. 使用唯一消息標識

在發送消息時,為每條消息生成一個全局唯一的標識符(如UUID、訂單號等)。在消費端,通過檢查這個唯一標識來判斷消息是否已被處理過。

實現步驟

  • 發送端在發送消息時,將唯一標識作為消息的Keys或屬性。
  • 消費端在接收到消息后,首先檢查這個唯一標識是否已存在于數據庫中。
  • 如果存在,則說明消息已被處理過,直接跳過;如果不存在,則進行業務處理,并將唯一標識存入數據庫。

2. 引入消息去重表或者布隆過濾器

在數據庫中創建一個消息去重表,用于記錄已處理消息的唯一標識。消費端在處理消息前,先查詢去重表,判斷消息是否已被處理。或者使用布隆過濾器來判斷是否消費過了。

實現步驟

  • 定義一個消息去重表,包含唯一標識和消息狀態等字段。
  • 消費端在處理消息前,先插入一條記錄到去重表中(使用唯一標識作為主鍵,以處理并發插入時的沖突)。
  • 如果插入成功,則說明消息是新的,進行業務處理;如果插入失敗(主鍵沖突),則說明消息已被處理過,直接跳過。

3. 利用RocketMQ的消息ID

雖然RocketMQ的消息ID在大多數情況下是唯一的,但不建議直接依賴它來實現消息的冪等性,因為存在生產者手動重發相同消息(但Message ID不同)的情況。

然而,在某些場景下,可以結合業務唯一標識和消息ID來輔助實現冪等性。例如,在數據庫中同時記錄業務唯一標識和RocketMQ的消息ID,通過這兩個字段的組合來確保消息的唯一性。

4. 引入分布式鎖

對于需要嚴格保證冪等性的場景,可以考慮在消費消息前引入分布式鎖。通過分布式鎖來確保同一時間只有一個消費者能處理某條消息。

實現步驟

  • 在處理消息前,嘗試獲取分布式鎖(以消息的唯一標識作為鎖鍵)。
  • 如果獲取成功,則進行業務處理;如果獲取失敗,則說明有其他消費者正在處理該消息,直接跳過。

注意事項

  • 性能考慮:在實現冪等性時,要注意避免引入過多的數據庫操作或分布式鎖,以免影響系統的整體性能。
  • 容錯性:要確保冪等性實現方案具有容錯性,能夠在各種異常情況下正確運行。
  • 業務邏輯適配:冪等性實現應緊密結合業務邏輯,確保在復雜業務場景下的有效性和正確性。

MQ如何快速處理積壓的消息

1、消息積壓會有哪些問題。
對RocketMQ和Kafka來說,他們的消息積壓能力本來就是很強的,因此,短時間的消息積壓,是沒有太多問題的。但是需要注意,如果消息積壓問題一直得不到解決,RocketMQ和Kafka在日志文件過期后,就會直接刪除過期的日志文件。而這些日志文件上未消費的消息,就會直接丟失。
?
2、怎么處理大量積壓的消息
產生消息積壓的根本原因還是Consumer處理消息的效率太低,所以最核心的目標還是要提升Consumer消費消息的效率。如果不能從業務上提升Consumer消費消息的性能,那么最直接的辦法就是針對處理消息比較慢的消費者組,增加更多的Consumer實例。但是這里需要注意一下,增加Consumer實例是不是會有上限。因為同一個消費者組下的多個Cosumer需要和對應Topic下的MessageQueue建立對應關系,而一個MessageQueue最多只能被一個Consumer消費,因此,增加的Consumer實例最多也只能和Topic下的MessageQueue個數相同。如果此時再繼續增加Consumer的實例,那么就會有些Consumer實例是沒有MessageQueue去消費的,因此也就沒有用了。
這時,如果Topic下的MessageQueue配置本來就不夠多的話,那就無法一直增加Consumer節點個數了。這時怎么處理呢?如果要快速處理積壓的消息,可以創建一個新的Topic,配置足夠多的MessageQueue。然后把Consumer實例的Topic轉向新的Topic(在消費者代碼中發消息),并緊急上線一組新的消費者,只負責消費舊Topic中的消息,并轉存到新的Topic中。這個速度明顯會比普通Consumer處理業務邏輯要快很多。然后在新的Topic上,就可以通過添加消費者個數來提高消費速度了。之后再根據情況考慮是否要恢復成正常情況。其實這種思路和RocketMQ內部很多特殊機制的處理方式是一樣的。例如固定級別的延遲消息機制,也是把消息臨時轉到一個系統內部的Topic下,處理過后,再轉回來。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913971.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913971.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913971.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【個人思考】不點菜的美學:Omakase 的信任、四季與食藝

本文原創作者:姚瑞南 AI-agent 大模型運營專家/音樂人/野生穿搭model,先后任職于美團、獵聘等中大廠AI訓練專家和智能運營專家崗;多年人工智能行業智能產品運營及大模型落地經驗,擁有AI外呼方向國家專利與PMP項目管理證書。(轉載需經授權) 目錄 ?? 什么是 Omakase?…

vivo Pulsar 萬億級消息處理實踐(3)-KoP指標異常修復

作者&#xff1a;vivo 互聯網大數據團隊- Chen Jianbo 本文是《vivo Pulsar萬億級消息處理實踐》系列文章第3篇。 Pulsar是Apache基金會的開源分布式流處理平臺和消息中間件&#xff0c;它實現了Kafka的協議&#xff0c;可以讓使用Kafka API的應用直接遷移至Pulsar&#xff0c;…

Marin說PCB之Allegro高亮BOM器件技巧詳解

一&#xff0c;首先在原理圖輸出BOM的時候&#xff0c;只需要勾選器件的位號這個選項即可&#xff0c;具體操作如下所示&#xff1a;二&#xff0c;輸出BOM完成后&#xff0c;打開表格選擇我們器件的位號那列即可&#xff0c;然后復制到我們的TEXT文本中。三&#xff0c;接著就…

數據結構與算法——從遞歸入手一維動態規劃【2】

前言&#xff1a; 記錄一下對左程云系列算法課程--算法講解066【必備】的剩余習題的學習。本文主要簡單記錄個人學習心得和提供C版本代碼。如需要題目的細致講解&#xff0c;請前往原視頻。 涉及內容&#xff1a; 動態規劃、三指針、 參考視頻&#xff1a; 左程云--算法講…

【理念●體系】Windows AI 開發環境搭建實錄:六層架構的逐步實現與路徑治理指南

【理念●體系】從零打造 Windows WSL Docker Anaconda PyCharm 的 AI 全鏈路開發體系-CSDN博客 Windows AI 開發環境搭建實錄&#xff1a;六層架構的逐步實現與路徑治理指南 ——理念落地篇&#xff0c;從路徑規劃到系統治理&#xff0c;打造結構化可復現的 AI 開發環境 AI…

5G標準學習筆記15 --CSI-RS測量

5G標準學習筆記15 --CSI-RS測量 前言 前面講了&#xff0c;在5GNR中&#xff0c;CSI-RS 是支持信道狀態評估、波束管理和無線資源管理&#xff08;RRM&#xff09;的關鍵參考信號。下面孬孬基于3GPP TS 38.331中的內容&#xff0c;詳細定義了基于 CSI-RS 的測量程序&#xff0c…

第P28:阿爾茨海默病診斷(優化特征選擇版)

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 一、進階說明 針對于特征對模型結果的影響我們做了特征分析 特征選擇 1. SelectFromModel 工作原理&#xff1a;基于模型的特征選擇方法&#xff0c;使用…

AI的歐幾里得要素時刻:從語言模型到可計算思維

引言 人工智能正在經歷一個關鍵的轉折點。就像歐幾里得的《幾何原本》為數學奠定了公理化基礎一樣&#xff0c;AI也正在尋找自己的"要素時刻"——一個能夠將當前的語言模型能力轉化為真正可計算、可驗證思考的轉變。 最近發表的論文《AI’s Euclid’s Elements Momen…

番外-linux系統運行.net framework 4.0的項目

基礎環境&#xff1a;linux系統&#xff0c;.net framework 4.0&#xff0c;npgsql 2.2.5.0 &#xff08;版本不同&#xff0c;構建可能失敗&#xff09; 方法背景&#xff1a;linux不支持運行.net framework 4.0&#xff0c;高版本mono不支持npgsql 2.x 主要使用&#xff1a…

國內AI訓練都有哪些企業?:技術深耕與場景實踐

國內AI訓練都有哪些企業&#xff1f;當人工智能從實驗室走向產業一線&#xff0c;AI 訓練就像為智能系統 “施肥澆水” 的關鍵環節&#xff0c;讓技術根系在各行業土壤里扎得更深。國內一批 AI 訓練企業正各展所長&#xff0c;有的專攻技術優化&#xff0c;有的深耕場景應用。它…

微算法科技基于格密碼的量子加密技術,融入LSQb算法的信息隱藏與傳輸過程中,實現抗量子攻擊策略強化

隨著量子計算技術的發展&#xff0c;傳統加密算法面臨被量子計算機破解的風險&#xff0c;LSQb 算法也需考慮應對未來可能的量子攻擊。微算法科技基于格密碼的量子加密技術&#xff0c;融入LSQb算法的信息隱藏與傳輸過程中&#xff0c;實現抗量子攻擊策略強化。格密碼在面對量子…

xAI發布Grok4+代碼神器Grok4 Code,教你如何在國內升級訂閱SuperGrok并使用到Grok4教程

就在今天&#xff0c;馬斯克旗下xAI發布了其最新的旗艦AI模型Grok4&#xff0c;并同步推出專為開發者打造的編程利器 Grok 4 Code&#xff0c;還推出了一項全新的AI訂閱計劃——每月300美元的SuperGrokHeavy。 那最新發布的Grok4以及有哪些特性呢&#xff1f;以及如何才能使用…

Rust 變量遮蔽(Variable Shadowing)

在 Rust 中&#xff0c;變量遮蔽&#xff08;Variable Shadowing&#xff09; 是一種在同一作用域內重新聲明同名變量的特性。它允許你創建一個新變量覆蓋之前的同名變量&#xff0c;新變量與舊變量類型可以不同&#xff0c;且舊變量會被完全隱藏。核心特點允許同名變量重復聲明…

【VScode | 快捷鍵】全局搜索快捷鍵(ctrl+shift+f)失效原因及解決方法

&#x1f601;博客主頁&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客內容&#x1f911;&#xff1a;&#x1f36d;嵌入式開發、Linux、C語言、C、數據結構、音視頻&#x1f36d; &#x1f60e;金句分享&#x1f60e;&a…

Windows 與 Linux 內核安全及 Metasploit/LinEnum 在滲透測試中的綜合應用

目錄 &#x1f6e0;? 1. 內核安全如何助力滲透測試與黑客行業 1.1 內核安全的戰略價值 1.2 結合 Metasploit 與 LinEnum 的作用 &#x1f50d; 2. Metasploit 信息收集模塊及其在內核安全中的應用 2.1 Windows 信息收集模塊 2.2 Linux 信息收集模塊 2.3 使用步驟 Wind…

京東攜手HarmonyOS SDK首發家電AR高精擺放功能

在電商行業的演進中&#xff0c;商品的呈現方式不斷升級&#xff1a;從文字、圖片到視頻&#xff0c;再到如今逐漸興起的3D與AR技術。作為XR應用探索的先行者&#xff0c;京東正站在這場體驗革新的最前沿&#xff0c;不斷突破商品展示的邊界&#xff0c;致力于通過創新技術讓消…

瞄準Win10難民,蘋果正推出塑料外殼、手機CPU的MacBook

最近有消息稱&#xff0c;蘋果正在研發一款定位“低價”的MacBook&#xff0c;售價可能低于800美元&#xff08;約合人民幣5800元&#xff09;&#xff0c;采用的是A18 Pro芯片&#xff0c;也就是未來iPhone 16 Pro同款的“手機芯片”&#xff0c;而不是現有的M系列。這款產品預…

原子級 macOS 信息竊取程序升級:新增后門實現持久化控制

臭名昭著的 Atomic macOS Stealer&#xff08;AMOS&#xff0c;原子級 macOS 竊取程序&#xff09;惡意軟件近期完成危險升級&#xff0c;全球 Mac 用戶面臨更嚴峻威脅。這款與俄羅斯有關聯的竊密程序首次植入后門模塊&#xff0c;使攻擊者能維持對受感染系統的持久訪問、執行遠…

Shader面試題100道之(81-100)

Shader面試題&#xff08;第81-100題&#xff09; 以下是第81到第100道Shader相關的面試題及答案&#xff1a; 81. Unity中如何實現屏幕空間的熱扭曲效果&#xff08;Heat Distortion&#xff09;&#xff1f; 熱扭曲效果可以通過GrabPass抓取當前屏幕圖像&#xff0c;然后在片…

C#洗牌算法

洗牌算法是一種將序列&#xff08;如數組、列表&#xff09;元素隨機打亂的經典算法&#xff0c;核心目標是讓每個元素在打亂后出現在任意位置的概率均等。在 C# 中&#xff0c;常用的洗牌算法有Fisher-Yates 洗牌算法&#xff08;也稱 Knuth 洗牌算法&#xff09;&#xff0c;…