一 消息通信協議
1 AMQP
??AMQP 是一個開放的、跨語言、跨平臺的消息協議標準,用于在分布式系統中傳遞業務消息。它定義了消息隊列的二進制協議格式和交互模型(如交換機、隊列、綁定等),確保不同語言(Java、Python、C#等)和平臺(RabbitMQ、Qpid等)的實現能夠互操作。
2 Spring AMQP
??Spring AMQP 是 Spring 框架對 AMQP 協議(如 RabbitMQ)的抽象與封裝,提供了一套簡潔的 Java API 來簡化消息的生產和消費。
??它通過AmqpTemplate
模板類實現消息的快速發送(如自動序列化、路由鍵設置),結合@RabbitListener
注解實現聲明式的消費者監聽,同時支持動態聲明交換機、隊列和綁定(通過 RabbitAdmin),并深度集成 Spring 生態(如依賴注入、事務管理)。
??其核心優勢在于隱藏底層協議細節,讓開發者僅需關注業務邏輯,無需手動處理連接、通道或消息確認等復雜操作,從而高效構建異步、解耦的分布式系統。
二 入門demo
1 項目路徑圖
2 交換機與隊列準備
??創建虛擬主機/midhuang
,使用其中的交換機amq.fanout
,與隊列xiaohuang.queue1
進行綁定。
3 父工程導入依賴
??創建spring項目,為父工程導入依賴,兩個子工程會會自動繼承這些依賴
<!-- Lombok:簡化Java代碼(如自動生成Getter/Setter) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- Spring AMQP + RabbitMQ 集成 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot 單元測試 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
4 各個微服務中配置文件
??① consumer(消費者),5672是用來發消息的端口,15672是Web控制臺
spring:rabbitmq:host: 192.168.44.128port: 5672 virtual-host: /midhuangusername: dahuangpassword: "dahuang66"
logging:level:cn.tj.consumer.listeners: DEBUG # 設置為 DEBUG 以查看詳細日志
??② publisher(生產者)
spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"
logging:level:cn.tj: DEBUG # 設置為 DEBUG 以查看詳細日志
5 測試類發送消息
??利用RabbitTemplate
發送消息
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;// 直接發送消息到隊列@Testpublic void testHuangQueue() {// 1. 定義隊列名稱String queueName = ".queue";// 2. 定義消息內容String message = "hello, spring amqp!";// 3. 發送消息(默認路由到隊列)rabbitTemplate.convertAndSend(queueName, message);System.out.println("消息發送成功: " + message);}
}
6 控制臺檢查剛剛發送消息
7 接收消息
??利用@RabbitListener
注解聲明要監聽的隊列,監聽消息,此時消費者服務需啟動保持監聽狀態
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "xiaohuang.queue1")public void listenHuangQueue(String msg) {log.info("收到消息: {}", msg);// 調試:打印消息長度和字節(如果 msg 為 null,會輸出 "null")log.debug("消息長度: {}, 內容: {}", msg == null ? "null" : msg.length(), msg);}
}
三 Workqueues
??Workqueues(工作隊列)是一種常見的任務分發模型,將多個消費者綁定到一個隊列,共同消費隊列中的消息,通過共享隊列實現負載均衡,從而解決消息堆積問題。
1 創建隊列 work.queue
2 添加兩個消費者 – MqListener
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) {log.info("1號消費者收到消息: {}", msg);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) {log.info("2號消費者收到消息: {}", msg);}
}
3 生產者連續發送消息 – PublisherApplicationTests
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";// 發送50條測試消息for (int i = 1; i <= 40; i++) {String msg = "hello" + i;rabbitTemplate.convertAndSend(queueName, msg);System.out.println(" [x] Sent '" + msg + "'");Thread.sleep(30); // 模擬延遲,避免消息爆發}}
}
4 控制臺輸出
??此時并未考慮到消費者的處理能力,RabbitMQ 默認使用 輪詢(Round-Robin) 策略分發消息,這可能導致某些消費者積壓大量未處理消息,而其他消費者空閑。
5 編寫配置文件 – consumer
??prefetchCount:1
像外賣小哥一次只接一單,平臺會一次性給小哥派5 個訂單(即使他還沒處理完之前的訂單)。如果小哥手慢(比如堵車),這 5 個訂單都會卡在他手里,其他空閑的小哥卻沒訂單可接。
??啟用prefetchCount:1
后:平臺每次只給小哥派 1 個訂單。小哥必須完成這個訂單(送到顧客手里),平臺才會派下一個訂單。如果小哥 A 處理快,他很快能接下一個訂單;小哥 B 處理慢,他手頭的訂單不會影響別人。
spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"listener:simple:prefetch: 1
logging:level:cn.tj.consumer.listeners: DEBUG # 設置為 DEBUG 以查看詳細日志
6 設置消費者處理速度 – MqListener
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {log.info("1號消費者收到消息: {}", msg);Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {log.debug("2號消費者收到消息: {}", msg);Thread.sleep(200);}
}