事務設置和消息分發

事務

RabbitMQ是基于AMQP協議實現的,該協議實現了事務機制,因此RabbitMQ也支持事務機制.

SpringAMQP也提供了對事務相關的操作,RabbitMQ事務允許開發者確保消息的發送和接收是原子性的,要么
全部成功,要么全部失敗.|

前期準備工作:

    //事務public static final String TRANS_QUEUE = "TRANS_QUEUE";public static final String TRANS_EXCHANGE = "TRANS_EXCHANGE";public static final String TRANS_KEY = "TRANS_KEY";
    //事務@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable(MQConstants.TRANS_QUEUE).build();}@Bean("transExchange")public Exchange transExchange() {return ExchangeBuilder.directExchange(MQConstants.TRANS_EXCHANGE).build();}@Bean("transBinding")public Binding transBinding(@Qualifier("transExchange") Exchange exchange, @Qualifier("transQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.TRANS_KEY).noargs();}

配置事務管理器

@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}

添加 @Transactional

如果不添加 @Transactional,我們的事務管理器是不會在這個代碼上生效的

    @Transactional@RequestMapping("/trans")public String trans() {rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans1 ");int n = 10 / 0;rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans2 ");return "消息發送成功";}

經過觀察,我們可以看到這兩條消息要么同時發送成功,要么同時發送失敗

消息分發

RabbitMQ隊列擁有多個消費者時,隊列會把收到的消息分派給不同的消費者,每條消息只會發送給訂閱列表里的一個消費者,這種方式非常適合擴展,如果現在負載加重,那么只需要創建更多的消費者來消費處理消息即可。

默認情況下,RabbitMQ是以輪詢的方法進行分發的,而不管消費者是否已經收到消費并已經確認了消息,這種方式是不太合理的,試想一下,如果某些消費者消費速度慢,而某些消費者消費速度快,就可能會導致某些消費者消息積壓,某些消費者空閑,進而應用整體的吞吐量下降。

如何處理呢?我們可以使用前面章節講到的**channel.basicQos(intprefetchCount)**方法,來限制當前信道上的消費者所能保持的最大未確認消息的數量比如:消費端調用了channelbasicQos(5),RabbitMQ會為該消費者數,發送一條消息計數+1,消費一條消息計數-1,當達到了設定的上限,RabbitMQ就不會再向它發送消息了,直到消費者確認了某條消息。

類似TCP/IP中的"滑動窗口".
prefetchCount設置為0時表示沒有上限。
basicQos對拉模式的消費無效

限流和負載均衡

配置信息:

    listener:simple:acknowledge-mode: manual # 設置確認模式prefetch: 5 

前期準備:

常量類:

    //限流public static final String QOS_QUEUE = "QOS_QUEUE";public static final String QOS_EXCHANGE = "QOS_EXCHANGE";public static final String QOS_KEY = "QOS_KEY";

聲明:

    //限流@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(MQConstants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(MQConstants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.QOS_KEY).noargs();}

生產者:

    @RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.QOS_EXCHANGE, MQConstants.QOS_KEY, "qos");}return "消息發送成功";}

如果我們不進行手動確認,觀察最大未確認的消息接收量:

在這里插入圖片描述
在這里插入圖片描述

可以得知消費者最大接收到的未確認消息數量為 我們設置的 prefetch 值


我們可以通過限流這種方式實現負載均衡

@Component
public class QosListener {@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle1(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費消息:" + messageContent);Thread.sleep(10000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle2(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費消息:" + messageContent);Thread.sleep(5000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}

可以觀察到消費能力強的隊列會持續消費消息,消費能力弱的隊列消費的消息會相對較少

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/96111.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/96111.shtml
英文地址,請注明出處:http://en.pswp.cn/web/96111.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python 中 try / except / else / finally 異常處理詳解

1. 基本結構 try:# 可能會拋出異常的代碼 except SomeException as e:# 捕獲并處理異常 else:# 如果 try 中代碼沒有異常&#xff0c;就執行這里 finally:# 無論是否發生異常&#xff0c;最后都會執行這里2. 各部分的作用 try 用途&#xff1a;包含可能發生異常的代碼段。如果代…

冰火島 Tech 傳:Apple Foundation Models 心法解密(下集)

引子 上集說到冰火島冰屋內,謝遜、張翠山、殷素素三人親見 “指令(Instructions)” 如何讓 AI 脫胎換骨,從木訥報地名的 “愣頭青”,變身為文采斐然的 “旅行作家”。 正當素素驚嘆這 AI 武學的奇妙時,謝遜卻突然神色一凜,指著手腕上用冰屑刻的 “4096” 字樣道:“這等…

Qt信號與槽機制全面解析

? 1. 核心概念信號與槽是Qt獨創的一種對象間通信機制&#xff0c;它使得一個對象的狀態變化或事件發生能夠自動通知其他對象作出響應&#xff0c;從而實現高度解耦的代碼設計。1.1 信號&#xff08;Signals&#xff09;定義&#xff1a;信號是由對象在特定事件發生時發出&…

2025年COR SCI2區,基于近似細胞分解的能源高效無人機路徑規劃問題用于地質災害監測,深度解析+性能實測

目錄1.摘要2.問題描述與數學模型3.能源網格混合元啟發式算法4.結果展示5.參考文獻6.代碼獲取7.算法輔導應用定制讀者交流1.摘要 本文提出了一種能源高效的無人機路徑規劃方法&#xff08;EURP&#xff09;用于監測分散的地質災害易發區域&#xff0c;通過建立無人機飛行模式的…

LangChain關于提示詞的幾種寫法

上文中 <---->(可點擊移步查閱),我們已經了解到LangChain如何編寫調用大模型的一個基本框架, 在編寫完這個基本框架后,就要與大模型進行交互了,這時候,就需要我們編寫提示詞了, 那么以下,就會介紹提示詞的幾種編寫方式 1 可攜帶變量的提示詞 1.1 PromptTempla…

CyberPoC 是一個現代化的網絡安全練習和競賽平臺,支持容器化部署的安全挑戰,為用戶提供實踐網絡安全技能的環境。

工具介紹 CyberPoC 是一個現代化的網絡安全練習和競賽平臺&#xff0c;支持容器化部署的安全挑戰&#xff0c;為用戶提供實踐網絡安全技能的環境。 &#x1f680; 快速開始 使用 Docker Compose (推薦) 克隆項目 git clone https://github.com/dushixiang/cyberpoc.gitcd cy…

【單片機day03】

UART串口通用異步收發器&#xff0c;2個串口(1個串口被用于ISP下載程序&#xff0c;1個串口被用于和主機之間的通信)全雙工&#xff08;兩根線TX、RX&#xff09;、串行、異步通信方式&#xff1a;單工&#xff1a;發送方和接收方是固定的&#xff0c;數據只能從發送方 發給 …

react16到react19更新及底層實現是什么以及區別

好問題 &#x1f44d;。React 16 → 19 的更新跨度其實涵蓋了 React 的幾個“時代級”的演進&#xff1a;從 Fiber 架構 的重寫、到 并發特性 的引入&#xff0c;再到 React Server Components 和 現代語法支持。我幫你梳理下主要更新點和底層實現的區別&#xff0c;方便你系統…

【分享】基于百度腦圖,并使用Vue二次開發的用例腦圖編輯器組件

偶然間發現了這個項目&#xff0c;分享出來。 【分享】基于百度腦圖&#xff0c;并使用Vue二次開發的用例腦圖編輯器組件1 項目地址2 項目簡介3 項目部署3.1 安裝node和npm3.2 項目下載3.3 修改npm鏡像源3.4 部署4 項目中使用1 項目地址 基于百度腦圖&#xff0c;并使用Vue二次…

Kotlin中抽象類和開放類

抽象類 (Abstract Class) 定義和特點 抽象類使用 abstract 關鍵字聲明&#xff0c;是一種不能被直接實例化的特殊類&#xff0c;主要用于被其他類繼承。 abstract class Base {open fun f() {} }abstract class Derived : Base() {override abstract fun f() // 抽象成員在類中…

TensorFlow深度學習實戰(37)——深度學習的數學原理

TensorFlow深度學習實戰&#xff08;37&#xff09;——深度學習的數學原理0. 前言1. 反向傳播歷史2. 微積分相關概念2.1 向量2.2 導數和梯度2.3 梯度下降2.4 鏈式法則2.5 常用求導公式2.6 矩陣運算3. 激活函數4. 反向傳播4.1 前向計算4.2 反向傳播5. 交叉熵及其導數6. 批量梯度…

1.1 汽車運行滾動阻力

汽車運行阻力由4部分構成&#xff1a;滾動阻力、空氣阻力、坡度阻力、加速阻力。 1).汽車在水平道路上等速行駛時&#xff0c;必須克服來自地面的滾動阻力和來自空氣的空氣阻力。 2). 當汽車在坡道上上坡行駛時&#xff0c;還必須克服重力沿坡道的分力&#xff0c;稱為坡度阻…

e203000

1&#xff09;①BIU作為核心通信樞紐&#xff0c;主要承擔兩大功能&#xff1a;一是連接處理器核內的關鍵執行單元&#xff08;包括IFU、LSU和EAI協處理器&#xff09;&#xff0c;統一管理指令和數據的內部傳輸路徑&#xff1b;二是作為"核內計算"與"核外資源&…

Infortrend普安科技IEC私有云平臺VM解決方案

Infortrend企業云&#xff08;IEC&#xff09;內置Hypervisor運行VM。功能完整、無需額外付費。在本文中&#xff0c;我們將為您詳細介紹IEC是如何支持 VM的。市場現狀與挑戰市場現狀 虛擬化市場面臨轉型&#xff0c;主流廠商&#xff08;如 VMware&#xff09;改用訂閱制…

【代碼隨想錄算法訓練營——Day6(Day5周日休息)】哈希表——242.有效的字母異位詞、349.兩個數組的交集、202.快樂數、1.兩數之和

LeetCode題目鏈接 https://leetcode.cn/problems/valid-anagram/ https://leetcode.cn/problems/intersection-of-two-arrays/ https://leetcode.cn/problems/happy-number/ https://leetcode.cn/problems/two-sum/ 題解 242.有效的字母異位詞 這道題要想到用哈希表來做。同時注…

安科瑞基站智慧運維云平臺:安全管控與節能降耗雙效賦能

功能&#xff1a;基站智慧用電云平臺通過對5G宏站和室分站點加裝交/直流智能監控設備、無線采集設備以及系統管理平臺&#xff0c;完成夜間無業務時段的下電操作&#xff0c;減少電能消耗&#xff0c;降低運營成本支出&#xff0c;以及提升通信設備供電線路狀態的實時監測保護功…

處理省市區excel數據加工成SQL

原始數據相關內容鏈接 處理excel數據加工成SQL的腳本 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Excel行政區域數據轉SQL腳本 - 支持特殊行政單位處理&#xff08;如省直轄縣級行政單位&#xff09; - 支持批量處理 """import pand…

雙碳目標下的24小時分時綜合能源系統低碳優化調度:基于 Matlab/YALMIP/CPLEX的方法與仿真

在“雙碳”戰略目標的推動下&#xff0c;綜合能源系統&#xff08;Integrated Energy System, IES&#xff09;已成為實現能源結構優化與碳排放控制的重要途徑。本文以光伏、風電、燃氣—電熱聯產&#xff08;CHP&#xff09;、燃氣鍋爐、電鍋爐、電儲能以及碳捕集&#xff08;…

TDengine 選擇函數 Last() 用戶手冊

LAST() 函數用戶手冊 函數定義 LAST(expr)功能說明 LAST() 函數統計表/超級表中某列的值最后寫入的非 NULL 值&#xff0c;即返回時間戳最大的非 NULL 值。 版本要求 最低版本: v3.0.0.0 返回值 數據類型: 同應用的字段返回內容: 時間戳最大的非 NULL 值及其對應的時間戳…

< 自用文 學習 > 用 Claude Code 做一個日歷

環境&#xff1a; OS: Windows 11 IDE&#xff1a;TREA Model: Sonnet / Qwen (免費 Token 用完) 參考&#xff1a; Claude Code Beginner Guide – Get Started in 20 Minutes (2025) by Alex Finn 油管博客 https://www.youtube.com/watch?viYiuzAsWnHU&listTLGG1L…