Springboot仿抖音app開發之評論業務模塊后端復盤及相關業務知識總結
Springboot仿抖音app開發之粉絲業務模塊后端復盤及相關業務知識總結
Springboot仿抖音app開發之用短視頻務模塊后端復盤及相關業務知識總結
Springboot仿抖音app開發之用戶業務模塊后端復盤及相關業務知識總結
Springboot仿抖音app開發之消息業務模塊后端復盤及相關業務知識總結?
為什么需要接口解耦
1. 數據重要性分級處理
在實際業務系統中,數據通常被分為不同重要級別:
重要數據(關鍵業務數據):
- 用戶賬戶信息、交易記錄、訂單數據
- 需要強一致性和ACID特性
- 通常存儲在關系型數據庫(MySQL、PostgreSQL等)
非重要數據(輔助業務數據):
- 用戶行為日志、消息通知、統計數據
- 可以容忍最終一致性
- 適合存儲在NoSQL數據庫(MongoDB、Redis等)
2. 接口解耦的核心優勢
故障隔離:
- 重要數據操作失敗不影響非重要數據的處理
- MongoDB服務異常不會阻塞核心業務流程
- 提高系統整體可用性
為什么MongoDB服務異常會阻塞核心業務流程?
問題場景分析
1. 未解耦的情況(會阻塞核心業務)
問題:
- 如果MongoDB服務異常,整個訂單處理流程都會失敗
- 核心的訂單數據無法保存,影響業務連續性
- 一個非關鍵功能的故障導致關鍵業務無法進行
2. 故障隔離的好處
業務連續性保障:
- 核心業務(訂單創建)不受MongoDB故障影響
- 用戶可以正常下單,不會感知到系統部分組件的異常
系統健壯性提升:
- 不同重要級別的數據采用不同的處理策略
- 非關鍵功能的故障不會造成系統雪崩
運維友好:
- 可以獨立維護和升級MongoDB服務
- MongoDB的性能調優不會影響核心業務
3. 實際案例
假設一個電商系統:
核心流程:用戶下單 → 扣減庫存 → 生成訂單 → 扣款 輔助功能:記錄用戶行為 → 發送消息通知 → 更新推薦算法數據
如果沒有解耦,MongoDB異常會導致:
- 用戶無法下單
- 訂單系統完全癱瘓
- 收入損失
解耦后的效果:
- 用戶正常下單和支付
- 部分通知功能暫時不可用(用戶基本無感知)
- 系統穩定運行,收入不受影響
工廠與批發商的故事
如果沒有中間件(微信群)生產者給消費者發消息需要逐個去發送對應的消息,有了中間件之后只需要 統一發送就行,消費者去找自己對應的消息
?RabbitMQ
?
1.?異步任務處理
- 場景:耗時操作(如發送郵件、生成報表、圖片處理)不適合阻塞主流程。
- 實現:將任務放入消息隊列,由消費者異步處理。
- 優勢:
- 主程序快速響應(如用戶注冊后立即返回,郵件發送由隊列處理)。
- 避免因任務失敗導致主流程崩潰。
2.?系統提速
- 場景:高延遲操作(如數據庫寫入、第三方API調用)拖慢整體性能。
- 實現:主程序發布消息后立即返回,消費者逐步處理。
- 示例:
- 電商下單后,庫存扣減和日志記錄通過隊列異步執行。
- 吞吐量提升:隊列充當緩沖區,允許系統以最大承受速度處理任務。
3.?接口解耦
- 場景:系統間直接調用導致強依賴(如支付系統與物流系統)。
- 實現:通過消息隊列間接通信,生產者無需知道消費者細節。
- 優勢:
- 系統可獨立擴展或升級(如新增一個消費者不會影響生產者)。
- 協議靈活性:不同語言/框架的系統可通過標準協議(如AMQP)交互。
4.?流量削峰(Peak Shaving)
- 場景:突發流量(如秒殺活動)可能壓垮后端服務。
- 實現:將請求放入隊列,消費者按固定速率處理。
- 關鍵點:
- 隊列緩沖超載請求,避免服務崩潰。
- 配合限流策略(如設置隊列最大長度),保證系統穩定。
核心組件及其關系
1. 生產者 (Producer)
- 作用: 消息的發送方,類似于寫信的人
- 職責: 創建消息并發送到交換機
- 特點: 生產者不直接將消息發送給隊列,而是發送給Exchange
2. 交換機 (Exchange)
- 作用: 消息的郵局/分揀中心
- 職責: 接收生產者的消息,根據路由規則決定消息發送到哪個隊列
- 類型:
- Direct: 精確匹配路由鍵
- Topic: 支持通配符匹配 (
*
?和?#
) - Fanout: 廣播到所有綁定的隊列
- Headers: 根據消息頭屬性路由
3. 隊列 (Queue)
- 作用: 消息的郵箱
- 職責: 存儲消息,等待消費者來獲取
- 特點: 先進先出(FIFO)的數據結構
4. 消費者 (Consumer)
- 作用: 消息的接收方,類似于收信的人
- 職責: 從隊列中獲取并處理消息
工作流程
生產者 → Exchange → Queue → 消費者↓ ↓ ↓ ↓寫信 → 郵局 → 郵箱 → 收信人
綁定關系 (Binding)
綁定是連接Exchange和Queue的路由規則:
// 您代碼中的綁定示例
.bind(queue) // 綁定隊列
.to(exchange) // 到交換機
.with("sys.msg.*") // 路由鍵規則
這意味著:
- 當生產者發送路由鍵為?
sys.msg.login
?的消息時 → 會路由到?queue_sys_msg
- 當發送路由鍵為?
user.info.update
?的消息時 → 不會路由到此隊列
實際應用場景舉例
場景: 系統消息通知
-
生產者: 用戶服務
// 發送登錄消息 rabbitTemplate.convertAndSend("exchange_msg", "sys.msg.login", "用戶張三登錄");
-
Exchange:
exchange_msg
(Topic類型)- 接收到路由鍵?
sys.msg.login
?的消息
- 接收到路由鍵?
-
路由判斷:
sys.msg.login
?匹配?sys.msg.*
?規則 ?- 消息被路由到?
queue_sys_msg
-
隊列:
queue_sys_msg
- 存儲消息等待處理
-
消費者: 系統通知服務
@RabbitListener(queues = "queue_sys_msg") public void handleSysMessage(String message) {// 處理系統消息logger.info("收到系統消息: " + message); }
關鍵理解點
- 解耦性: 生產者不需要知道具體哪個消費者會處理消息
- 靈活性: 通過不同的Exchange類型和綁定規則,可以實現各種消息路由策略
- 可靠性: 消息持久化、隊列持久化確保消息不會丟失
- 擴展性: 可以輕松添加新的隊列和消費者
集成Rabbitmq - 引入配置和依賴
<!-- 引入 RabbitMQ 依賴 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: adminvirtual-host: imooc-red-book
集成Rabbitmq - 創建交換機和隊列
?我們來看完整代碼
package com.imooc;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/*** 根據模型編寫代碼:* 1. 定義交換機* 2. 定義隊列* 3. 創建交換機* 4. 創建隊列* 5. 隊列和交換機的綁定*/public static final String EXCHANGE_MSG = "exchange_msg";public static final String QUEUE_SYS_MSG = "queue_sys_msg";@Bean(EXCHANGE_MSG)public Exchange exchange() {return ExchangeBuilder // 構建交換機.topicExchange(EXCHANGE_MSG) // 使用topic類型,參考:https://www.rabbitmq.com/getstarted.html.durable(true) // 設置持久化,重啟mq后依然存在.build();}@Bean(QUEUE_SYS_MSG)public Queue queue() {return new Queue(QUEUE_SYS_MSG);}@Beanpublic Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("sys.msg.*") // 定義路由規則(requestMapping).noargs();// FIXME: * 和 # 分別代表什么意思?}}
Spring Boot自動創建機制
當Spring容器啟動時,會自動掃描所有標注了@Bean
的方法,并將返回值注冊到Spring容器中。對于RabbitMQ組件,Spring AMQP會自動檢測這些Bean并在RabbitMQ服務器上創建對應的實體。
代碼執行流程
1. 創建交換機
@Bean(EXCHANGE_MSG)
public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG) // 創建名為"exchange_msg"的Topic交換機.durable(true) // 持久化,服務器重啟后不會丟失.build();
}
2. 創建隊列
@Bean(QUEUE_SYS_MSG)
public Queue queue() {return new Queue(QUEUE_SYS_MSG); // 創建名為"queue_sys_msg"的隊列
}
3. 建立綁定關系
@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue) // 綁定隊列.to(exchange) // 到交換機.with("sys.msg.*") // 使用路由鍵模式.noargs();
}
Topic交換機的路由規則
關于您代碼中的FIXME注釋,Topic類型的通配符含義:
-
*
(星號): 匹配一個單詞sys.msg.*
?可以匹配:sys.msg.login
、sys.msg.logout
、sys.msg.error
- 但不能匹配:
sys.msg.user.login
(因為有兩個單詞)
-
#
(井號): 匹配零個或多個單詞sys.msg.#
?可以匹配:sys.msg
、sys.msg.login
、sys.msg.user.login.success
集成Rabbitmq - 創建生產者,配置路由規則
1. 添加依賴 (pom.xml)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. 配置文件 (application.yml)
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /
3. RabbitMQ配置類 (已有)
@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_MSG = "exchange_msg";public static final String QUEUE_SYS_MSG = "queue_sys_msg";// 創建Topic交換機@Bean(EXCHANGE_MSG)public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG).durable(true).build();}// 創建隊列@Bean(QUEUE_SYS_MSG)public Queue queue() {return new Queue(QUEUE_SYS_MSG);}// 綁定關系:隊列綁定到交換機,使用路由規則@Beanpublic Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("sys.msg.*") // 關鍵:路由鍵規則.noargs();}
}
1. 核心常量定義
public static final String EXCHANGE_MSG = "exchange_msg";
public static final String QUEUE_SYS_MSG = "queue_sys_msg";
- 作用: 定義交換機和隊列的名稱常量
- 好處: 避免硬編碼,便于維護和引用
2. Topic交換機配置
@Bean(EXCHANGE_MSG)
public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG) // 創建Topic類型交換機.durable(true) // 持久化配置.build();
}
Topic交換機特點:
- 類型: Topic Exchange(主題交換機)
- 路由規則: 支持通配符匹配
*
?:匹配一個單詞#
?:匹配零個或多個單詞
- 持久化:?
durable(true)
?確保服務器重啟后交換機不丟失
3. 隊列配置
@Bean(QUEUE_SYS_MSG)
public Queue queue() {return new Queue(QUEUE_SYS_MSG);
}
- 隊列名:?
queue_sys_msg
- 用途: 存儲系統消息
- 默認配置: 非持久化隊列(可以改為持久化)
4. 綁定關系配置
@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue) // 綁定隊列.to(exchange) // 到交換機.with("sys.msg.*") // 使用路由規則.noargs();
}
?
4. 生產者控制器
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("produce")public Object produce() throws Exception {// 發送消息到指定交換機,使用特定路由鍵rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG, // 交換機名稱"sys.msg.send", // 路由鍵"我發了一個消息\~\~" // 消息內容);return GraceJSONResult.ok();}
}
集成Rabbitmq - 消費者接受消息處理業務?
package com.imooc;import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQConsumer {@Autowiredprivate MsgService msgService;@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})public void watchQueue(String payload, Message message) {log.info(payload);String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info(routingKey);}}
1. 消費者組件定義
@Slf4j
@Component
public class RabbitMQConsumer {
@Component
: 將此類注冊為Spring Bean@Slf4j
: 自動生成日志對象,用于記錄日志
2. 隊列監聽
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {
@RabbitListener
: 監聽指定隊列的注解queues = {RabbitMQConfig.QUEUE_SYS_MSG}
: 監聽名為?queue_sys_msg
?的隊列String payload
: 接收消息的具體內容Message message
: 完整的消息對象,包含元數據
3. 消息處理邏輯
log.info(payload); // 打印消息內容
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey); // 打印路由鍵
完整工作機制
監聽機制
- 當應用啟動時,Spring會自動掃描帶有?
@RabbitListener
?的方法 - 為該方法創建一個消息監聽容器
- 容器會持續監聽?
queue_sys_msg
?隊列
消息處理流程
- 接收消息: 當隊列中有新消息時,自動觸發?
watchQueue
?方法 - 解析內容: 獲取消息的文本內容 (
payload
) - 提取路由鍵: 從消息屬性中獲取路由鍵信息
- 記錄日志: 將消息內容和路由鍵打印到控制臺
rabbitTemplate.convertAndSend
?方法詳解?
這行代碼是RabbitMQ異步消息發送的核心部分,讓我逐個參數詳細解析:
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,"sys.msg." + MessageEnum.FOLLOW_YOU.enValue,JsonUtils.objectToJson(messageMO)
);
參數解析
參數1: 交換機名稱
RabbitMQConfig.EXCHANGE_MSG
- 作用: 指定消息要發送到哪個交換機
- 實際值: 通常是?
"exchange_msg"
?(常量值) - 工作方式: 交換機接收所有消息,并根據路由規則分發
參數2: 路由鍵
"sys.msg." + MessageEnum.FOLLOW_YOU.enValue
- 拼接結果:?
"sys.msg.follow"
- 作用: 決定消息如何被路由到目標隊列
- 匹配規則: 與交換機綁定時定義的模式進行匹配
- 如:?
sys.msg.*
?會匹配此路由鍵
- 如:?
參數3: 消息內容
JsonUtils.objectToJson(messageMO)
- 輸入:?
messageMO
?(消息對象) - 轉換過程: 對象 → JSON字符串
- 輸出示例:
{"fromUserId": "用戶123","toUserId": "博主456","msgContent": null
}
- 傳輸形式: 字節數組形式在網絡上傳輸
?異步解耦 - 系統消息入庫保存
階段一:關注操作(生產者)
@Transactional
@Override
public void doFollow(String myId, String vlogerId) {// 1. 核心業務邏輯(同步)String fid = sid.nextShort();Fans fans = new Fans();// ... 設置粉絲關系fansMapper.insert(fans); // 插入粉絲關系到數據庫// 2. 構建消息對象MessageMO messageMO = new MessageMO();messageMO.setFromUserId(myId); // 關注者messageMO.setToUserId(vlogerId); // 被關注者// 3. 異步發送消息(關鍵點)rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,"sys.msg." + MessageEnum.FOLLOW_YOU.enValue, // 路由鍵:sys.msg.followJsonUtils.objectToJson(messageMO));// 事務提交,關注操作立即完成!
}
階段二:消息消費(消費者)
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {// 1. 解析JSON消息MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);String routingKey = message.getMessageProperties().getReceivedRoutingKey();// 2. 根據路由鍵判斷消息類型并處理if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {// 異步執行系統消息入庫msgService.createMsg(messageMO.getFromUserId(), // 關注者IDmessageMO.getToUserId(), // 被關注者ID MessageEnum.FOLLOW_YOU.type, // 消息類型:關注(1)null // 無額外內容);}// ... 處理其他消息類型
}
異步解耦的核心優勢
1. 事務分離
// 主事務:關注業務
@Transactional
public void doFollow() {fansMapper.insert(fans); // 核心業務rabbitTemplate.convertAndSend(); // 發送MQ(非阻塞)
} // 事務立即提交// 獨立處理:消息入庫
@RabbitListener
public void watchQueue() {msgService.createMsg(); // 在獨立的事務中處理
}
2. 時序對比
傳統同步方式:
用戶點擊關注↓
[開始事務]├── 插入粉絲關系 (50ms)├── 更新互關狀態 (30ms) └── 創建系統消息 (100ms) ← 可能慢
[提交事務] (180ms總耗時)↓
返回成功給用戶
MQ異步方式:
用戶點擊關注↓
[開始事務]├── 插入粉絲關系 (50ms)├── 更新互關狀態 (30ms)└── 發送MQ消息 (5ms) ← 超快
[提交事務] (85ms總耗時)↓
返回成功給用戶 ← 用戶立即看到結果[后臺異步]└── 創建系統消息 (100ms) ← 后臺處理
3. 系統消息入庫的異步處理
消息流轉過程:
// 發送的JSON消息
{"fromUserId": "user123","toUserId": "vlogger456"
}// 路由鍵
"sys.msg.follow"// 最終入庫的系統消息
INSERT INTO sys_msg (from_user_id = 'user123',to_user_id = 'vlogger456', msg_type = 1, -- FOLLOW_YOU類型msg_content = null,create_time = now()
);
容錯和可靠性保障
1. 消息持久化
// RabbitMQ配置
@Bean
public Queue queue() {return QueueBuilder.durable("queue_sys_msg") // 持久化隊列.build();
}
2. 失敗重試機制
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {try {// 處理消息msgService.createMsg(...);} catch (Exception e) {log.error("處理消息失敗: {}", e.getMessage());// 消息會自動重新入隊重試throw e; // 拋出異常觸發重試}
}
其他相關的操作也同樣進行異步解耦即可,我們已經在消費者模型中做了if判斷處理
package com.imooc;import com.imooc.base.RabbitMQConfig;
import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQConsumer {@Autowiredprivate MsgService msgService;@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})public void watchQueue(String payload, Message message) {log.info(payload);MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info(routingKey);if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.FOLLOW_YOU.type,null);} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_VLOG.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.FOLLOW_YOU.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.COMMENT_VLOG.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.COMMENT_VLOG.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.REPLY_YOU.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.REPLY_YOU.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_COMMENT.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.LIKE_COMMENT.type,messageMO.getMsgContent());} else {GraceException.display(ResponseStatusEnum.SYSTEM_OPERATION_ERROR);}}}
消息流轉的完整過程
1. 發送端(生產者)
// 關注操作中發送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG, // 交換機:exchange_msg"sys.msg." + MessageEnum.FOLLOW_YOU.enValue, // 路由鍵:sys.msg.followJsonUtils.objectToJson(messageMO) // JSON消息
);
2. RabbitMQ路由過程
消息發送到交換機 "exchange_msg"↓
交換機根據路由鍵 "sys.msg.follow" 進行路由判斷↓
匹配綁定規則:queue_sys_msg 綁定了 "sys.msg.*"↓
"sys.msg.follow" 匹配 "sys.msg.*" ?↓
消息被路由到隊列 "queue_sys_msg"↓
消息在隊列中等待被消費
3. 消費端(監聽器)
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {// 自動監聽隊列,有消息就觸發此方法
}
監聽機制的工作原理
@RabbitListener 注解的作用
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
Spring Boot啟動時:
- 掃描注解: Spring掃描到?
@RabbitListener
?注解 - 創建監聽容器: 為該方法創建一個?
MessageListenerContainer
- 建立連接: 連接到RabbitMQ服務器
- 監聽隊列: 持續監聽?
queue_sys_msg
?隊列 - 等待消息: 進入阻塞狀態,等待隊列中有新消息
消息到達時:
[隊列中有新消息]↓
[監聽容器檢測到消息]↓
[自動調用 watchQueue 方法]↓
[傳入消息內容和元數據]
消息處理的具體流程
參數接收
public void watchQueue(String payload, Message message) {// payload: JSON字符串內容// message: 完整的AMQP消息對象(包含屬性、路由鍵等)
}
消息解析
// 1. 打印消息內容
log.info(payload); // 輸出:{"fromUserId":"user123","toUserId":"vlogger456"}// 2. 解析JSON為對象
MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);// 3. 獲取路由鍵
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey); // 輸出:sys.msg.follow
業務邏輯分發
// 根據路由鍵判斷消息類型
if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {// 處理關注消息:sys.msg.followmsgService.createMsg(messageMO.getFromUserId(), // 關注者messageMO.getToUserId(), // 被關注者MessageEnum.FOLLOW_YOU.type, // 消息類型:1null // 無額外內容);
}