rabbitmq動態創建交換機、隊列、動態綁定,銷毀

 // 緩存已創建的綁定,避免重復聲明private final Map<String, Date> createdBindings = new ConcurrentHashMap<>(); 
public void createAndBindQueueToExchange(String type,String clinetId,  String routingKey) {String   queueName = routingKey;log.info("初始化類型:{}",type);QueueInformation queueInformation = rabbitAdmin.getQueueInfo(queueName);if(queueInformation == null) {//隊列不存在則創建Map<String, Object> args = new HashMap<>();args.put("x-max-priority", maxPriority); // 設置優先級范圍Queue queue = new Queue(queueName, true, false, false, args);rabbitAdmin.declareQueue(queue);log.info("創建隊列: {} ", queueName);}else{log.info("隊列已存在: {} ", queueName);}String containerKey = queueName + ":" + mainDirectExchange + ":" + routingKey + ":"+clinetId;//還要判斷監聽容器是否存在if (createdBindings.containsKey(containerKey) && registry.getListenerContainerIds().contains(containerKey)) {log.info("綁定已存在緩存中,容器中也存在 queue: {} to exchange: {} with routing key: {},time={}",queueName, mainDirectExchange, routingKey,createdBindings.get(containerKey));createdBindings.put(containerKey,new Date());}else{//stopContainerListenerAndCleanCash(containerKey,"緩存 無Key(有無監聽容器)或(有緩存Key無監聽容器)");// 2. 聲明綁定到已存在的交換機Binding binding = new Binding(queueName,Binding.DestinationType.QUEUE,mainDirectExchange,routingKey,null);rabbitAdmin.declareBinding(binding);// 添加到緩存createdBindings.put(containerKey,new Date());log.info("成功創建綁定 for queue: {} to exchange: {} with routing key: {}",queueName, mainDirectExchange, routingKey);}// 3. 注冊監聽器if (!registry.getListenerContainerIds().contains(containerKey)) {SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();endpoint.setId(containerKey);endpoint.setQueueNames(queueName);//endpoint.setAutoStartup(true);// 使用手動ACK的消息監聽器endpoint.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//處理消息完成后,不退回為trueboolean okMessage = false;String consumerTag = "";Exception exception = null;FlowInstanceNode flowInstanceNode = null;try {consumerTag = message.getMessageProperties().getConsumerTag();String clientId = consumerTag.split(":")[3];String messageBody = new String(message.getBody());flowInstanceNode = processMessage(clientId,messageBody);if("0".equalsIgnoreCase(flowInstanceNode.getExceptionType())) {//消費正常channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);okMessage = true;}} catch (Exception e) {log.error("指令包消息處理失敗: {}", truncateString(new String(message.getBody())), e);exception = e;}finally {// 根據異常類型決定是否重新入隊// 網絡異常的情況都應返回隊列重新消費boolean exceptionFlag = shouldRequeue(exception,flowInstanceNode);log.info("指令執行包異常標志exceptionFlag: {}",exceptionFlag);if (exceptionFlag) {okMessage = false;// 消息重新入隊channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);log.warn("指令包消息處理失敗,已重新入隊: {}", truncateString(new String(message.getBody())));} else {// 消息丟棄(不重新入隊)if(!okMessage){channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);log.error("指令包消息處理失敗,已丟棄: {}", truncateString(new String(message.getBody())));}okMessage = true;}if (okMessage) {// 消費消息序列ID寫入緩存Long sequence = flowInstanceNode.getSequence();String professionSoftwareCode = flowInstanceNode.getProfessionalSoftwareType();String sequenceKey = professionSoftwareCode+":message:finishedSequence";cacheService.set(sequenceKey, sequence);}String exceptionType =  flowInstanceNode.getExceptionType();//網絡異常if(exceptionFlag){//調用專業軟件API網絡異常//停止監聽stopContainerListenerAndCleanCash(consumerTag,"網絡異常");//數據返回隊列}//判斷心跳是否超時stopContainer(consumerTag,"每次檢查心跳超時");log.info("本次消費執行完畢");}}});registry.registerListenerContainer(endpoint, factory, false);SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer)registry.getListenerContainer(containerKey);simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queue) {return containerKey;}});simpleMessageListenerContainer.start();// 設置監聽器容器log.info("設置監聽器容器并啟動監聽 queue: {},containerKey :{}" , queueName,containerKey);}else {// 啟動已存在的監聽器if (!registry.getListenerContainer(containerKey).isRunning()) {//啟動監聽registry.getListenerContainer(containerKey).start();log.info("啟動監聽  queue: {},containerKey:{}" , queueName,containerKey);} else {log.info("監聽已運行 queue: {},containerKey:{}" , queueName,containerKey);}}}

動態銷毀:

   public void stopContainerListenerAndCleanCash(String containerKey,String tip) {try {log.info(tip+",清除緩存,停止監聽容器:{}", containerKey);if (registry.getListenerContainerIds().contains(containerKey)) {registry.getListenerContainer(containerKey).stop();registry.unregisterListenerContainer(containerKey);}// 刪除隊列// rabbitAdmin.deleteQueue(queueName);// 清理該隊列相關的綁定緩存createdBindings.remove(containerKey);}catch (Exception e){log.error("清除緩存,停止監聽容器異常",e);}}

容器事件監聽:

//容器異常

@Component
@Slf4j
public class RabbitListenerContainerExceptionHandler implements ApplicationListener<ListenerContainerConsumerFailedEvent> {@AutowiredRabbitMQService rabbitMQService;@Overridepublic void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {Throwable t = event.getThrowable();Object object = event.getSource();if (object instanceof SimpleMessageListenerContainer){SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) object;String queueName = container.getQueueNames()[0];String listenerId = container.getListenerId();rabbitMQService.stopContainerListenerAndCleanCash(listenerId,"容器異常");}log.error("RabbitMQ監聽容器異常", t);// 這里可以判斷異常類型,比如隊列不存在、連接斷開等if (t instanceof ShutdownSignalException) {// 處理隊列被刪除、服務失聯等}}
}

//

    @PostConstructpublic void init() {//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//factory.setReceiveTimeout();//factory.setReceiveTimeout(1L);factory.setContainerCustomizer(container -> {// 設置消費者超時時間(需RabbitMQ服務端支持)container.setConsumerArguments(Collections.singletonMap("consumer_timeout", 60000L));});factory.setErrorHandler(t -> {// 這里可以捕獲到消息處理時的異常log.error("RabbitMQ消息處理異常", t);// 可以根據異常類型做不同處理});connectionFactory.addConnectionListener(new ConnectionListener() {@Overridepublic void onClose(Connection connection) {stopAllContainerListeners();log.warn("RabbitMQ連接關閉");}@Overridepublic void onCreate(Connection connection) {stopAllContainerListeners();log.info("RabbitMQ連接創建");}@Overridepublic void onShutDown(ShutdownSignalException signal) {stopAllContainerListeners();log.error("RabbitMQ連接異常關閉", signal);}});}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87039.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87039.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87039.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

云效代碼倉庫導入自建gitlab中

登錄自建GitLab 在瀏覽器中輸入GitLab訪問地址http://192.168.1.111:81/users/sign_in&#xff0c;輸入賬號和密碼登錄GitLab服務&#xff0c;如下圖&#xff1a; 新建一個空的代碼庫 按照以下截圖順序&#xff0c;創建一個新的空項目&#xff0c;如下&#xff1a; 克隆鏡像 …

業界優秀的零信任安全管理系統產品介紹

騰訊 iOA 零信任安全管理系統 簡介&#xff1a;騰訊 iOA 零信任安全管理系統是騰訊終端安全團隊針對企業安全上云和數字化轉型&#xff0c;提供的企業網絡邊界處的應用訪問管控系統&#xff0c;為企業應用提供統一、安全、高效的訪問入口&#xff0c;同時提供終端安全加固、軟…

從設計到開發一個小程序頁面

巧婦難為無米之炊&#xff0c;想寫功能但是沒有好看的設計&#xff0c;邊寫邊設計效率又不夠高。mastergoAi生成的頁面又不夠好看&#xff0c;而且每月給的免費積分用得又超快&#xff0c;so決定自給自足。能有多難&#xff0c;先做&#xff0c;做了再改。 于是決定踏足設計&a…

Linux系統 / Ubuntu虛擬機 安裝DHCP服務

一、安裝DHCP服務 xxx:~$ sudo apt install isc-dhcp-server 正在讀取軟件包列表... 完成 正在分析軟件包的依賴關系樹 正在讀取狀態信息... 完成 將會同時安裝下列軟件&#xff1a; libirs-export161 libisccfg-export163 建議安裝&#xff1a; isc-dhcp-s…

Spring中 BeanFactory和FactoryBean分別是什么?

Spring 中 BeanFactory 是什么? BeanFactory其實就是IoC的底層容器&#xff0c;它本身只是一個接口&#xff0c;顧名思義Bean工廠&#xff0c;定義了Spring的基本功能框架&#xff0c;主要功能就是 負責從配置源中讀取 Bean 的定義&#xff0c;并創建、管理這些 Bean 的生命周…

langchain從入門到精通(三十二)——RAG優化策略(八)自查詢檢索器實現動態數據過濾

1. 查詢構建與自查詢檢索器 在 RAG 應用開發中&#xff0c;檢索外部數據時&#xff0c;前面的優化案例中&#xff0c;無論是生成的 子查詢、問題分解、生成假設性文檔&#xff0c;最后在執行檢索的時候使用的都是固定的篩選條件&#xff08;沒有附加過濾的相似性搜索&#xff…

面向安全產品測試的靜態混淆型 Shellcode Loader 設計與對抗分析

github 地址&#xff1a;https://github.com/LilDean17/ShellcodeLoader2025 一、項目背景 近年來&#xff0c;隨著 C2 框架廣泛應用于安全對抗模擬&#xff0c;各大安全廠商也不斷提升其檢測能力&#xff0c;那么安全廠商自研的安全軟件&#xff0c;是否能有效防御此類威脅&…

深度強化學習DRL——策略學習

一、策略網絡 策略函數 π \pi π的輸入是狀態 s s s和動作 a a a&#xff0c;輸出是一個介于0和1之間的概率值&#xff0c;用神經網絡 π ( a ∣ s ; θ ) \pi(a \mid s; \boldsymbol{\theta}) π(a∣s;θ)近似策略函數 π ( a ∣ s ) \pi(a\mid s) π(a∣s)&#xff0c; θ …

ISP Pipeline(5): Auto White Balance Gain Control (AWB) 自動白平衡

G_gain 1.0 # 常作為參考通道 R_gain G_avg / R_avg B_gain G_avg / B_avgAuto White Balance Gain Control&#xff08;AWB&#xff09;自動調整圖像中紅色、綠色、藍色通道的增益&#xff0c;使圖像中灰白區域的顏色看起來為“中性白”或“灰白”&#xff0c;從而矯正因光…

Python中鉤子函數的實現方式

在Python中&#xff0c;鉤子函數(Hook)是一種允許你在程序執行的特定點插入自定義代碼的技術。它本質上是一種回調機制&#xff0c;當特定事件發生時自動調用預先注冊的函數。 Python中鉤子函數的實現方式 Python中實現鉤子主要有以下幾種方式&#xff1a; ?回調函數?&…

【RTSP從零實踐】3、實現最簡單的傳輸H264的RTSP服務器

&#x1f601;博客主頁&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客內容&#x1f911;&#xff1a;&#x1f36d;嵌入式開發、Linux、C語言、C、數據結構、音視頻&#x1f36d; &#x1f923;本文內容&#x1f923;&a…

零開始git使用教程-傳html文件

1. 準備工作 (1) 確保你已經安裝&#xff1a; Visual Studio (VS)&#xff08;任何版本&#xff0c;社區版也行&#xff09; Git&#xff08;去官網 git-scm.com 下載安裝&#xff09; (2) 注冊 Gitee/GitHub 賬號 國內推薦 Gitee&#xff08;碼云&#xff09;&#xff1a;…

CPT204-Advanced OO Programming: Lists, Stacks, Queues, and Priority Queues

目錄 1.Java 集合框架層次結構Java Collection Framework hierarchy 1.1Java 集合框架描述&#xff1a; 1.2數據結構Data structures 1.3 Java 集合框架支持兩種類型的容器&#xff08;數據結構&#xff09;&#xff1a; 1.4 Java 集合框架的設計 2.Collection 2.1 Coll…

【網絡安全】Mysql注入中鎖機制

前言 在sql注入的延時注入中&#xff0c;常見的函數有sleep()直接延時、BENCHMARK()通過讓數據庫進行大量的計算而達到延時的效果、笛卡爾積、正則匹配等&#xff0c;但還有一個常常被忽略的函數&#xff0c;也就是Mysql中的鎖機制。雖然早些年就已經出現過相關的技術文章&…

博途多重背景、參數實例

1&#xff1a;我們在博途中先新建一個工程&#xff0c;并且建立一個FB塊名字為motor_fb&#xff0c;同樣建立一個FC塊名字為MOTOR_FC&#xff0c;里面寫上我們電機程序里常用的邏輯控制。二者程序內容相同。下面是motor_fb塊的程序截圖: 2:我們再新建一個FB塊&#xff0c;名字為…

運維的利器–監控–zabbix–第三步:配置zabbix–中間件–Tomcat–步驟+驗證

&#x1f3e0;個人主頁&#xff1a;fo安方的博客? &#x1f482;個人簡歷&#xff1a;大家好&#xff0c;我是fo安方&#xff0c;目前中南大學MBA在讀&#xff0c;也考取過HCIE Cloud Computing、CCIE Security、PMP、CISP、RHCE、CCNP RS、PEST 3等證書。&#x1f433; &…

大模型在重癥哮喘手術全流程風險預測與治療方案制定中的應用研究

目錄 一、引言 1.1 研究背景與意義 1.2 研究目標與方法 1.3 研究創新點 二、重癥哮喘概述 2.1 定義與發病機制 2.2 分類與臨床表現 2.3 診斷標準與方法 三、大模型技術原理與應用現狀 3.1 大模型的基本原理 3.2 在醫療領域的應用案例分析 3.3 適用于重癥哮喘預測的…

Webpack的插件機制Tapable

Tapable 是一個輕量級的庫&#xff0c;用于創建和管理插件鉤子&#xff08;hooks&#xff09;&#xff0c;它在 Webpack 中廣泛應用&#xff0c;用于實現插件系統。Tapable 提供了一種機制&#xff0c;允許插件在特定的生命周期階段插入自定義邏輯&#xff0c;從而擴展應用程序…

FRONT歸因-兩階段訓練流程

FRONT, Fine-Grained Grounded Citations歸因 FRONT歸因&#xff0c;首先從檢索到的源文檔中選擇支持性引用&#xff0c;然后基于這些引用指導生成過程&#xff0c;確保生成回答有據可依&#xff0c;引用準確無誤。 FRONT的特色在于兩階段歸因訓練&#xff0c;要點如下: 階…

單端轉差分放大器AD8138

根據 AD8138 的數據手冊特性及參數&#xff0c;可以實現單端 5Vpp&#xff08;偏置 0V&#xff09;正弦波轉差分 5Vpp&#xff08;共模 2.5V&#xff09;的功能&#xff0c;但需注意以下細節&#xff1a; 1. 信號幅度匹配性 輸入信號&#xff1a;單端 5Vpp&#xff08;峰峰值…