以下是一個使用純Java(非Spring Boot)接收RabbitMQ消息的完整實現,包含Maven依賴和持續監聽消息的循環:
1. 首先添加Maven依賴 (pom.xml
)
<dependencies><!-- RabbitMQ Java Client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><!-- 日志框架 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency>
</dependencies>
2. RabbitMQ消息接收器實現
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitMQReceiver {private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);private final ConnectionFactory factory;private Connection connection;private Channel channel;private volatile boolean running = true;public RabbitMQReceiver(String host, int port, String username, String password) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);}public void startListening(String queueName) {try {// 建立連接connection = factory.newConnection();channel = connection.createChannel();// 聲明隊列(如果不存在則創建)channel.queueDeclare(queueName, true, false, false, null);logger.info("連接到隊列: {}", queueName);// 設置每次只接收一條消息(公平分發)channel.basicQos(1);// 創建消費者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);try {// 在這里處理你的業務邏輯processMessage(message);// 手動確認消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {logger.error("消息處理失敗", e);// 處理失敗時拒絕消息(不重新入隊)channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}};// 取消消費者回調CancelCallback cancelCallback = consumerTag -> {logger.warn("消費者被取消: {}", consumerTag);};// 開始消費消息channel.basicConsume(queueName, false, deliverCallback, cancelCallback);logger.info("開始監聽消息... (按CTRL+C停止)");// 保持程序運行while (running) {Thread.sleep(1000); // 防止CPU空轉}} catch (IOException | TimeoutException | InterruptedException e) {logger.error("RabbitMQ連接失敗", e);} finally {closeResources();}}private void processMessage(String message) {// 這里是你的業務邏輯處理logger.info("處理消息: {}", message);// 示例:打印消息長度System.out.println("消息長度: " + message.length());}public void stop() {running = false;logger.info("停止監聽...");}private void closeResources() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMQ連接已關閉");} catch (IOException | TimeoutException e) {logger.error("關閉資源時出錯", e);}}public static void main(String[] args) {// 配置RabbitMQ連接參數String host = "localhost";int port = 5672;String username = "guest";String password = "guest";String queueName = "my_queue";RabbitMQReceiver receiver = new RabbitMQReceiver(host, port, username, password);// 添加關閉鉤子Runtime.getRuntime().addShutdownHook(new Thread(() -> {receiver.stop();receiver.closeResources();}));// 開始監聽receiver.startListening(queueName);}
}
關鍵功能說明:
-
持續監聽機制:
while (running) {Thread.sleep(1000); // 防止CPU空轉 }
使用
running
標志控制循環,優雅退出 -
消息處理流程:
- 聲明隊列確保存在
- 設置QoS為1(公平分發)
- 使用
DeliverCallback
處理消息 - 手動消息確認(ACK/NACK)
- 異常處理與錯誤恢復
-
資源管理:
- 使用
finally
塊確保關閉連接 - 添加Shutdown Hook處理程序終止
- 線程安全的狀態管理(
volatile running
)
- 使用
-
日志記錄:
- 使用SLF4J進行日志記錄
- 關鍵操作都有日志輸出
使用說明:
-
啟動消費者:
mvn compile exec:java -Dexec.mainClass="RabbitMQReceiver"
-
發送測試消息(使用RabbitMQ管理界面或命令行工具):
rabbitmqadmin publish exchange=amq.default routing_key=my_queue payload="hello world"
-
停止程序:
- 按
CTRL+C
優雅停止 - 程序會自動關閉連接
- 按
自定義配置:
-
修改連接參數:
String host = "your.rabbitmq.host"; int port = 5672; String username = "your_user"; String password = "your_password"; String queueName = "your_queue_name";
-
自定義消息處理:
修改processMessage
方法實現你的業務邏輯:private void processMessage(String message) {// 示例:解析JSON消息// JSONObject json = new JSONObject(message);// System.out.println("收到訂單: " + json.getString("orderId"));// 你的實際業務邏輯 }
-
配置調整:
- 修改
channel.basicQos()
調整預取數量 - 修改
basicNack
的requeue
參數控制是否重新入隊 - 添加交換機綁定邏輯(如果需要)
- 修改
這個實現遵循了RabbitMQ最佳實踐,包括:
- 手動消息確認
- 公平分發(QoS設置)
- 連接和通道的異常處理
- 資源清理
- 優雅關閉機制
如果需要處理更復雜的場景(如多個隊列、消息持久化、死信隊列等),可以在channel.queueDeclare
和channel.basicConsume
方法中添加相應參數。