-
聲明對列和交換機
你需要先在yaml文件當中進行rabbitmq的相關配置
rabbitmq:host:192.168.150.101 //消息件的地址port:5672 //端口數據username:itcast //用戶名password:123321 //密碼virtual-host:/ //虛擬機主機名
聲明隊列交換機,創建新的工具類,定義不同功能的交換機
public class MqConstants {/*** 交換機*/public final static String HOTEL_EXCHANGE = "hotel.topic";/*** 監聽新增和修改的隊列*/public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";/*** 監聽刪除的隊列*/public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";/*** 新增或修改的RoutingKey*/public final static String HOTEL_INSERT_KEY = "hotel.insert";/*** 刪除的RoutingKey*/public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
-
生產者發送消息時,指定交換機為
HOTEL_EXCHANGE
、路由鍵為HOTEL_INSERT_KEY
等。 -
消費者聲明隊列并綁定到
HOTEL_EXCHANGE
交換機,通過對應的路由鍵(HOTEL_INSERT_KEY
/HOTEL_DELETE_KEY
)來訂閱特定業務(新增修改 / 刪除 )的消息 。
定義隊列交換機的對象和綁定關系:
-
基于@Bean的方式
這種方式適合在配置類當中集中管理隊列,交換機,綁定關系的 Bean 定義,結構清晰,便于統一維護。
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; ? // 配置類注解,讓 Spring 掃描并加載這些 Bean 定義 @Configuration public class RabbitMqConfig { ?// 1. 定義交換機(對應 MqConstants 里的交換機)@Beanpublic TopicExchange hotelExchange() {// 創建一個 Topic 類型的交換機,名稱用 MqConstants 里的return new TopicExchange(MqConstants.HOTEL_EXCHANGE);} ?// 2. 定義“新增和修改”隊列@Beanpublic Queue hotelInsertQueue() {return new Queue(MqConstants.HOTEL_INSERT_QUEUE);} ?// 3. 定義“刪除”隊列@Beanpublic Queue hotelDeleteQueue() {return new Queue(MqConstants.HOTEL_DELETE_QUEUE);} ?// 4. 綁定“新增和修改”隊列到交換機(設置路由鍵)@Beanpublic Binding bindHotelInsertQueue(TopicExchange hotelExchange, Queue hotelInsertQueue) {// 用 MqConstants 里的新增路由鍵,將隊列綁定到交換機return BindingBuilder.bind(hotelInsertQueue).to(hotelExchange).with(MqConstants.HOTEL_INSERT_KEY);} ?// 5. 綁定“刪除”隊列到交換機(設置路由鍵)@Beanpublic Binding bindHotelDeleteQueue(TopicExchange hotelExchange, Queue hotelDeleteQueue) {// 用 MqConstants 里的刪除路由鍵,將隊列綁定到交換機return BindingBuilder.bind(hotelDeleteQueue).to(hotelExchange).with(MqConstants.HOTEL_DELETE_KEY);} }
-
交換機類型:這里用了
TopicExchange
(主題交換機 ),和MqConstants
里hotel.topic
對應,也可以根據實際需求換成DirectExchange
(直連 )、FanoutExchange
(扇形 / 廣播 )等。 -
Bean 依賴注入:綁定方法(如
bindHotelInsertQueue
)的參數,會由 Spring 自動注入對應的交換機、隊列 Bean,要保證方法參數名或類型能匹配上容器里的 Bean 。 -
路由鍵作用:通過
with
方法指定路由鍵,這樣交換機就會根據路由鍵,把不同業務(新增修改、刪除 )的消息路由到對應的隊列。
-
基于注解的方式
這種方式更靈活,通常在消費者監聽方法上直接聲明隊列、交換機和綁定關系,適合快速開發簡單場景,或者臨時新增隊列綁定的情況。
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; ? @Component // 讓 Spring 掃描到這個組件 public class HotelMqListener { ?// 監聽“新增和修改”隊列,同時聲明隊列、交換機、綁定關系@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE, durable = "true"), // 聲明隊列,durable = true 表示持久化exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = "topic"), // 聲明交換機,類型 topickey = MqConstants.HOTEL_INSERT_KEY // 路由鍵))public void handleHotelInsert(String message) {// 這里寫接收到“新增或修改”消息后的業務邏輯,比如更新 Elasticsearch 酒店數據System.out.println("收到酒店新增/修改消息:" + message);} ?// 監聽“刪除”隊列,同時聲明隊列、交換機、綁定關系@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE, durable = "true"),exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = "topic"),key = MqConstants.HOTEL_DELETE_KEY))public void handleHotelDelete(String message) {// 這里寫接收到“刪除”消息后的業務邏輯,比如從 Elasticsearch 移除酒店數據System.out.println("收到酒店刪除消息:" + message);} }
-
注解參數說明:
-
@Queue
:聲明隊列,name
指定隊列名稱,durable
設置是否持久化(建議生產環境設為true
,防止 RabbitMQ 重啟隊列丟失 )。 -
@Exchange
:聲明交換機,name
是交換機名稱,type
指定類型(如topic
、direct
、fanout
等 )。 -
@QueueBinding
:把隊列、交換機、路由鍵綁定起來,讓交換機知道如何把消息路由到這個隊列。
-
-
消費者方法:
handleHotelInsert
和handleHotelDelete
方法就是實際處理消息的邏輯,當對應的隊列有消息進來時,方法會被觸發執行。
以下為Rabbitmq的消息發送的代碼,當我們在進行增刪改等操作的時候,我們只需在其方法內進行消息的發送
// 新增酒店@PostMappingpublic void saveHotel(@RequestBody Hotel hotel) {hotelService.save(hotel);// 發送新增消息,用新增路由鍵rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());} ?// 修改酒店@PutMappingpublic void updateById(@RequestBody Hotel hotel) {if (Objects.isNull(hotel.getId())) {throw new InvalidParameterException("id不能為空");}hotelService.updateById(hotel);// 發送修改消息,用修改路由鍵rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_UPDATE_KEY, hotel.getId());} ?// 刪除酒店@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);// 發送刪除消息,用刪除路由鍵rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);} }
當我們進行消息的發送個時候,我們所需要的參數有。交換機的名字。交換機的路由。需要發送的消息。
-
-
消息發送之后,再進行消息的監聽。
消息的監聽的時候,需要先進行接口的定義,分別定義新增修改和刪除的業務接口,可以使用RabbitListener注解進行消息的監聽。
/*** 監聽酒店新增或修改隊列:從數據庫查詢最新數據,同步到 Elasticsearch* */@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)@Transactionalpublic void listenHotelInsertOrUpdate(Long id) {hotelRepository.save(hotel);}
?/*** 監聽酒店刪除隊列:從 Elasticsearch 刪除對應酒店數據* @param id 酒店ID*/@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)@Transactionalpublic void listenHotelDelete(Long id) {// 從 Elasticsearch 刪除酒店hotelRepository.deleteById(id);System.out.println("酒店刪除,同步 Elasticsearch 成功,酒店ID:" + id);}
定義過接口之后,我們就可以進行方法的實現@Overridepublic void deleteById(Long id) {// 1. 準備Request:創建刪除請求,指定索引和文檔IDDeleteRequest request = new DeleteRequest(INDEX_NAME, id.toString());
?// 2. 準備發送請求:執行刪除操作(這里直接執行,也可做一些請求參數的額外設置,比如超時等)try {DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);if (response.getResult() != null) {System.out.println("Elasticsearch 文檔刪除成功,文檔ID:" + id + ",結果:" + response.getResult());} else {System.err.println("Elasticsearch 文檔刪除失敗,文檔ID:" + id);}} catch (Exception e) {e.printStackTrace();System.err.println("刪除 Elasticsearch 文檔時發生異常,文檔ID:" + id + ",異常信息:" + e.getMessage());}}
?@Overridepublic void insertById(Long id) {// 0. 根據id查詢酒店數據:從數據庫查詢Hotel hotel = hotelDbService.getHotelById(id); // 需實現該方法,返回 Hotel 實體if (hotel == null) {System.err.println("根據ID查詢酒店數據為空,ID:" + id);return;}
?// 1. 準備Request:創建索引請求,指定索引IndexRequest request = new IndexRequest(INDEX_NAME);request.id(id.toString()); // 設置文檔ID,與酒店ID對應
?try {// 將 Hotel 對象轉為 JSON 字符串,作為文檔內容String hotelJson = objectMapper.writeValueAsString(hotel);request.source(hotelJson, XContentType.JSON);
?// 2. 準備DSL:這里 DSL 已經通過 request.source 等方式設置好了,// ? 若是復雜場景,可繼續添加路由、超時等參數,比如 request.routing("...")
?// 3. 發送請求:執行索引(新增/更新)操作IndexResponse response = client.index(request, RequestOptions.DEFAULT);System.out.println("Elasticsearch 文檔插入/更新成功,文檔ID:" + id + ",結果:" + response.getResult());} catch (Exception e) {e.printStackTrace();System.err.println("插入/更新 Elasticsearch 文檔時發生異常,文檔ID:" + id + ",異常信息:" + e.getMessage());}}