ActiveMQ斷線重連技巧,即通信高可用的配置

最近在做一個內部應用的時候,應用到了ActiveMQ作為服務之間消息傳遞,解耦服務之間的關聯,但是在應用的過程中遇到了連接斷線無法重連的問題,下面基于這個問題,深入了解一下ActiveMQ的一些相關原理和知識。

一、前置知識

1.1 基礎概念

ActiveMQ中有3個重要的角色:Broker、Producer、Consumer。
Broker為消息代理,它是ActiveMQ服務端角色,接收客戶端的鏈接并提供消息通信的核心服務。
Producer是消息生產者,客戶端角色
Consumer是消息消費者,客戶端角色。
要保證Producer和Consumer正常通信,主要是通過Broker來實現的,Broker既代理了Producer同時也代理了Consumer,這樣Broker才能知道哪些是生產者,哪些是消費者,不至于不同的消息被本不屬于它的消費者給消費了。Broker內部的機制我們下一節學習。
那么生產者和消費者如何實現通信的呢?ActiveMQ定義的連接器(connector)就是用來約定ActiveMQ的節點之間如何通信的。

1.2 連接器

ActiveMQ通過網絡連接器這種連接機制來實現客戶端與服務端之間的通信。ActiveMQ提供了兩種連接器:

  1. 傳輸連接器(transport connector):用于客戶端和服務端之間( client-to-broker)的通信。
  2. 網絡連接器(network connector):用戶集群中多個服務端之間(broker-to-broker)的通信。

1.3 傳輸連接器包括的協議

  • tcp,默認使用的協議,符合大多數的使用場景。
  • udp,客戶端使用udp協議和服務端通信,當客戶端和服務端之間存在防火墻可以考慮使用udp協議。
  • vm,當客戶端和服務端在同一個JVM中可以考慮使用。直接使用虛擬機本地方法調用,從而避免網絡通信的開銷。
  • nio,本質上還是tcp,只是使用了java NIO包,某些場景下可能性能更好。
  • ssl,基于tcp提供安全的通信。
  • http/https,允許客戶端使用REST或Ajax的方式進行連接,可以通過JS給ActiveMQ發送消息。
  • multicast,客戶端使用組播的方式連接到服務端。
  • websocket,可以通過HTML5中的websocket技術連接服務端。
  • amqp,高級消息隊列協議,很多消息中間件都支持該協議。ActiveMQ5.8版本開始支持。
  • mqtt,MQTT是一個基于客戶端-服務器的消息發布/訂閱傳輸協議,主要應用在loT(物聯網)。
  • stomp,STOMP是在WebSocket之上提供了一個基于幀的線路格式(frame-based wire format)層,用來定義消息的語義,就像HTTP在TCP套接字之上添加了請求-響應模型層一樣。ActiveMQ5.6版本開始支持

二、ActiveMQ斷線發生的場景

ActiveMQ的客戶端與ActiveMQ的Broker(消息代理)之間的網絡連接發生斷開,如果未采用高可用的配置,那么Producer無法向MQ中生產對象,同理Customer也無法消費MQ中的消息,整個業務就會出現暫停。
默認的情況下如果ActiveMQ服務正常,那么所有Client服務啟動,都會自動在Broker中進行注冊,這樣就能實現消息生產和消費。但是如果Client服務正常,ActiveMQ服務宕機了進行重啟或當網絡不穩定或出現故障導致連接斷開時,ActiveMQ是不會主動實現Client與Broker進行重連的,此時所有服務都正常,由于連接未建立,所以整個業務也無法實現消息的生產和消費。

三、ActiveMQ斷線重連的實現原理

ActiveMQ的斷線重連機制的實現原理主要是基于網絡通信和消息重試機制。當ActiveMQ的客戶端與Broker之間的連接斷開時,客戶端會檢測到這個事件,并嘗試重新建立連接。,ActiveMQ的客戶端會檢測到連接中斷事件,然后觸發一個重連機制。客戶端會嘗試重新連接到一個或多個Broker的URL。在默認情況下,如果連接斷開,客戶端會新起一個線程,不斷的從url參數中獲取一個url來重試連接。這種重試機制通常會進行一定的次數限制,以避免無限制的重試導致資源浪費或其他問題。

另外,在重連過程中,ActiveMQ的客戶端還會嘗試恢復之前未發送成功的消息。這個過程主要是通過持久化消息存儲來實現的。在連接斷開時,客戶端會將未發送成功的消息存儲到持久化存儲中,如數據庫或文件系統等。當客戶端成功連接到Broker后,會從持久化存儲中恢復這些消息,并進行重新發送。

四、ActiveMQ斷線重連的配置

ActiveMQ提供了客戶端和服務器端通信高可用的配置,

failover,為客戶端提供重連服務端的邏輯,允許配置多個上面介紹的不同協議的連接配置,并隨機的從其中選擇一個進行連接,如果失敗則繼續選擇其他服務重試。failover的配置格式:failover:(tcp://ip1:61616,tcp://ip2:61616)?initialReconnectDelay=100。

fanout,采用復制的方式將消息發送給多個服務端,配置格式為:fanout:(tcp://localhost:61629,udp://localhost:61639,tcp://localhost:61649)

4.1 failover

failover是一種ActiveMQ提供的失效轉移(也叫故障轉移)的策略。其原理是如果服務先連接到tcp://ip1:61616這個消息隊列,如果因為網絡抖動或其他意外情況導致ip1無法連接,failover會自動切換到ip2:61616這個消息隊列,實現了消息高可用,如果ip1的網路正常了,failover又會嘗試連接回來。但是這與斷線重連有什么關系呢?
經過我的驗證,如果只配置了一個ActiveMQ,如:failover:(tcp://ip1:61616)?initialReconnectDelay=100 當ip1上的ActiveMQ出現了問題,此時failover無法進行故障轉移,他就會在initialReconnectDelay定義的100毫秒后進行ip1的重連,從而導致Client與ip1的Broker重新建立鏈接,實現了斷線重連的功能。

4.2 fanout

采用復制的方式將消息發送給多個服務端,這里面雖然沒有斷線重連,但是實現了消息發送的高可用,這里面需要注意一點,如果Customer在沒有很好的處理消息的情況下,有可能Productor生產了一個消息,發送給多個消息隊列,Customer消費了多次消息,導致數據重復,所以需要注意Customer消息消費邏輯的冪等性。

4.3 自定義函數實現斷線重連

JMS提供了ExceptionListener接口用于偵聽JMS消息鏈接異常,以下是基于JMS的ExceptionListener接口實現的斷線自動重連的示例:

import java.util.Timer;
import java.util.TimerTask;import javax.jms.ExceptionListener;
import javax.jms.JMSException;/*** JMS 重連接實現<br>* 通過實現{@link ExceptionListener}接口偵聽連接異常,* 使用定時任務遲延執行重連接嘗試直至連接成功* @author guyadong* @since 2.3.8*/
class AutoReconnectAdapter implements ExceptionListener,JmsConstants{private static long START_RECONNECTDELAY = 1;/*** 用于執行自動重連的定時器對象*/private static final Timer reconnectTimer = new Timer("AMQP Reconnect"); /*** 定時重連的延遲時間(秒),從1秒開始,每次增加一倍,最大128*/private long reconnectDelay = START_RECONNECTDELAY; /*** 最大重連延遲時間*/private long maxReconnectDelay = 128;/*** 應用層實現的重連回調接口*/private final JMSReconnectCallback jmsReconnectCallback;public AutoReconnectAdapter(JMSReconnectCallback jmsReconnectCallback) {this.jmsReconnectCallback = jmsReconnectCallback;}@Overridepublic void onException(JMSException exception) {if(null != jmsReconnectCallback) {try {jmsReconnectCallback.onConnectionLost();scheduleReconnectCycle();} catch (Exception e) {logger.error(e.getMessage(),e);}}}/*** 嘗試將客戶端重新連接到服務器。如果成功,它將確保不再計劃重新連接。* 但是,如果連接失敗,延遲將增加一倍(最大128秒),并將在延遲后重新安排重新連接。*/private void attemptReconnect() {if(null != jmsReconnectCallback) {try {jmsReconnectCallback.tryReconnecting();// restore to default valuereconnectDelay = START_RECONNECTDELAY;}	catch (Exception e) {if(e instanceof JMSException || e.getCause() instanceof JMSException ) {reconnectDelay = Math.min(reconnectDelay*2, maxReconnectDelay);scheduleReconnectCycle();}else {logger.error(e.getMessage(),e);}}}}/*** 安排在{@link #reconnectDelay}指定的延遲時間后執行重連接嘗試*/private void scheduleReconnectCycle() {logger.info("{} Scheduling reconnect timer, delay {} seconds",jmsReconnectCallback.ownerName(), reconnectDelay);reconnectTimer.schedule(new TimerTask() {@Overridepublic void run() {attemptReconnect();}}, reconnectDelay*1000);}
}

為了適應應用層不同的重連接實現需要,通過定義JMSReconnectCallback接口,來讓斷連接和重連實現抽象化,應用層可以根據自己的需要,實現此接口,執行斷開連接和重連的動作

/*** JMS 重連機制回調接口* @author guyadong* @since 2.3.8*/
public interface JMSReconnectCallback{/*** 連接異常偵聽* @throws Exception */public void onConnectionLost() throws Exception;/*** 嘗試重連動作* @throws Exception 失敗拋出異常 JMSException 或異常原因(cause)為JMSException 則繼續異步執行重試*/public void tryReconnecting() throws Exception;/*** 返回當前接口對象所屬的模塊名,用于日志輸出*/public String ownerName();
}

完整代碼參見碼云倉庫:https://gitee.com/l0km/simplemq/blob/dev/simplemq-jms/src/main/java/gu/simplemq/jms/AutoReconnectAdapter.java

五、ActiveMQ斷線重連的優化策略

斷線重連如果過于頻繁,也會導致網絡與服務器的壓力,建議從以下幾點進行斷線重連的優化:

減少斷線重連的頻率, 適當提升斷線重連中間的延時
優化網絡質量, 比如Client與MQ都部署在內網
調整連接超時和重試次數
使用多線程處理連接失敗
使用負載均衡和集群提高可用性
優化消息恢復和持久化性能, 確保未被正常處理的消息有恢復機制
選擇合適的消息恢復策略
斷線重連的日志監控和告警機制
日志監控工具的選擇和使用
告警機制的建立和維護,檢測到斷線之后最好給運維人員發送一條消息

六、總結

消息隊列在大型項目的建設過程中被廣泛的應用,雖然能夠實現削峰異步處理,但是消息隊列的引入會增加服務運維的風險和成本。今天總結了Client與ActiveMQ出現了斷開鏈接如何實現自動重連的機制和原理,以及相關高可用配置方法。文章中如有錯誤,請多多指正,您的指正是我們共同進步的基石。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/209905.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/209905.shtml
英文地址,請注明出處:http://en.pswp.cn/news/209905.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

springboot2 在Java項目中你們是如何配置時間格式響應給前端呢

在 Spring Boot 2 項目中配置時間格式&#xff0c;通常可以通過配置文件&#xff08;application.properties 或 application.yml&#xff09;或者通過 Java 代碼進行配置。以下是兩種常見的配置方式&#xff1a; 1. 通過配置文件配置時間格式&#xff1a; 在 application.pr…

mybaties plus插入數據,自動回顯 機制

結論&#xff1a;mybaties plus會將庫里數據自動回顯到 要插入的數據上 測試表格 SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS 0;-- 表結構 DROP TABLE IF EXISTS t_stu; CREATE TABLE t_stu (id int NOT NULL COMMENT id,name varchar(255) CHARACTER SET utf8mb4 COLLATE…

【PyTorch】計算設備

文章目錄 1. 介紹2. 查詢和使用 1. 介紹 CPU設備意味著所有物理CPU和內存&#xff0c; 這意味著PyTorch的計算將嘗試使用所有CPU核心。可以用以下方式表示&#xff1a; torch.device(cpu) GPU設備只代表一個GPU和相應的顯存。 torch.device(cuda)如果有多個GPU&#xff0c;我們…

Java解決矩陣對角線元素的和問題

Java解決矩陣對角線元素的和問題 01 題目 給你一個正方形矩陣 mat&#xff0c;請你返回矩陣對角線元素的和。 請你返回在矩陣主對角線上的元素和副對角線上且不在主對角線上元素的和。 示例 1&#xff1a; 輸入&#xff1a;mat [[1,2,3],[4,5,6],[7,8,9]] 輸出&#xff1a…

為什么流量對店鋪轉化率重要?亞馬遜、速賣通等跨境賣家通過自養號測評提升店鋪轉化率

亞馬遜、速賣通等電商平臺賣家非常清楚流量對店鋪轉化率的重要性&#xff0c;測評補單在跨境電商賣家中扮演著重要的角色&#xff0c;是一種必要的運營手段之一。在追求更好的產品曝光和更高的轉化率時&#xff0c;Listing的排名是關鍵因素之一。而在各個平臺的Listing中&#…

正確使用AFX_MANAGE_STATE宏管理MFC模塊狀態, AFX_MANAGE_STATE宏作用,真的很重要!!!

簡介&#xff1a; 在使用 MFC&#xff08;Microsoft Foundation Classes&#xff09;開發 DLL&#xff08;動態鏈接庫&#xff09;時&#xff0c;正確管理 MFC 模塊狀態是確保功能正常運行的關鍵。本文將深入探討使用 AFX_MANAGE_STATE 宏的重要性&#xff0c;以及在 DLL 中正確…

連接Redis報錯解決方案

連接Redis報錯&解決方案 問題描述&#xff1a;Could not connect to Redis at 127.0.0.1:6379: 由于目標計算機積極拒絕&#xff0c;無法連接。 問題原因&#xff1a;redis啟動方式不正確 解決方案&#xff1a; 在redis根目錄下打開命令行窗口&#xff0c;輸入命令redi…

聽GPT 講Rust源代碼--src/tools(12)

File: rust/src/tools/rust-analyzer/crates/rust-analyzer/src/config.rs 在Rust源代碼中&#xff0c;rust/src/tools/rust-analyzer/crates/rust-analyzer/src/config.rs文件的作用是定義和解析rust-analyzer的配置文件。該文件包含了各種配置項的數據結構和枚舉類型&#xf…

MQTT主題、通配符和最佳實踐

MQTT主題在MQTT生態系統非常重要&#xff0c;因為代理&#xff08;broker&#xff09;依賴主題確定哪個客戶端接收指定的主題。本文我們將聚集MQTT主題、MQTT通配符&#xff0c;詳細討論使用它們的最佳實踐&#xff0c;也會探究SYS主題&#xff0c;提供給代理&#xff08;broke…

【npm | npm常用命令及鏡像設置】

npm常用命令及鏡像設置 概述常用命令對比本地安裝全局安裝--save &#xff08;或 -S&#xff09;--save-dev &#xff08;或 -D&#xff09; 鏡像設置設置鏡像方法切換回npm官方鏡像選擇鏡像源 主頁傳送門&#xff1a;&#x1f4c0; 傳送 概述 npm致力于讓 JavaScript 開發變得…

iOS——UIPickerView選擇器

UIPickerView UIPickerView是 iOS 開發中常用的用戶界面組件之一&#xff0c;用于在垂直方向上顯示一個滾動的列表&#xff0c;用戶可以通過滾動選擇其中的一項。 UIPickerView的協議方法 UIPickerView和UItableView差不多&#xff0c;UIPickerView也要設置代理和數據源。UI…

fl studio2024試用版本如何漢化中文?

fl studio2024全稱Fruity Loops Studio2024&#xff0c;這款軟件也被人們親切的稱之為水果&#xff0c;它是一款功能強大的音樂創作編輯軟件&#xff0c;擁有全功能的錄音室&#xff0c;大混音盤以及先進的音樂制作工具&#xff0c;用戶通過使用該軟件&#xff0c;就可以輕松制…

git上傳流程

git安裝網址&#xff1a;https://git-scm.com 如果您要將本地文件夾上傳到名為"compiling"的GitHub倉庫&#xff0c;可以按照以下步驟進行操作&#xff1a; 1.安裝無腦下一步 2.cd到想上傳的文件夾的上一級目錄 2.初始化Git倉庫&#xff1a;git init 設置分支&a…

C++特殊類設計

1.設計不能被拷貝的類 解析&#xff1a;拷貝只會放生在兩個場景中 拷貝構造函數賦值運算符重載 因此想要讓一個類禁止拷貝&#xff0c; 就需讓該類不能調用“拷貝構造函數”以及“賦值運算符重載”&#xff0c;而C11提供的delete重載關鍵字可以讓這件事情變得更加簡單。 1.1.C9…

stl庫之list鏈表與例題

stl中的list是雙向鏈表&#xff0c;優點在于插入/刪除元素方便&#xff0c;缺點是隨機訪問元素時間長 所需頭文件&#xff1a;#include <list> 初始化 list<類型名> 變量名 定義一個int類型的變量a list<int> a; 在末尾插入元素 a.push_back(i); 在開…

LeetCode 每日一題 Day 8 || 簡單枚舉

2048. 下一個更大的數值平衡數 如果整數 x 滿足&#xff1a;對于每個數位 d &#xff0c;這個數位 恰好 在 x 中出現 d 次。那么整數 x 就是一個 數值平衡數 。 給你一個整數 n &#xff0c;請你返回 嚴格大于 n 的 最小數值平衡數 。 示例 1&#xff1a; 輸入&#xff1a;n …

Error: Cannot find module ‘@npmcli/config‘ 最新解決辦法

看了網上許多這個問題的小伙伴&#xff0c;都是降級node版本來解決的。但是降級并不是我想要的結果。 真正的解決辦法就是更新nvm&#xff0c;將你的nvm升級到最新版本&#xff0c;然后卸載掉npm報錯的node版本&#xff0c;重新安裝即可使用。 解決辦法&#xff1a;更新nvm nv…

2020年第九屆數學建模國際賽小美賽B題血氧飽和度的變異性解題全過程文檔及程序

2020年第九屆數學建模國際賽小美賽 B題 血氧飽和度的變異性 原題再現&#xff1a; 脈搏血氧飽和度是監測患者血氧飽和度的常規方法。在連續監測期間&#xff0c;我們希望能夠使用模型描述血氧飽和度的模式。 ??我們有36名受試者的數據&#xff0c;每個受試者以1 Hz的頻率連…

【開源視頻聯動物聯網平臺】J2mod庫寫一個Modbus RTU 服務器

J2Mod是一個Java編寫的Modbus通信庫&#xff0c;可以用于實現Modbus RTU服務器。以下是一個簡單的示例&#xff0c;演示如何使用J2Mod庫創建一個Modbus RTU服務器&#xff1a; 添加J2Mod庫依賴項&#xff1a; 首先&#xff0c;確保在項目中包含J2Mod庫。你可以將J2Mod庫添加到…

CSPNet: A New Backbone that can Enhance Learning Capability of CNN(2019)

文章目錄 -Abstract1 Introduction2 Related workformer work 3 Method3.1 Cross Stage Partial Network3.2 Exact Fusion Model 4 Experiments5 Conclusion 原文鏈接 源代碼 - 梯度信息重用&#xff08;有別于冗余的梯度信息&#xff09;可以減少計算量和內存占用提高效率&am…