大家好,我是G探險者!
📌 場景引入
在實際項目中,我們常常面臨以下挑戰:
- 監聽 MQ 消息失敗了,希望自動重試?
- 消費 MQ 消息后,要寫數據庫,但中間報錯了?
- 消息處理必須要么成功要么失敗,否則可能導致臟數據?
- 消息是冪等的嗎?可以重復投遞處理嗎?
這些都需要?事務性會話 + 容器回滾機制 + 冪等控制?組合拳來解決。
? 一、什么是 JMS 的事務性會話?
事務性會話(transacted = true
)是一種?將消息的接收與處理放入事務中控制?的機制。
與確認模式(acknowledge)對比:
特性 | 確認模式(ACK) | 事務性會話(Transacted) |
---|---|---|
消息確認 | AUTO_ACKNOWLEDGE 、CLIENT_ACKNOWLEDGE ?等 | 使用?session.commit() |
回滾方式 | 手動控制 ACK | 拋異常或手動?session.rollback() |
MQ是否重發消息 | 否(默認不重發) | ? 是,失敗自動重新投遞 |
一次事務包含消息數 | 一條(Spring容器下) | ? 默認一條,支持手動多條 |
🛠? 二、Spring 如何開啟事務性監聽?
Spring 中的?DefaultMessageListenerContainer
?支持事務模式:
DefaultMessageListenerContainer?container?=?new?DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName("MY.QUEUE");
container.setMessageListener(new?MyListener());container.setSessionTransacted(true);?// ? 開啟事務會話
container.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);?// 推薦container.afterPropertiesSet();
container.start();
這樣配置后,每條消息的處理會包裹在如下事務中:
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)
🧪 三、事務處理機制詳解
Spring 容器負責自動控制事務行為:
try?{messageListener.onMessage(message);session.commit();?// ? 成功后提交
}?catch?(Throwable ex) {session.rollback();?// ? 拋異常后回滾,MQ 重發消息throw?ex;
}
? 你只要記住:
- 成功就正常返回(容器幫你 commit)
- 失敗就拋出異常(容器自動 rollback)
🔂 四、消息重試機制聯動
Spring rollback → MQ 檢測未 commit → 觸發重投
🔁 每個 MQ 中間件(IBM MQ、ActiveMQ、TongLinkQ 等)都支持配置:
- 最大重投次數
- 重投間隔(redelivery delay)
- 超過重試后投遞到死信隊列(DLQ)
💥 五、事務作用范圍:是“一條消息”嗎?
這個問題很關鍵,我們以 Spring 默認配置為例說明:
場景 | 事務作用范圍 |
---|---|
DefaultMessageListenerContainer ?默認行為 | ? 每條消息單獨包裹事務 |
自定義?Session ?拉多條消息后統一 commit | ? 多條消息為一個事務 |
設置并發消費者(線程池) | 每條消息獨立事務(線程隔離) |
實戰建議:
? 在監聽容器中消費 MQ 消息,默認一條消息就是一個事務單元,安全可靠。
🎯 六、事務 + 冪等的設計建議
事務只能解決“要么成功、要么失敗”的問題,不能避免重復處理。
所以業務系統通常要配合冪等性策略:
場景 | 冪等性設計建議 |
---|---|
寫數據庫 | 利用主鍵/唯一索引避免重復插入 |
寫 Redis | 使用?SETNX ?保證消息只處理一次 |
寫業務日志 | 使用消息 ID 做去重處理 |
第三方調用 | 如果不能重復調用,要做冪等屏障 |
?? 七、監聽失敗常見問題排查
問題 | 排查建議 |
---|---|
沒開啟事務? | 是否調用了?setSessionTransacted(true) |
容器未啟動? | 是否漏了?afterPropertiesSet() ?調用 |
消息處理失敗后 MQ 不重發? | 是否吞掉異常了?應拋出異常給容器 |
重投失敗消息去哪了? | 查看 MQ 的 DLQ(死信隊列)配置 |
📘 小結
功能點 | 建議配置 |
---|---|
自動控制 commit/rollback | 使用?DefaultMessageListenerContainer |
每條消息開啟事務 | setSessionTransacted(true) |
拋異常觸發回滾 | 在?onMessage() ?中不要吞異常 |
冪等設計 | 配合事務做冪等邏輯 |
消息處理失敗自動重試 | 借助 MQ 的重投策略 + Spring 回滾機制 |
📘 下一篇預告:
第 4 篇:《JMS 消息重試機制與死信隊列配置指南》
我們將詳細講解 MQ 的 redelivery policy、最大重試次數配置、死信隊列處理策略,以及如何在 Spring 中優雅應對消費失敗。