Local Random Exchange
一種 RabbitMQ 4.0+ 引入的新型交換機,主要是為 request-reply(RPC)場景 設計的。
- 使用這種交換機時,消息只會被路由到本地節點上的隊列,可以確保極低的消息發布延遲。
- 如果有多個本地隊列綁定到該交換機,它會隨機選擇一個隊列接收消息。
關鍵點總結:
- 本地傳輸:不會把消息發到其他節點的隊列。
- 隨機選隊列:多個本地隊列中隨機挑一個。
- 發布快:避免了跨節點網絡通信,延遲低。
- 最適合用于 RPC 模式,即
“請求-響應”
。
建議將
Local Random Exchange 和 exclusive(獨占)隊列
搭配使用,這樣可以為 RPC 場景提供更低延遲的組合。
注意
- Exclusive 隊列是 RabbitMQ 中只對某個連接開放的臨時隊列(通常用于響應)。
- LRE + Exclusive Queue,可以避免消息在集群中轉發,提高響應速度。
LRE 不轉發消息
到其他節點,所以如果當前節點沒有合適的隊列,消息會被直接丟棄!
所以使用時你必須確保每個節點上都至少有一個消費者綁定的隊列
。
在 RabbitMQ 前面加負載均衡器(load balancer)會讓這種交換機類型幾乎無法正常工作。
原因分析
- Local Random Exchange 依賴于消息被投遞到“本地綁定隊列(local queues)”。
- 如果用了負載均衡,客戶端連接可能隨機落在任何節點上,消息將發給該節點的本地隊列。
- 如果該節點上沒有消費者綁定本地隊列,消息就會被丟棄。
實操如下
application.properties
# JVM內存配置
# 設置較小的堆內存,避免占用過多系統資源
spring.jvm.memory=-Xmx256m -Xms128m -XX:MaxMetaspaceSize=128m# 設置較小的線程棧大小
spring.jvm.thread-stack-size=-Xss256k# 啟用GC日志,幫助診斷內存問題
spring.jvm.gc-log=-Xlog:gc*:file=./logs/gc.log:time,uptime,level,tags:filecount=5,filesize=10m# 設置較小的代碼緩存大小
spring.jvm.code-cache=-XX:ReservedCodeCacheSize=128m# 啟用內存壓縮指針基址設置,將Java堆放在4GB以上地址空間
spring.jvm.heap-base=-XX:HeapBaseMinAddress=4g# 啟用G1垃圾收集器的更積極設置
spring.jvm.gc-tuning=-XX:G1ReservePercent=10 -XX:G1HeapRegionSize=4m -XX:InitiatingHeapOccupancyPercent=30# 禁用顯式GC調用
spring.jvm.disable-explicit-gc=-XX:+DisableExplicitGC
application.yml
#定義要使用的交換機和隊列名稱
spring:application:name: local-random-exchange#配置連接 rabbitmq服務器rabbitmq:#mq服務器的iphost: 127.0.0.1#訪問端口號port: 5672#用戶名稱username: admin#密碼password: 123456#虛擬主機virtual-host: my-virtual-host# 連接超時時間connection-timeout: 10000# 日志配置
logging:level:org.springframework.amqp: INFO # AMQP日志級別com.example: DEBUG # 應用日志級別
package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ 配置類,用于創建 Local Random Exchange(本地隨機交換機)和綁定的 RPC 隊列。* 本配置主要用于實現基于 RabbitMQ 的 RPC 模式,使用 Local Random Exchange 類型降低延遲。*/
@Configuration
public class RabbitConfig {// Local Random Exchange 名稱(自定義交換機)public static final String LRE_EXCHANGE = "lre.exchange";// RPC 使用的隊列名稱public static final String RPC_QUEUE_NAME = "rpc.queue";/*** 聲明一個 Local Random Exchange(x-local-random 類型的交換機)。** - durable: true 表示交換機會持久化* - autoDelete: false 表示不會在沒有綁定隊列時自動刪除* - arguments: 可選參數,此處為空*/@Beanpublic CustomExchange lreExchange() {Map<String, Object> args = new HashMap<>();return new CustomExchange(LRE_EXCHANGE, "x-local-random", true, false, args);}/*** 聲明一個 RPC 隊列。** - durable: false 表示不持久化(重啟后丟失)* - exclusive: false 表示不是只被當前連接獨占* - autoDelete: true 表示連接斷開后自動刪除隊列** 如果你要模擬 RPC Client 的 exclusive 回調隊列,建議用 `exclusive = true`。*/@Beanpublic Queue rpcQueue() {return new Queue(RPC_QUEUE_NAME, false, false, true);}/*** 將 RPC 隊列綁定到 Local Random Exchange 上。** - routingKey 設置為 "",因為 Local Random Exchange 不關心路由鍵*/@Beanpublic Binding binding(Queue rpcQueue, CustomExchange lreExchange) {return BindingBuilder.bind(rpcQueue).to(lreExchange).with("").noargs();}
}
方式一、手動監聽
package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.UUID;/*** 模擬 RPC 客戶端,用于通過 RabbitMQ 的 Local Random Exchange 發送請求并接收異步響應。*/
@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 向服務器發送請求,并設置回調隊列接收響應。** @param message 請求消息內容* @return 返回一個確認字符串(實際響應在回調中處理)*/public String sendRequest(String message) throws Exception {// 生成唯一標識 correlationId,用于標識請求-響應配對String correlationId = UUID.randomUUID().toString();// 臨時生成一個獨占的匿名回調隊列(例如 amq.gen-xxxxxx)String replyQueue = rabbitTemplate.execute(channel -> channel.queueDeclare().getQueue());// 設置 RabbitTemplate 的回調地址(其實不會生效于 send 模式,僅用于演示)rabbitTemplate.setReplyAddress(replyQueue);rabbitTemplate.setReplyTimeout(5000); // 設置超時時間(ms)rabbitTemplate.setCorrelationKey("correlation_id"); // 設置用于匹配的屬性名(可選)// 設置監聽器容器,監聽回調隊列中的響應消息SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());container.setQueueNames(replyQueue); // 指定監聽的隊列container.setMessageListener(new MessageListenerAdapter(new Object() {// 定義接收到消息后的處理方法(方法名必須與監聽器默認匹配或顯式指定)@SuppressWarnings("unused")public void handleMessage(byte[] reply) {String response = new String(reply, StandardCharsets.UTF_8);System.out.println("Got reply: " + response);// 實際中這里應喚醒等待線程或放入響應Map中(基于 correlationId)}}));container.start(); // 啟動監聽器容器// 構造請求消息,設置 reply_to 和 correlation_id 屬性MessageProperties props = new MessageProperties();props.setReplyTo(replyQueue); // 告訴服務端響應要發到這個隊列props.setCorrelationId(correlationId); // 服務端會原樣返回,用于客戶端識別對應響應Message request = new Message(message.getBytes(), props);// 通過 RabbitTemplate 發送消息到本地隨機交換機(Local Random Exchange)rabbitTemplate.send(RabbitConfig.LRE_EXCHANGE, "", request);return "Request sent with correlationId: " + correlationId;}
}
方式二、推薦寫法
也可以用使用 Spring AMQP 的官方推薦 RPC 模式(即 convertSendAndReceive())的實現方式。這種方式完全利用了 RabbitTemplate 的自動 reply-to、correlationId、超時機制 —— 更加簡單可靠
package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public String sendRequest(String message) {// 設置超時時間(可選)rabbitTemplate.setReplyTimeout(5000);// 使用 convertSendAndReceive 會自動:// 1. 創建一個臨時 reply queue(exclusive)// 2. 設置 reply_to 和 correlation_id// 3. 等待結果并返回Object response = rabbitTemplate.convertSendAndReceive(RabbitConfig.LRE_EXCHANGE, "", message);if (response != null) {return "Received response: " + response.toString();} else {return "No response received (timeout or error)";}}
}
兩者優點總結
功能 | 原來方式(手動監聽) | convertSendAndReceive()(推薦) |
---|---|---|
reply_to 自動處理 | ? 手動 | ? 自動 |
correlation_id 匹配 | ? 手動 | ? 自動 |
超時控制 | ? 復雜 | ? 簡單 |
代碼復雜度 | 高 | 低 |
推薦程度 | ? | ??? |
RPC服務端處理
方式一 手動
package com.example.consumer;import com.example.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class RpcServer {/*** RabbitMQ RPC 服務端處理方法* * 使用 @RabbitListener 監聽指定隊列,當接收到客戶端請求時,手動獲取 reply_to 和 correlation_id,* 并通過底層 channel 手動發送響應消息。** @param message 收到的消息正文* @param correlationId 唯一標識此次 RPC 請求的 ID(由客戶端生成并設置)* @param replyTo 回調隊列(客戶端臨時隊列)* @param requestMessage 原始 AMQP 消息對象* @param channel 底層通信通道,用于手動發送響應* @return null(返回值不會被用來發送響應,因為我們是手動發送的)*/@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)public String handleRpc(String message,@Header(AmqpHeaders.CORRELATION_ID) String correlationId,@Header(AmqpHeaders.REPLY_TO) String replyTo,Message requestMessage,Channel channel) throws IOException {// 構造服務端響應內容String response = "Processed: " + message;// 打印收到的信息和即將回應的隊列System.out.println("replyTo: " + replyTo + ", Server received: " + message + ", correlationId: " + correlationId);// 構造響應消息的屬性,確保帶上原始 correlationIdMessageProperties replyProps = new MessageProperties();replyProps.setCorrelationId(correlationId);// 構造響應消息對象Message reply = new Message(response.getBytes(), replyProps);// 手動發送響應消息到客戶端指定的臨時隊列channel.basicPublish("", replyTo, null, reply.getBody());// 因為手動處理了響應,不需要 Spring 自動回傳return null;}
}
方式二自動處理
@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)
public String handleRpc(String message) {System.out.println("Server received: " + message);return "Processed: " + message;
}
運行結果
Request sent with correlationId: 9cf6df25-3e02-47da-96ad-23a21791b391
replay:amq.gen-CcSRdsuLJtjtXOzFUE3Eug Server received: 0測試0 correlationId:9cf6df25-3e02-47da-96ad-23a21791b391
Got reply: Processed: 0測試0
Request sent with correlationId: d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
replay:amq.gen-jnFzJQallOE6QRkZEZyn-Q Server received: 3測試1 correlationId:d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
Got reply: Processed: 3測試1
Request sent with correlationId: 2009671b-ef8d-418c-ae9b-c58c8e0dac83
replay:amq.gen--tLpLz3xs9p_BEZmqJUjFg Server received: 6測試2 correlationId:2009671b-ef8d-418c-ae9b-c58c8e0dac83
Got reply: Processed: 6測試2
Request sent with correlationId: 6637a3dd-4e24-48e5-871f-cd671ea6d9b6
replay:amq.gen-CejNGqwNk6bWPkxrQLvH7Q Server received: 9測試3 correlationId:6637a3dd-4e24-48e5-871f-cd671ea6d9b6
Got reply: Processed: 9測試3
Request sent with correlationId: c994fab1-75c4-4618-8af8-b03f2fcdfa6f
replay:amq.gen-mdKE_hhHhj_ZEgT-fIm4nw Server received: 12測試4 correlationId:c994fab1-75c4-4618-8af8-b03f2fcdfa6f
Got reply: Processed: 12測試4
Request sent with correlationId: b27d1409-d595-47f8-b920-2d4ad23288d2
replay:amq.gen-ofZgztMXNh9MMEejK6DDGA Server received: 15測試5 correlationId:b27d1409-d595-47f8-b920-2d4ad23288d2
Got reply: Processed: 15測試5
Request sent with correlationId: adc98f0d-5270-4033-86c0-e863cd56ecee
replay:amq.gen-xKkf-7LcEhOzamv892nL8A Server received: 18測試6 correlationId:adc98f0d-5270-4033-86c0-e863cd56ecee
Got reply: Processed: 18測試6
Request sent with correlationId: 87f6722d-e974-474d-a79c-9aea69401fa7
replay:amq.gen-r5jjy4ypnSDso-HZ5PuNWA Server received: 21測試7 correlationId:87f6722d-e974-474d-a79c-9aea69401fa7
Got reply: Processed: 21測試7
Request sent with correlationId: de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
replay:amq.gen-7QDoBB5wqbjLC0MidVSkbA Server received: 24測試8 correlationId:de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
Got reply: Processed: 24測試8
Request sent with correlationId: 1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
replay:amq.gen-1rFRnN9vKCUt6HIrRLSoBw Server received: 27測試9 correlationId:1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
Got reply: Processed: 27測試9