最近在做一個內部應用的時候,應用到了ActiveMQ作為服務之間消息傳遞,解耦服務之間的關聯,但是在應用的過程中遇到了連接斷線無法重連的問題,下面基于這個問題,深入了解一下ActiveMQ的一些相關原理和知識。
一、前置知識
1.1 基礎概念
ActiveMQ中有3個重要的角色:Broker、Producer、Consumer。
Broker為消息代理,它是ActiveMQ服務端角色,接收客戶端的鏈接并提供消息通信的核心服務。
Producer是消息生產者,客戶端角色
Consumer是消息消費者,客戶端角色。
要保證Producer和Consumer正常通信,主要是通過Broker來實現的,Broker既代理了Producer同時也代理了Consumer,這樣Broker才能知道哪些是生產者,哪些是消費者,不至于不同的消息被本不屬于它的消費者給消費了。Broker內部的機制我們下一節學習。
那么生產者和消費者如何實現通信的呢?ActiveMQ定義的連接器(connector)就是用來約定ActiveMQ的節點之間如何通信的。
1.2 連接器
ActiveMQ通過網絡連接器這種連接機制來實現客戶端與服務端之間的通信。ActiveMQ提供了兩種連接器:
- 傳輸連接器(transport connector):用于客戶端和服務端之間( client-to-broker)的通信。
- 網絡連接器(network connector):用戶集群中多個服務端之間(broker-to-broker)的通信。
1.3 傳輸連接器包括的協議
- tcp,默認使用的協議,符合大多數的使用場景。
- udp,客戶端使用udp協議和服務端通信,當客戶端和服務端之間存在防火墻可以考慮使用udp協議。
- vm,當客戶端和服務端在同一個JVM中可以考慮使用。直接使用虛擬機本地方法調用,從而避免網絡通信的開銷。
- nio,本質上還是tcp,只是使用了java NIO包,某些場景下可能性能更好。
- ssl,基于tcp提供安全的通信。
- http/https,允許客戶端使用REST或Ajax的方式進行連接,可以通過JS給ActiveMQ發送消息。
- multicast,客戶端使用組播的方式連接到服務端。
- websocket,可以通過HTML5中的websocket技術連接服務端。
- amqp,高級消息隊列協議,很多消息中間件都支持該協議。ActiveMQ5.8版本開始支持。
- mqtt,MQTT是一個基于客戶端-服務器的消息發布/訂閱傳輸協議,主要應用在loT(物聯網)。
- stomp,STOMP是在WebSocket之上提供了一個基于幀的線路格式(frame-based wire format)層,用來定義消息的語義,就像HTTP在TCP套接字之上添加了請求-響應模型層一樣。ActiveMQ5.6版本開始支持
二、ActiveMQ斷線發生的場景
ActiveMQ的客戶端與ActiveMQ的Broker(消息代理)之間的網絡連接發生斷開,如果未采用高可用的配置,那么Producer無法向MQ中生產對象,同理Customer也無法消費MQ中的消息,整個業務就會出現暫停。
默認的情況下如果ActiveMQ服務正常,那么所有Client服務啟動,都會自動在Broker中進行注冊,這樣就能實現消息生產和消費。但是如果Client服務正常,ActiveMQ服務宕機了進行重啟或當網絡不穩定或出現故障導致連接斷開時,ActiveMQ是不會主動實現Client與Broker進行重連的,此時所有服務都正常,由于連接未建立,所以整個業務也無法實現消息的生產和消費。
三、ActiveMQ斷線重連的實現原理
ActiveMQ的斷線重連機制的實現原理主要是基于網絡通信和消息重試機制。當ActiveMQ的客戶端與Broker之間的連接斷開時,客戶端會檢測到這個事件,并嘗試重新建立連接。,ActiveMQ的客戶端會檢測到連接中斷事件,然后觸發一個重連機制。客戶端會嘗試重新連接到一個或多個Broker的URL。在默認情況下,如果連接斷開,客戶端會新起一個線程,不斷的從url參數中獲取一個url來重試連接。這種重試機制通常會進行一定的次數限制,以避免無限制的重試導致資源浪費或其他問題。
另外,在重連過程中,ActiveMQ的客戶端還會嘗試恢復之前未發送成功的消息。這個過程主要是通過持久化消息存儲來實現的。在連接斷開時,客戶端會將未發送成功的消息存儲到持久化存儲中,如數據庫或文件系統等。當客戶端成功連接到Broker后,會從持久化存儲中恢復這些消息,并進行重新發送。
四、ActiveMQ斷線重連的配置
ActiveMQ提供了客戶端和服務器端通信高可用的配置,
failover,為客戶端提供重連服務端的邏輯,允許配置多個上面介紹的不同協議的連接配置,并隨機的從其中選擇一個進行連接,如果失敗則繼續選擇其他服務重試。failover的配置格式:failover:(tcp://ip1:61616,tcp://ip2:61616)?initialReconnectDelay=100。
fanout,采用復制的方式將消息發送給多個服務端,配置格式為:fanout:(tcp://localhost:61629,udp://localhost:61639,tcp://localhost:61649)
4.1 failover
failover是一種ActiveMQ提供的失效轉移(也叫故障轉移)的策略。其原理是如果服務先連接到tcp://ip1:61616這個消息隊列,如果因為網絡抖動或其他意外情況導致ip1無法連接,failover會自動切換到ip2:61616這個消息隊列,實現了消息高可用,如果ip1的網路正常了,failover又會嘗試連接回來。但是這與斷線重連有什么關系呢?
經過我的驗證,如果只配置了一個ActiveMQ,如:failover:(tcp://ip1:61616)?initialReconnectDelay=100 當ip1上的ActiveMQ出現了問題,此時failover無法進行故障轉移,他就會在initialReconnectDelay定義的100毫秒后進行ip1的重連,從而導致Client與ip1的Broker重新建立鏈接,實現了斷線重連的功能。
4.2 fanout
采用復制的方式將消息發送給多個服務端,這里面雖然沒有斷線重連,但是實現了消息發送的高可用,這里面需要注意一點,如果Customer在沒有很好的處理消息的情況下,有可能Productor生產了一個消息,發送給多個消息隊列,Customer消費了多次消息,導致數據重復,所以需要注意Customer消息消費邏輯的冪等性。
4.3 自定義函數實現斷線重連
JMS提供了ExceptionListener接口用于偵聽JMS消息鏈接異常,以下是基于JMS的ExceptionListener接口實現的斷線自動重連的示例:
import java.util.Timer;
import java.util.TimerTask;import javax.jms.ExceptionListener;
import javax.jms.JMSException;/*** JMS 重連接實現<br>* 通過實現{@link ExceptionListener}接口偵聽連接異常,* 使用定時任務遲延執行重連接嘗試直至連接成功* @author guyadong* @since 2.3.8*/
class AutoReconnectAdapter implements ExceptionListener,JmsConstants{private static long START_RECONNECTDELAY = 1;/*** 用于執行自動重連的定時器對象*/private static final Timer reconnectTimer = new Timer("AMQP Reconnect"); /*** 定時重連的延遲時間(秒),從1秒開始,每次增加一倍,最大128*/private long reconnectDelay = START_RECONNECTDELAY; /*** 最大重連延遲時間*/private long maxReconnectDelay = 128;/*** 應用層實現的重連回調接口*/private final JMSReconnectCallback jmsReconnectCallback;public AutoReconnectAdapter(JMSReconnectCallback jmsReconnectCallback) {this.jmsReconnectCallback = jmsReconnectCallback;}@Overridepublic void onException(JMSException exception) {if(null != jmsReconnectCallback) {try {jmsReconnectCallback.onConnectionLost();scheduleReconnectCycle();} catch (Exception e) {logger.error(e.getMessage(),e);}}}/*** 嘗試將客戶端重新連接到服務器。如果成功,它將確保不再計劃重新連接。* 但是,如果連接失敗,延遲將增加一倍(最大128秒),并將在延遲后重新安排重新連接。*/private void attemptReconnect() {if(null != jmsReconnectCallback) {try {jmsReconnectCallback.tryReconnecting();// restore to default valuereconnectDelay = START_RECONNECTDELAY;} catch (Exception e) {if(e instanceof JMSException || e.getCause() instanceof JMSException ) {reconnectDelay = Math.min(reconnectDelay*2, maxReconnectDelay);scheduleReconnectCycle();}else {logger.error(e.getMessage(),e);}}}}/*** 安排在{@link #reconnectDelay}指定的延遲時間后執行重連接嘗試*/private void scheduleReconnectCycle() {logger.info("{} Scheduling reconnect timer, delay {} seconds",jmsReconnectCallback.ownerName(), reconnectDelay);reconnectTimer.schedule(new TimerTask() {@Overridepublic void run() {attemptReconnect();}}, reconnectDelay*1000);}
}
為了適應應用層不同的重連接實現需要,通過定義JMSReconnectCallback接口,來讓斷連接和重連實現抽象化,應用層可以根據自己的需要,實現此接口,執行斷開連接和重連的動作
/*** JMS 重連機制回調接口* @author guyadong* @since 2.3.8*/
public interface JMSReconnectCallback{/*** 連接異常偵聽* @throws Exception */public void onConnectionLost() throws Exception;/*** 嘗試重連動作* @throws Exception 失敗拋出異常 JMSException 或異常原因(cause)為JMSException 則繼續異步執行重試*/public void tryReconnecting() throws Exception;/*** 返回當前接口對象所屬的模塊名,用于日志輸出*/public String ownerName();
}
完整代碼參見碼云倉庫:https://gitee.com/l0km/simplemq/blob/dev/simplemq-jms/src/main/java/gu/simplemq/jms/AutoReconnectAdapter.java
五、ActiveMQ斷線重連的優化策略
斷線重連如果過于頻繁,也會導致網絡與服務器的壓力,建議從以下幾點進行斷線重連的優化:
減少斷線重連的頻率, 適當提升斷線重連中間的延時
優化網絡質量, 比如Client與MQ都部署在內網
調整連接超時和重試次數
使用多線程處理連接失敗
使用負載均衡和集群提高可用性
優化消息恢復和持久化性能, 確保未被正常處理的消息有恢復機制
選擇合適的消息恢復策略
斷線重連的日志監控和告警機制
日志監控工具的選擇和使用
告警機制的建立和維護,檢測到斷線之后最好給運維人員發送一條消息
六、總結
消息隊列在大型項目的建設過程中被廣泛的應用,雖然能夠實現削峰異步處理,但是消息隊列的引入會增加服務運維的風險和成本。今天總結了Client與ActiveMQ出現了斷開鏈接如何實現自動重連的機制和原理,以及相關高可用配置方法。文章中如有錯誤,請多多指正,您的指正是我們共同進步的基石。