RabbitMQ 4.1.1-Local random exchange體驗

Local Random Exchange

一種 RabbitMQ 4.0+ 引入的新型交換機,主要是為 request-reply(RPC)場景 設計的。

  • 使用這種交換機時,消息只會被路由到本地節點上的隊列,可以確保極低的消息發布延遲。
  • 如果有多個本地隊列綁定到該交換機,它會隨機選擇一個隊列接收消息。

關鍵點總結:

  • 本地傳輸:不會把消息發到其他節點的隊列。
  • 隨機選隊列:多個本地隊列中隨機挑一個。
  • 發布快:避免了跨節點網絡通信,延遲低。
  • 最適合用于 RPC 模式,即“請求-響應”

建議將 Local Random Exchange 和 exclusive(獨占)隊列搭配使用,這樣可以為 RPC 場景提供更低延遲的組合。
注意

  • Exclusive 隊列是 RabbitMQ 中只對某個連接開放的臨時隊列(通常用于響應)。
  • LRE + Exclusive Queue,可以避免消息在集群中轉發,提高響應速度。

LRE 不轉發消息到其他節點,所以如果當前節點沒有合適的隊列,消息會被直接丟棄!
所以使用時你必須確保每個節點上都至少有一個消費者綁定的隊列

在這里插入圖片描述

在 RabbitMQ 前面加負載均衡器(load balancer)會讓這種交換機類型幾乎無法正常工作。

原因分析

  • Local Random Exchange 依賴于消息被投遞到“本地綁定隊列(local queues)”
  • 如果用了負載均衡,客戶端連接可能隨機落在任何節點上,消息將發給該節點的本地隊列
  • 如果該節點上沒有消費者綁定本地隊列,消息就會被丟棄

實操如下
application.properties

# JVM內存配置
# 設置較小的堆內存,避免占用過多系統資源
spring.jvm.memory=-Xmx256m -Xms128m -XX:MaxMetaspaceSize=128m# 設置較小的線程棧大小
spring.jvm.thread-stack-size=-Xss256k# 啟用GC日志,幫助診斷內存問題
spring.jvm.gc-log=-Xlog:gc*:file=./logs/gc.log:time,uptime,level,tags:filecount=5,filesize=10m# 設置較小的代碼緩存大小
spring.jvm.code-cache=-XX:ReservedCodeCacheSize=128m# 啟用內存壓縮指針基址設置,將Java堆放在4GB以上地址空間
spring.jvm.heap-base=-XX:HeapBaseMinAddress=4g# 啟用G1垃圾收集器的更積極設置
spring.jvm.gc-tuning=-XX:G1ReservePercent=10 -XX:G1HeapRegionSize=4m -XX:InitiatingHeapOccupancyPercent=30# 禁用顯式GC調用
spring.jvm.disable-explicit-gc=-XX:+DisableExplicitGC

application.yml

#定義要使用的交換機和隊列名稱
spring:application:name: local-random-exchange#配置連接 rabbitmq服務器rabbitmq:#mq服務器的iphost: 127.0.0.1#訪問端口號port: 5672#用戶名稱username: admin#密碼password: 123456#虛擬主機virtual-host: my-virtual-host# 連接超時時間connection-timeout: 10000# 日志配置
logging:level:org.springframework.amqp: INFO     # AMQP日志級別com.example: DEBUG                 # 應用日志級別
package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ 配置類,用于創建 Local Random Exchange(本地隨機交換機)和綁定的 RPC 隊列。* 本配置主要用于實現基于 RabbitMQ 的 RPC 模式,使用 Local Random Exchange 類型降低延遲。*/
@Configuration
public class RabbitConfig {// Local Random Exchange 名稱(自定義交換機)public static final String LRE_EXCHANGE = "lre.exchange";// RPC 使用的隊列名稱public static final String RPC_QUEUE_NAME = "rpc.queue";/*** 聲明一個 Local Random Exchange(x-local-random 類型的交換機)。** - durable: true 表示交換機會持久化* - autoDelete: false 表示不會在沒有綁定隊列時自動刪除* - arguments: 可選參數,此處為空*/@Beanpublic CustomExchange lreExchange() {Map<String, Object> args = new HashMap<>();return new CustomExchange(LRE_EXCHANGE, "x-local-random", true, false, args);}/*** 聲明一個 RPC 隊列。** - durable: false 表示不持久化(重啟后丟失)* - exclusive: false 表示不是只被當前連接獨占* - autoDelete: true 表示連接斷開后自動刪除隊列** 如果你要模擬 RPC Client 的 exclusive 回調隊列,建議用 `exclusive = true`。*/@Beanpublic Queue rpcQueue() {return new Queue(RPC_QUEUE_NAME, false, false, true);}/*** 將 RPC 隊列綁定到 Local Random Exchange 上。** - routingKey 設置為 "",因為 Local Random Exchange 不關心路由鍵*/@Beanpublic Binding binding(Queue rpcQueue, CustomExchange lreExchange) {return BindingBuilder.bind(rpcQueue).to(lreExchange).with("").noargs();}
}

方式一、手動監聽

package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.UUID;/*** 模擬 RPC 客戶端,用于通過 RabbitMQ 的 Local Random Exchange 發送請求并接收異步響應。*/
@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 向服務器發送請求,并設置回調隊列接收響應。** @param message 請求消息內容* @return 返回一個確認字符串(實際響應在回調中處理)*/public String sendRequest(String message) throws Exception {// 生成唯一標識 correlationId,用于標識請求-響應配對String correlationId = UUID.randomUUID().toString();// 臨時生成一個獨占的匿名回調隊列(例如 amq.gen-xxxxxx)String replyQueue = rabbitTemplate.execute(channel -> channel.queueDeclare().getQueue());// 設置 RabbitTemplate 的回調地址(其實不會生效于 send 模式,僅用于演示)rabbitTemplate.setReplyAddress(replyQueue);rabbitTemplate.setReplyTimeout(5000); // 設置超時時間(ms)rabbitTemplate.setCorrelationKey("correlation_id"); // 設置用于匹配的屬性名(可選)// 設置監聽器容器,監聽回調隊列中的響應消息SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());container.setQueueNames(replyQueue); // 指定監聽的隊列container.setMessageListener(new MessageListenerAdapter(new Object() {// 定義接收到消息后的處理方法(方法名必須與監聽器默認匹配或顯式指定)@SuppressWarnings("unused")public void handleMessage(byte[] reply) {String response = new String(reply, StandardCharsets.UTF_8);System.out.println("Got reply: " + response);// 實際中這里應喚醒等待線程或放入響應Map中(基于 correlationId)}}));container.start(); // 啟動監聽器容器// 構造請求消息,設置 reply_to 和 correlation_id 屬性MessageProperties props = new MessageProperties();props.setReplyTo(replyQueue);             // 告訴服務端響應要發到這個隊列props.setCorrelationId(correlationId);    // 服務端會原樣返回,用于客戶端識別對應響應Message request = new Message(message.getBytes(), props);// 通過 RabbitTemplate 發送消息到本地隨機交換機(Local Random Exchange)rabbitTemplate.send(RabbitConfig.LRE_EXCHANGE, "", request);return "Request sent with correlationId: " + correlationId;}
}

方式二、推薦寫法
也可以用使用 Spring AMQP 的官方推薦 RPC 模式(即 convertSendAndReceive())的實現方式。這種方式完全利用了 RabbitTemplate 的自動 reply-to、correlationId、超時機制 —— 更加簡單可靠

package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public String sendRequest(String message) {// 設置超時時間(可選)rabbitTemplate.setReplyTimeout(5000);// 使用 convertSendAndReceive 會自動:// 1. 創建一個臨時 reply queue(exclusive)// 2. 設置 reply_to 和 correlation_id// 3. 等待結果并返回Object response = rabbitTemplate.convertSendAndReceive(RabbitConfig.LRE_EXCHANGE, "", message);if (response != null) {return "Received response: " + response.toString();} else {return "No response received (timeout or error)";}}
}

兩者優點總結

功能原來方式(手動監聽)convertSendAndReceive()(推薦)
reply_to自動處理? 手動? 自動
correlation_id 匹配? 手動? 自動
超時控制? 復雜? 簡單
代碼復雜度
推薦程度????

RPC服務端處理
方式一 手動

package com.example.consumer;import com.example.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class RpcServer {/*** RabbitMQ RPC 服務端處理方法* * 使用 @RabbitListener 監聽指定隊列,當接收到客戶端請求時,手動獲取 reply_to 和 correlation_id,* 并通過底層 channel 手動發送響應消息。** @param message        收到的消息正文* @param correlationId  唯一標識此次 RPC 請求的 ID(由客戶端生成并設置)* @param replyTo        回調隊列(客戶端臨時隊列)* @param requestMessage 原始 AMQP 消息對象* @param channel        底層通信通道,用于手動發送響應* @return null(返回值不會被用來發送響應,因為我們是手動發送的)*/@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)public String handleRpc(String message,@Header(AmqpHeaders.CORRELATION_ID) String correlationId,@Header(AmqpHeaders.REPLY_TO) String replyTo,Message requestMessage,Channel channel) throws IOException {// 構造服務端響應內容String response = "Processed: " + message;// 打印收到的信息和即將回應的隊列System.out.println("replyTo: " + replyTo + ", Server received: " + message + ", correlationId: " + correlationId);// 構造響應消息的屬性,確保帶上原始 correlationIdMessageProperties replyProps = new MessageProperties();replyProps.setCorrelationId(correlationId);// 構造響應消息對象Message reply = new Message(response.getBytes(), replyProps);// 手動發送響應消息到客戶端指定的臨時隊列channel.basicPublish("", replyTo, null, reply.getBody());// 因為手動處理了響應,不需要 Spring 自動回傳return null;}
}

方式二自動處理

@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)
public String handleRpc(String message) {System.out.println("Server received: " + message);return "Processed: " + message;
}

運行結果

Request sent with correlationId: 9cf6df25-3e02-47da-96ad-23a21791b391
replay:amq.gen-CcSRdsuLJtjtXOzFUE3Eug Server received: 0測試0 correlationId:9cf6df25-3e02-47da-96ad-23a21791b391
Got reply: Processed: 0測試0
Request sent with correlationId: d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
replay:amq.gen-jnFzJQallOE6QRkZEZyn-Q Server received: 3測試1 correlationId:d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
Got reply: Processed: 3測試1
Request sent with correlationId: 2009671b-ef8d-418c-ae9b-c58c8e0dac83
replay:amq.gen--tLpLz3xs9p_BEZmqJUjFg Server received: 6測試2 correlationId:2009671b-ef8d-418c-ae9b-c58c8e0dac83
Got reply: Processed: 6測試2
Request sent with correlationId: 6637a3dd-4e24-48e5-871f-cd671ea6d9b6
replay:amq.gen-CejNGqwNk6bWPkxrQLvH7Q Server received: 9測試3 correlationId:6637a3dd-4e24-48e5-871f-cd671ea6d9b6
Got reply: Processed: 9測試3
Request sent with correlationId: c994fab1-75c4-4618-8af8-b03f2fcdfa6f
replay:amq.gen-mdKE_hhHhj_ZEgT-fIm4nw Server received: 12測試4 correlationId:c994fab1-75c4-4618-8af8-b03f2fcdfa6f
Got reply: Processed: 12測試4
Request sent with correlationId: b27d1409-d595-47f8-b920-2d4ad23288d2
replay:amq.gen-ofZgztMXNh9MMEejK6DDGA Server received: 15測試5 correlationId:b27d1409-d595-47f8-b920-2d4ad23288d2
Got reply: Processed: 15測試5
Request sent with correlationId: adc98f0d-5270-4033-86c0-e863cd56ecee
replay:amq.gen-xKkf-7LcEhOzamv892nL8A Server received: 18測試6 correlationId:adc98f0d-5270-4033-86c0-e863cd56ecee
Got reply: Processed: 18測試6
Request sent with correlationId: 87f6722d-e974-474d-a79c-9aea69401fa7
replay:amq.gen-r5jjy4ypnSDso-HZ5PuNWA Server received: 21測試7 correlationId:87f6722d-e974-474d-a79c-9aea69401fa7
Got reply: Processed: 21測試7
Request sent with correlationId: de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
replay:amq.gen-7QDoBB5wqbjLC0MidVSkbA Server received: 24測試8 correlationId:de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
Got reply: Processed: 24測試8
Request sent with correlationId: 1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
replay:amq.gen-1rFRnN9vKCUt6HIrRLSoBw Server received: 27測試9 correlationId:1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
Got reply: Processed: 27測試9

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/88184.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/88184.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/88184.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

中山排氣歧管批量自動化智能化3D尺寸測量及cav檢測分析

當前制造業快速發展&#xff0c;傳統測量方法正面臨嚴峻挑戰。生產規模的持續擴張使得現有測量手段逐漸暴露出效率不足的問題&#xff0c;這種技術滯后性正在直接影響企業的整體生產效率。具體表現為測量速度跟不上生產節拍&#xff0c;精度要求難以達標&#xff0c;最終導致生…

Debian 11 Bullseye 在線安裝docker

首先移除所有錯誤的 Docker 軟件源&#xff1a;sudo rm -f /etc/apt/sources.list.d/docker*安裝必要依賴sudo apt update sudo apt install -y ca-certificates curl gnupg添加 Docker 官方 GPG 密鑰&#xff08;使用國內鏡像&#xff09;&#xff1a;curl -fsSL https://mirr…

Spring Boot 項目中多數據源配置使用場景

在 Spring Boot 中配置多數據源是一個非常常見的需求&#xff0c;主要用于以下場景&#xff1a; 讀寫分離&#xff1a;一個主數據庫&#xff08;Master&#xff09;負責寫操作&#xff0c;一個或多個從數據庫&#xff08;Slave&#xff09;負責讀操作&#xff0c;以提高性能和可…

FAAC 在海思平臺使用得到aac實時音頻流

FAAC 在海思平臺使用得到aac實時音頻流 使用 FAAC將音頻 pcm轉為 aac 主要參見這篇博客 FAAC 在君正平臺使用得到aac實時音頻流_君正 x2600 音頻-CSDN博客

javascript函數參數類似python函數參數星號*解耦數組

序言通常情況下&#xff0c;我們很可能不清楚參數有多少&#xff0c;這個時候用的都是數組。但是使用數組和單個元素&#xff0c;從內心情感來說&#xff0c;它們是兩種維度&#xff0c;為了讓參數成為一個數組&#xff0c;把單個輸入的參數強加一個數組的外殼&#xff0c;并不…

C語言基礎(1)

1.編譯器的選擇 我們的c語言是一門&#xff0c;我們寫的c語言代碼是文本文件(存放在.c為后綴的文件中)&#xff0c;文本文件本身無法被執行&#xff0c;必須通過編譯器的編譯和鏈接器的鏈接&#xff0c;生成可執行的二進制文件&#xff0c;才能夠被執行注意&#xff1a; 每個源…

Rust賦能美團云原生DevOps實踐

Rust 云原生 DevOps 實踐 在云原生環境中,Rust 的高性能與安全性使其成為構建微服務和基礎設施工具的理想選擇。Docker 作為容器化標準工具,結合 Rust 的跨平臺特性,可高效實現持續集成與部署(CI/CD)。 構建優化的 Rust Docker 鏡像 多階段構建是 Rust 項目容器化的關鍵…

計算機網絡實驗——配置ACL

ACL基礎一、實驗目的1. 配置H3C路由器基本ACL。二、實驗要求1. 熟練掌握網絡配置能力。2. 熟練掌握ACL基本配置。三、實驗步驟&#xff08;1&#xff09;使用reset saved-configuration命令和reboot命令&#xff0c;重置路由器原有配置&#xff0c;如圖1所示。圖 1&#xff08;…

在本地部署mcp服務器實現自然語言操作mysql數據庫,輕松實現數據表的增~ 刪~ 改~ 查~

1.將寫好的mcp_server代碼放在本地任意盤&#xff01; import asyncio import logging import os import sys from mysql.connector import connect, Error from mcp.server import Server from mcp.types import Resource, Tool, TextContent from pydantic import AnyUrl# Co…

2025快手創作者中心發布視頻python實現

難度還行&#xff0c;只有一個__NS_sig3加密&#xff0c;流程麻煩點cookies_list cookie.split("; ")cookie_dict {}# 遍歷每個 Cookie&#xff0c;根據等號將鍵值對拆分并添加到字典中for cookie in cookies_list:key_value cookie.split("")if len(ke…

Android 組件內核

文章目錄什么是binder1. 什么是Binder&#xff1f;2. Binder架構組成3. 工作原理與通信流程1&#xff09;服務注冊2&#xff09;服務查詢3&#xff09;通信過程4&#xff09;核心數據結構4. 關鍵技術點5. 常見面試考點1&#xff09;Binder與傳統IPC&#xff08;Socket、管道、共…

java類加載機制:Tomcat的類加載機制

Tomcat類加載機制深度解析&#xff1a;打破雙親委派的Web容器實現 Tomcat作為Java Web容器&#xff0c;其類加載機制為滿足Web應用的隔離性、熱部署和兼容性需求&#xff0c;對標準Java類加載機制進行了定制化擴展&#xff0c;核心是打破雙親委派模型并引入多層級類加載器。以下…

【PTA數據結構 | C語言版】從順序表 list 中刪除第 i 個元素

本專欄持續輸出數據結構題目集&#xff0c;歡迎訂閱。 文章目錄題目代碼題目 請編寫程序&#xff0c;將 n 個整數存入順序表&#xff0c;對任一指定的第 i 個位置&#xff0c;將這個位置上的元素從順序表中刪除。注意&#xff1a;i 代表位序&#xff0c;從 1 開始&#xff0c;…

VS2022 C++ EasyX庫 掃雷游戲項目開發:打造經典游戲的詳細之旅

老樣子&#xff0c;先上效果 視頻演示 C經典掃雷-介紹一、引言 在這篇博客中&#xff0c;我將詳細介紹掃雷游戲項目的開發過程。掃雷作為一款經典的游戲&#xff0c;其規則簡單但富有挑戰性。通過開發這個項目&#xff0c;我不僅加深了對 C 編程的理解&#xff0c;還提升了自己…

Go語言網絡游戲服務器模塊化編程

本文以使用origin框架&#xff08;一款使用Go語言寫的開源游戲服務器框架&#xff09;為例進行說明&#xff0c;當然也可以使用其它的框架或者自己寫。 在框架中PBProcessor用來處理Protobuf消息&#xff0c;在使用之前&#xff0c;需要使用Register函數注冊網絡消息&#xff…

【機器人】Aether 多任務世界模型 | 4D動態重建 | 視頻預測 | 視覺規劃

Aether 是一個的世界模型&#xff0c;整合幾何重建與生成建模的統一框架&#xff0c;實現類人空間推理能力。 來自ICCV 2025&#xff0c;該框架具有三大核心功能&#xff1a; (1) 4D動態重建&#xff0c;(2) 動作條件視頻預測&#xff0c; (3) 目標條件視覺規劃。 代碼地址&…

MiniMind:3小時訓練26MB微型語言模型,開源項目助力AI初學者快速入門

開發&#xff5c;界面&#xff5c;引擎&#xff5c;交付&#xff5c;副駕——重寫全棧法則&#xff1a;AI原生的倍速造應用流來自全棧程序員 nine 的探索與實踐&#xff0c;持續迭代中。 歡迎關注評論私信交流~ 在大型語言模型(LLaMA、GPT等)日益流行的今天&#xff0c;一個名為…

相機Camera日志實例分析之五:相機Camx【萌拍閃光燈后置拍照】單幀流程日志詳解

【關注我&#xff0c;后續持續新增專題博文&#xff0c;謝謝&#xff01;&#xff01;&#xff01;】 上一篇我們講了&#xff1a; 這一篇我們開始講&#xff1a; 目錄 一、場景操作步驟 二、日志基礎關鍵字分級如下 三、場景日志如下&#xff1a; 一、場景操作步驟 操作步…

[2-02-02].第03節:環境搭建 - Win10搭建ES集群環境

ElasticSearch學習大綱 基于ElasticSearch7.8版本 一、ElasticStack下載&#xff1a; 1.Elasticsearch 的官方地址 2.Elasticsearch 下載地址&#xff1a; 二、集群搭建: 第1步&#xff1a;創建es目錄&#xff1a; 1.創建 elasticsearch-cluster 文件夾&#xff0c;在內部…

操作系統核心技術剖析:從Android驅動模型到鴻蒙微內核的國產化實踐

目錄 一、移動端操作系統技術細節 1. Android 內核版本 核心模塊 驅動架構 國內定制案例 2. iOS XNU內核關鍵模塊 安全機制 3. HarmonyOS 多內核架構 驅動隔離 二、PC端操作系統技術細節 1. Windows NT內核 模塊分層 驅動模型 國內適配 2. macOS&#xff08;X…