返利app的消息隊列架構:基于RabbitMQ的異步通信與解耦實踐

返利app的消息隊列架構:基于RabbitMQ的異步通信與解耦實踐

大家好,我是阿可,微賺淘客系統及省賺客APP創始人,是個冬天不穿秋褲,天冷也要風度的程序猿!

在返利app的業務流程中,用戶下單、返利計算、傭金到賬、消息通知等環節存在強依賴關系——傳統同步調用模式下,若“返利計算服務”響應延遲,會導致整個下單流程卡頓,甚至引發連鎖故障。為解決這一問題,我們引入RabbitMQ消息隊列,基于“生產者-交換機-隊列-消費者”架構,實現服務間異步通信與業務解耦,將下單流程響應時間從500ms縮短至150ms,系統峰值吞吐量提升2倍。以下從架構設計、核心組件實現、業務場景落地三方面展開,附完整代碼示例。
返利app

一、返利app RabbitMQ架構設計

1.1 架構分層與組件職責

針對返利app的業務特性,設計三層消息通信架構,各組件職責如下:

  • 生產者層:各微服務(訂單服務、用戶服務、返利服務)作為生產者,將業務事件(如“訂單創建”“返利生成”)封裝為消息發送至RabbitMQ;
  • 中間件層:RabbitMQ通過交換機(Exchange)與隊列(Queue)的綁定關系,實現消息路由——采用Topic交換機支持按規則匹配路由,Fanout交換機實現廣播通知;
  • 消費者層:下游服務(如通知服務、統計服務)作為消費者,監聽指定隊列,異步處理消息,避免與上游服務強耦合。

1.2 核心業務消息流轉路徑

以“用戶下單”場景為例,消息流轉路徑為:

  1. 訂單服務(生產者)創建“訂單創建”消息,發送至order-exchange交換機;
  2. 交換機按路由鍵order.created,將消息路由至order-confirm-queue(商家確認隊列)與rebate-calculate-queue(返利計算隊列);
  3. 商家服務監聽order-confirm-queue,異步處理訂單確認;返利服務監聽rebate-calculate-queue,異步計算返利金額;
  4. 返利服務計算完成后,作為生產者發送“返利生成”消息至rebate-exchange,由通知服務監聽隊列并發送用戶到賬通知。

二、RabbitMQ核心組件代碼實現

2.1 消息生產者封裝(通用發送組件)

基于Spring AMQP封裝通用消息發送組件,支持指定交換機、路由鍵與消息屬性,代碼如下:

package cn.juwatech.rebate.mq.producer;import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;/*** RabbitMQ通用消息生產者*/
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;/*** 發送消息(帶確認機制,確保消息可靠投遞)* @param exchange 交換機名稱* @param routingKey 路由鍵* @param message 消息體*/public void sendMessage(String exchange, String routingKey, Object message) {// 1. 生成消息唯一ID(用于消息確認與追蹤)String messageId = UUID.randomUUID().toString().replace("-", "");CorrelationData correlationData = new CorrelationData(messageId);// 2. 設置消息確認回調(確保消息到達交換機)rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {if (ack) {// 消息成功到達交換機System.out.printf("消息[%s]已到達交換機,exchange:%s%n", messageId, exchange);} else {// 消息投遞失敗,可記錄日志并觸發重試System.err.printf("消息[%s]投遞交換機失敗,原因:%s%n", messageId, cause);}});// 3. 設置消息返回回調(交換機無法路由時觸發)rabbitTemplate.setReturnsCallback(returned -> {System.err.printf("消息[%s]路由失敗,routingKey:%s,原因:%s%n",messageId, returned.getRoutingKey(), returned.getReplyText());// 路由失敗處理:如發送至死信隊列handleReturnedMessage(returned, message);});// 4. 發送消息rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);}/*** 處理路由失敗的消息(發送至死信隊列)*/private void handleReturnedMessage(org.springframework.amqp.core.ReturnedMessage returned, Object message) {String deadExchange = RabbitMqConfig.DEAD_LETTER_EXCHANGE;String deadRoutingKey = RabbitMqConfig.DEAD_LETTER_ROUTING_KEY;rabbitTemplate.convertAndSend(deadExchange, deadRoutingKey, message, new CorrelationData(UUID.randomUUID().toString()));}
}

2.2 RabbitMQ配置類(交換機、隊列、綁定關系)

通過配置類定義業務所需的交換機、隊列及綁定規則,包含死信隊列配置以處理失敗消息,代碼如下:

package cn.juwatech.rebate.mq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ交換機、隊列、綁定關系配置*/
@Configuration
public class RabbitMqConfig {// 1. 訂單相關配置public static final String ORDER_EXCHANGE = "order-exchange";public static final String ORDER_CONFIRM_QUEUE = "order-confirm-queue";public static final String REBATE_CALCULATE_QUEUE = "rebate-calculate-queue";public static final String ROUTING_KEY_ORDER_CREATED = "order.created";// 2. 返利相關配置public static final String REBATE_EXCHANGE = "rebate-exchange";public static final String REBATE_NOTIFY_QUEUE = "rebate-notify-queue";public static final String ROUTING_KEY_REBATE_GENERATED = "rebate.generated";// 3. 死信隊列配置public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter";/*** 1. 聲明死信交換機與死信隊列(處理失敗消息)*/@Beanpublic DirectExchange deadLetterExchange() {// Direct交換機:精確匹配路由鍵return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();}@Beanpublic Queue deadLetterQueue() {// 持久化隊列,避免消息丟失return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}@Beanpublic Binding deadLetterBinding() {// 綁定死信交換機與隊列return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);}/*** 2. 聲明訂單交換機(Topic類型,支持模糊匹配路由鍵)*/@Beanpublic TopicExchange orderExchange() {return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}/*** 聲明訂單確認隊列(綁定死信交換機,消息消費失敗時轉發)*/@Beanpublic Queue orderConfirmQueue() {return QueueBuilder.durable(ORDER_CONFIRM_QUEUE)// 配置死信交換機.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)// 配置死信路由鍵.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)// 配置消息過期時間(30分鐘).withArgument("x-message-ttl", 1800000).build();}/*** 聲明返利計算隊列*/@Beanpublic Queue rebateCalculateQueue() {return QueueBuilder.durable(REBATE_CALCULATE_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY).withArgument("x-message-ttl", 1800000).build();}/*** 綁定訂單交換機與訂單確認隊列*/@Beanpublic Binding orderConfirmBinding() {return BindingBuilder.bind(orderConfirmQueue()).to(orderExchange()).with(ROUTING_KEY_ORDER_CREATED);}/*** 綁定訂單交換機與返利計算隊列*/@Beanpublic Binding rebateCalculateBinding() {return BindingBuilder.bind(rebateCalculateQueue()).to(orderExchange()).with(ROUTING_KEY_ORDER_CREATED);}/*** 3. 聲明返利交換機與通知隊列*/@Beanpublic TopicExchange rebateExchange() {return ExchangeBuilder.topicExchange(REBATE_EXCHANGE).durable(true).build();}@Beanpublic Queue rebateNotifyQueue() {return QueueBuilder.durable(REBATE_NOTIFY_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY).build();}@Beanpublic Binding rebateNotifyBinding() {return BindingBuilder.bind(rebateNotifyQueue()).to(rebateExchange()).with(ROUTING_KEY_REBATE_GENERATED);}
}

2.3 消息消費者實現(業務處理)

以“返利計算消費者”為例,監聽rebate-calculate-queue隊列,異步處理訂單返利計算,代碼如下:

package cn.juwatech.rebate.mq.consumer;import cn.juwatech.rebate.dto.OrderDTO;
import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import cn.juwatech.rebate.service.RebateCalculateService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 返利計算消息消費者*/
@Component
@RequiredArgsConstructor
public class RebateCalculateConsumer {private final RebateCalculateService rebateCalculateService;/*** 監聽返利計算隊列,處理訂單返利* @param orderDTO 訂單數據(消息體)*/@RabbitListener(queues = RabbitMqConfig.REBATE_CALCULATE_QUEUE)public void handleRebateCalculate(OrderDTO orderDTO) {try {System.out.printf("開始處理訂單返利,訂單ID:%s,用戶ID:%s%n", orderDTO.getOrderId(), orderDTO.getUserId());// 調用返利計算服務(核心業務邏輯)rebateCalculateService.calculateRebate(orderDTO);// 手動確認消息(默認AUTO模式,此處顯式確認確保業務處理完成)// 注:若使用AUTO模式,方法無異常則自動確認,拋出異常則拒絕并重回隊列} catch (Exception e) {System.err.printf("訂單返利處理失敗,訂單ID:%s,原因:%s%n", orderDTO.getOrderId(), e.getMessage());// 消費失敗處理:可記錄日志,觸發告警,避免消息重復重試throw new RuntimeException("返利計算失敗,消息將轉發至死信隊列", e);}}
}

2.4 業務層消息發送示例(訂單服務)

訂單服務創建訂單后,通過生產者發送“訂單創建”消息,觸發后續異步流程,代碼如下:

package cn.juwatech.rebate.service.impl;import cn.juwatech.rebate.dto.OrderDTO;
import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import cn.juwatech.rebate.mq.producer.RabbitMqProducer;
import cn.juwatech.rebate.service.OrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** 訂單服務實現類*/
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {private final RabbitMqProducer rabbitMqProducer;// 省略訂單DAO層依賴...@Override@Transactional(rollbackFor = Exception.class)public void createOrder(OrderDTO orderDTO) {// 1. 保存訂單數據(本地事務)saveOrder(orderDTO);// 2. 發送訂單創建消息(異步觸發商家確認與返利計算)rabbitMqProducer.sendMessage(RabbitMqConfig.ORDER_EXCHANGE,RabbitMqConfig.ROUTING_KEY_ORDER_CREATED,orderDTO);System.out.printf("訂單創建成功,訂單ID:%s,消息已發送%n", orderDTO.getOrderId());}// 省略訂單保存邏輯...
}

三、消息可靠性保障與性能優化

3.1 可靠性保障措施

  1. 消息持久化:交換機、隊列均配置為durable=true,消息發送時設置deliveryMode=2(持久化),避免RabbitMQ重啟丟失消息;
  2. 生產者確認:通過ConfirmCallback確保消息到達交換機,ReturnsCallback處理路由失敗消息,轉發至死信隊列;
  3. 消費者確認:采用手動確認模式(或AUTO模式結合異常處理),確保業務邏輯執行完成后再確認消息,避免消息丟失;
  4. 死信隊列:配置消息過期時間(TTL)與死信路由,消費失敗的消息最終進入死信隊列,避免無限重試導致系統資源浪費。

3.2 性能優化策略

  1. 消息批量發送:對高頻低時延要求的場景(如用戶行為日志),采用rabbitTemplate.convertAndSend批量發送,減少網絡請求次數;
  2. 消費者線程池配置:通過spring.rabbitmq.listener.simple.concurrencymax-concurrency設置消費者線程池大小,默認1-10,根據業務調整為5-20;
  3. 隊列分片:對高流量隊列(如rebate-calculate-queue),按用戶ID哈希拆分多個隊列(如rebate-calculate-queue-1-4),分散消費壓力;
  4. 消息壓縮:對大體積消息(如包含商品詳情的訂單數據),發送前通過Gzip壓縮,接收后解壓,減少網絡傳輸與存儲開銷。

本文著作權歸聚娃科技省賺客app開發者團隊,轉載請注明出處!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/96717.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/96717.shtml
英文地址,請注明出處:http://en.pswp.cn/web/96717.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Vue3 響應式失效 debug:Proxy 陷阱導致數據更新異常的深度排查

人們眼中的天才之所以卓越非凡,并非天資超人一等而是付出了持續不斷的努力。1萬小時的錘煉是任何人從平凡變成超凡的必要條件。———— 馬爾科姆格拉德威爾 🌟 Hello,我是Xxtaoaooo! 🌈 “代碼是邏輯的詩篇&#xff0…

【貪心算法】day10

📝前言說明: 本專欄主要記錄本人的貪心算法學習以及LeetCode刷題記錄,按專題劃分每題主要記錄:(1)本人解法 本人屎山代碼;(2)優質解法 優質代碼;&#xff…

LeetCode算法日記 - Day 42: 島嶼數量、島嶼的最大面積

目錄 1. 島嶼數量 1.1 題目解析 1.2 解法 1.3 代碼實現 2. 島嶼的最大面積 2.1 題目解析 2.2 解法 2.3 代碼實現 1. 島嶼數量 https://leetcode.cn/problems/number-of-islands/ 給你一個由 1(陸地)和 0(水)組成的的二維…

短波紅外相機在機器視覺檢測方向的應用

短波紅外相機在機器視覺檢測方向的應用短波紅外相機:機器視覺的“低成本突破者”一、打破成本困局:短波紅外的“平民化”革新二、核心技術:有機材料的“硬核創新”1. 材料革命:有機感光層的優勢2. 工藝兼容:嫁接成熟CM…

【數據結構與算法】圖 Floyd算法

相關題目: 1334. 閾值距離內鄰居最少的城市 - 力扣(LeetCode) 資料 : Floyd算法原理及公式推導 - 知乎 Floyd 算法是一種經典的動態規劃算法,用與求解圖中所有頂點之間的最短短路路徑。它由Robert Floyd 于1962…

衛星通信天線的指向精度,含義、測量和計算

衛星通信天線的指向精度,含義、測量和計算我們在衛星通信天線的技術規格書中,都會看到天線指向精度這個指標。一般來說,技術規格書上的天線指向精度的參數是這么寫的:“天線指向精度≤1/10半功率波束帶寬”今天這個文章&#xff0…

基于LSTM與3秒級Tick數據的金融時間序列預測實現

數據加載模塊解析 def load_data(filepath):df pd.read_csv(filepath)return df該函數承擔基礎數據采集職責,通過Pandas庫讀取CSV格式的高頻交易數據(典型如股票分筆成交明細)。輸入參數為文件路徑字符串,輸出結構化DataFrame對象…

C# --- Field and Property

C# --- Field and Property字段 (Field) vs. 屬性 (Property)Property的聲明初始化方法單例類property錯誤初始化導致線程泄漏字段 (Field) vs. 屬性 (Property) 字段 (Field) - 數據的存儲容器 字段是直接在類或結構中聲明的變量。它是存儲數據的地方,是對象狀態的…

【Python】實現一個文件夾快照與比較工具

1. 工具簡介 在日常開發、項目管理或備份場景中,我們經常需要知道某個文件夾中的文件是否發生變化,例如: 項目源碼是否新增或修改文件?數據集是否被不小心刪除或篡改?備份文件夾是否和上次一致? 本教程將教…

LINUX913 shell:set ip [lindex $argv 0],\r,send_user,spawn ssh root@ip “cat “

問題 獲取公鑰 [codesamba ~]$ cat pub.sh #!/bin/usr/expect set ip "$1" set password 123456 set timeout 20 spawn ssh root192.168.235.100:cat ~/.ssh/id_rsa.pub expect { "yes/no" {send "yes/r";exp_continue} "password:" {…

Acwing算法基礎課--鏈表

一、單鏈表 AcWing 826. 單鏈表 代碼 N 100010 idx 0 e [0] * N ne [0] * N head -1def init():global idx,headidx 0head -1def add_head(x):global idx,heade[idx] xne[idx] headhead idxidx 1def delete(k):ne[k] ne[ne[k]]def add_k(k,x):global idxe[idx] …

AI表征了西方的有界,AI+體現了東方的無界

AI表征了西方的有界,AI體現了東方的無界,試圖通過文化差異的視角來對比傳統AI(AI)與增強型或融合型AI(AI)的特征。一、“AI表征了西方的有界”西方的“有界”可以理解為:1、邏輯清晰、結構嚴謹&…

LabVIEW泵輪檢測

?在現代制造業蓬勃發展的浪潮下,汽車行業也迎來了高速發展期。液力變矩器作為實現車輛自動變速的關鍵零件產品,在汽車動力系統中扮演著不可或缺的角色。泵輪作為液力變矩器的核心組成部分,其生產質量直接影響著液力變矩器的性能。因此&#…

RT-DETRv2 中的坐標回歸機制深度解析:為什么用 `sigmoid(inv_sigmoid(ref) + delta)` 而不是除以圖像尺寸?

引言:一個看似簡單的公式,背后藏著工業級設計智慧 在閱讀 RT-DETRv2(Real-Time DETR v2)源碼時,我曾被一行代碼深深震撼: inter_ref_bbox F.sigmoid(bbox_head[i](output) inverse_sigmoid(ref_points_de…

簡單了解一下GraphRAG

傳統RAG的缺點 當我們將一段文本信息以句子分割后,存入到向量數據庫中。用戶提問“老王喜歡吃什么”,這個問題會與向量數據庫中的許多句子關聯性比較強,能返回準確且具體的信息。 但是,若是問題換成“出現了幾次西瓜”&#xff0c…

HTTP 狀態碼背后的邏輯:從請求到響應的完整流程解析(含完整流程圖)

在日常的 Web 開發與 API 調試中,我們經常會遇到各種 HTTP 狀態碼 ——404 Not Found、401 Unauthorized、500 Internal Server Error... 這些數字背后并非隨機出現,而是服務器處理請求過程中不同階段的 "反饋信號"。理解這些狀態碼的觸發邏輯…

Vue:下拉框多選影響行高

目錄 一、 出現場景二、 解決方案 一、 出現場景 在使用el-select增加multiple屬性進行多選時&#xff0c;會出現高度塌陷的情況 二、 解決方案 首先需要在el-select中增加collapse-tags屬性&#xff0c;并在style中增加如下樣式 方案一 <style scoped> ::v-deep .e…

如何在高通躍龍QCS6490 Arm架構上使用Windows 11 IoT企業版?

1.簡介研華已將高通躍龍QCS6490 技術應用于嵌入式模塊、單板電腦和AI攝像頭等各種規格的嵌入式硬件中。QCS6490平臺支持全面的操作系統生態系統&#xff0c;包括Windows、Ubuntu、Yocto和 Android。Windows 11 IoT企業版是微軟新一代的物聯網操作系統&#xff0c;具有更強的安全…

阿里云國際代理:如何利用RDS構建高可用、可擴展的數據庫架構

講下云數據庫RDS案例解析&#xff0c;若在上云或用云過程中有不懂的&#xff0c;可尋云樞國際yunshuguoji助力免卡上云用云。1、RDS MySQL數據庫代理支持讀寫分離、連接保持、就近訪問、事務拆分、連接池、SSL加密等功能&#xff0c;能夠降低主實例負載&#xff0c;提高實例可用…

C++之特殊類設計

文章目錄前言一、 設計一個不能被拷貝的類1. C98 實現方式2. C11 實現方式二、設計一個只能在堆上創建對象的類1. 方法一&#xff1a;析構函數私有&#xff0c;提供destory接口釋放資源2. 方法二&#xff1a;構造函數私有三、 設計一個只能在棧上創建對象的類1. 實現方式四、設…