大家好,我是G探險者!
📌 背景場景
在高可用分布式系統中,我們經常面臨:
- MQ 集群重啟 → 消息監聽中斷
- MQ 網絡短暫抖動 → 發送端連接失敗
- 一端恢復正常,另一端仍處于掛死狀態
如果你只配置了“連接工廠層”的重連,卻忽略了監聽容器或發送客戶端的容錯設計,重連機制可能失效,業務陷入長時間不可用。
? 核心理念:監聽和發送是兩個不同的連接“通道”
通道 | 用途 | 組件 |
---|---|---|
監聽通道 | 從 MQ 拉取消息 | Spring JMS 的?MessageListenerContainer |
發送通道 | 發送消息到 MQ | Spring 的?JmsTemplate |
這兩個通道各自有自己的連接池和生命周期,不能指望一個設置就解決全部問題。
🔁 一、監聽端的自動重連機制
推薦做法:使用?DefaultMessageListenerContainer
?并設置重連間隔
DefaultMessageListenerContainer?container?=?new?DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName("MY.QUEUE");
container.setMessageListener(new?MyListener());// ? 開啟事務模式可選
container.setSessionTransacted(true);// ? 開啟自動重連機制(默認是 true)
container.setRecoveryInterval(5000L);?// 每 5 秒重試連接一次container.afterPropertiesSet();
container.start();
DefaultMessageListenerContainer
?內部會捕獲?ConnectionException
?等連接中斷異常,自動重試連接。
📤 二、發送端的容災重連策略
監聽容器有容器幫你維護連接,而?發送端(JmsTemplate)則需要連接池支撐。
推薦:配合使用?CachingConnectionFactory
ConnectionFactory?factory?=?createIBMConnectionFactory();?// 原始 MQ 工廠
CachingConnectionFactory?cachingFactory?=?new?CachingConnectionFactory(factory);// 可選設置緩存大小(緩存 session 的數量)
cachingFactory.setSessionCacheSize(10);JmsTemplate?jmsTemplate?=?new?JmsTemplate(cachingFactory);
jmsTemplate.convertAndSend("MY.QUEUE",?"Hello MQ");
📌 為啥要用?CachingConnectionFactory
?
原因 | 描述 |
---|---|
重用連接 | 避免每次發送都新建連接(開銷大) |
支持連接斷開重建 | 內部封裝連接失效后重建邏輯 |
提供 session 緩存 | 提升發送效率,降低資源消耗 |
🧰 三、JMS 廠商參數補充(IBM MQ 舉例)
若你使用 IBM MQ,可以在底層工廠設置:
MQQueueConnectionFactory?factory?=?new?MQQueueConnectionFactory();factory.setHostName("192.168.1.102");
factory.setPort(1414);
factory.setQueueManager("QM1");
factory.setChannel("CHANNEL1");
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);// ? 啟用自動重連
factory.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS,WMQConstants.WMQ_CLIENT_RECONNECT);// ? 設置最大重連時間(秒)
factory.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT,?30);// ? 設置連接列表(用于集群 HA)
factory.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST,"192.168.1.102(1414),192.168.1.103(1414)");
🔗 四、總結策略建議表
場景 | 推薦設置 |
---|---|
MQ監聽端 | DefaultMessageListenerContainer ?+?setRecoveryInterval |
MQ發送端 | JmsTemplate ?+?CachingConnectionFactory |
多 broker/集群 | 設置?CONNECTION_NAME_LIST |
事務性保障 | setSessionTransacted(true) ?+?onMessage() ?異常觸發 rollback |
監聽不生效 | 檢查是否調用了?afterPropertiesSet() |
📘 下一篇預告:
《JMS事務性會話徹底解析:消息監聽中的 commit、rollback 和冪等設計》
我們將深入剖析如何使用事務控制 MQ 消息的消費與回滾,Spring 容器如何自動幫你 commit/rollback,以及如何設計冪等保證系統不重復處理失敗消息。