MQ解決高并發下訂單問題,實現流量削峰

文章目錄

      • 示例:電商秒殺系統中的流量削峰
        • 1. 依賴引入(Maven)
        • 2. 消息隊列配置(RabbitMQ)
        • 3. 生產者:訂單服務(接收高并發請求)
        • 4. 消費者:庫存服務(按系統容量處理訂單)
        • 5. 模擬高并發測試
      • 關鍵技術點解析
        • 1. 流量削峰的實現
        • 2. 消息可靠性保障
        • 3. 削峰前后對比
      • 生產環境優化建議
      • 其他 MQ 選型參考

以下是一個基于 Java 和 RabbitMQ 實現流量削峰的示例,展示如何通過消息隊列處理高并發下單請求:

示例:電商秒殺系統中的流量削峰

1. 依賴引入(Maven)
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version>
</dependency>
2. 消息隊列配置(RabbitMQ)
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MQConfig {private static final String HOST = "localhost";private static final int PORT = 5672;private static final String USERNAME = "guest";private static final String PASSWORD = "guest";public static final String QUEUE_NAME = "order_queue";// 創建連接工廠public static ConnectionFactory getConnectionFactory() {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory;}
}
3. 生產者:訂單服務(接收高并發請求)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class OrderService {private final Connection connection;public OrderService() throws IOException, TimeoutException {this.connection = MQConfig.getConnectionFactory().newConnection();}// 處理下單請求(削峰前)public void createOrderDirectly(Long productId, Integer count) {// 傳統模式:直接處理訂單(高并發時會壓垮數據庫)System.out.println("直接處理訂單:商品ID=" + productId + ", 數量=" + count);// 模擬數據庫操作try {Thread.sleep(200); // 假設處理一個訂單需要200ms} catch (InterruptedException e) {e.printStackTrace();}}// 處理下單請求(削峰后)public void createOrderWithMQ(Long productId, Integer count) throws IOException {try (Channel channel = connection.createChannel()) {// 聲明隊列(如果不存在則創建)channel.queueDeclare(MQConfig.QUEUE_NAME, false, false, false, null);// 封裝訂單信息為JSONString orderInfo = "{\"productId\":" + productId + ",\"count\":" + count + "}";// 發送消息到隊列channel.basicPublish("", MQConfig.QUEUE_NAME, null, orderInfo.getBytes());System.out.println("訂單已放入隊列:" + orderInfo);}}
}
4. 消費者:庫存服務(按系統容量處理訂單)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class InventoryService {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = MQConfig.getConnectionFactory();try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 聲明隊列channel.queueDeclare(MQConfig.QUEUE_NAME, false, false, false, null);// 設置消費者每次只處理1條消息(限流)channel.basicQos(1);System.out.println("庫存服務已啟動,等待訂單消息...");// 創建消費者DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到訂單:" + message);try {// 模擬處理訂單(扣庫存、更新數據庫等)processOrder(message);// 手動確認消息已處理channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();// 處理失敗,拒絕消息并重新入隊channel.basicNack(envelope.getDeliveryTag(), false, true);}}};// 啟動消費者(手動確認模式)channel.basicConsume(MQConfig.QUEUE_NAME, false, consumer);}}private static void processOrder(String orderInfo) {try {// 模擬處理訂單耗時(如扣減庫存、寫入訂單表)Thread.sleep(500);System.out.println("訂單處理完成:" + orderInfo);} catch (InterruptedException e) {e.printStackTrace();}}
}
5. 模擬高并發測試
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;public class TrafficPeakTest {public static void main(String[] args) throws IOException, TimeoutException {OrderService orderService = new OrderService();ExecutorService executor = Executors.newFixedThreadPool(100); // 模擬100個并發用戶// 模擬1000個并發下單請求(流量峰值)for (int i = 0; i < 1000; i++) {final int orderId = i;executor.submit(() -> {try {// 削峰前:直接處理訂單(可能導致系統崩潰)// orderService.createOrderDirectly(1001L, 1);// 削峰后:通過MQ異步處理訂單orderService.createOrderWithMQ(1001L, 1);} catch (Exception e) {e.printStackTrace();}});}executor.shutdown();}
}

關鍵技術點解析

1. 流量削峰的實現
  • 生產者端:將訂單請求快速放入隊列后立即返回,避免請求堆積
  • 消費者端:
    • 通過 channel.basicQos(1) 限制每次只處理 1 條消息
    • 單線程消費(可擴展為多線程),每秒處理約 2 個訂單(500ms / 訂單)
  • 效果:1000 個并發請求被隊列緩沖,系統按自身容量(2TPS)平穩處理
2. 消息可靠性保障
  • 持久化:RabbitMQ 默認將消息存儲在內存中,可配置持久化到磁盤
  • 手動確認:消費者處理完成后手動 basicAck,失敗則 basicNack 并重試
  • 死信隊列:可配置死信隊列存儲多次處理失敗的消息
3. 削峰前后對比
指標無 MQ(傳統模式)有 MQ(流量削峰)
最大并發處理量受數據庫連接數限制(如 100)隊列可緩沖無限量請求
響應時間平均 200ms(直接處理)立即返回(<10ms)
系統穩定性峰值時易崩潰平穩處理,無崩潰風險
資源利用率峰值時資源耗盡,平時閑置按固定速率使用資源

生產環境優化建議

  1. 隊列監控

    • 監控隊列長度,設置告警閾值(如超過 10 萬條未處理消息)
    • 使用 RabbitMQ Management 插件或 Prometheus + Grafana 監控
  2. 消費者擴容

    • 垂直擴容:增加消費者機器配置
    • 水平擴容:增加消費者實例數(需注意冪等性)
  3. 降級策略

    • 隊列過長時,拒絕新請求并返回 “系統繁忙”
    • 非核心業務降級(如暫時關閉短信通知)
  4. 持久化配置

    // 設置隊列持久化
    boolean durable = true;
    channel.queueDeclare(MQConfig.QUEUE_NAME, durable, false, false, null);// 設置消息持久化
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2表示持久化.build();
    channel.basicPublish("", MQConfig.QUEUE_NAME, properties, message.getBytes());
    

其他 MQ 選型參考

消息隊列吞吐量優勢場景示例項目
RabbitMQ萬級 TPS強一致性、支持事務、靈活路由金融系統訂單處理
Kafka百萬級 TPS大數據實時處理日志收集、實時數據流處理
RocketMQ十萬級 TPS高可用、順序消息電商訂單、物流系統
Pulsar百萬級 TPS云原生、多租戶分布式微服務架構

根據業務場景選擇合適的 MQ,本例使用 RabbitMQ 是因其易用性和可靠性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/84505.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/84505.shtml
英文地址,請注明出處:http://en.pswp.cn/web/84505.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【二進制安全作業】250616課上作業2 - 棧溢出漏洞利用

文章目錄 前言一、使用環境二、程序源碼1. C語言源碼2. 編譯方式 三、源碼分析四、反匯編分析1. 檢查文件安全性2. 查找目標函數3. 計算偏移量4. 繞過 strlen5. 繞過 if 五、編寫EXP結語 前言 直接進入正題 一、使用環境 處理器架構&#xff1a;x86_64 操作系統&#xff1a;U…

Python類型處理與推導式

歡迎來到啾啾的博客&#x1f431;。 記錄學習點滴。分享工作思考和實用技巧&#xff0c;偶爾也分享一些雜談&#x1f4ac;。 有很多很多不足的地方&#xff0c;歡迎評論交流&#xff0c;感謝您的閱讀和評論&#x1f604;。 目錄 1 引言2 類型處理3 高效操作AI開發常見數據結構3…

數據庫char字段做trim之后查詢很慢的解決方式

select * from TABLE0 where trim(column1):param 當表數據量大時&#xff0c;即使給column1字段加上索引&#xff0c;這條查詢語句也會很慢。 因為使用trim函數后&#xff0c;column1的索引會失效&#xff0c;有兩種處理方法&#xff1a; 1.給表加上trim(column1)函數索引 …

Kafka核心架構解析:從CAP理論到消息可靠性的設計哲學

摘要 本文從分布式系統CAP理論和消息可靠性兩個視角深入解析Kafka的架構設計&#xff0c;通過概念關系圖和組件交互圖揭示其核心設計思想&#xff0c;并詳細拆解各組件功能與協作機制。文章包含完整的交互流程分析和配置參數說明&#xff0c;是理解Kafka設計精髓的實用指南。 一…

LeetCode 275.H指數 II

題目&#xff1a; 給你一個整數數組 citations &#xff0c;其中 citations[i] 表示研究者的第 i 篇論文被引用的次數&#xff0c;citations 已經按照 非降序排列 。計算并返回該研究者的 h 指數。 h 指數的定義&#xff1a;h 代表“高引用次數”&#xff08;high citations&…

OV汽車攝像頭cmos sensor 相關情況介紹

OV汽車攝像頭cmos sensor 相關情況介紹 文章目錄 OV汽車攝像頭cmos sensor 相關情況介紹**1. 汽車攝像頭三大場景應用****2. 車載CMOS SENSOR的核心技術****3. 兩大車規認證:實現真正的車規可靠性****4. 最新產品**2022年,汽車智能化加碼提速,被譽為“智能駕駛之眼”的車載攝…

Pinia在多步驟表單中的實踐應用

引言 Pinia是Vue 3推薦的狀態管理庫&#xff0c;相比Vuex提供了更簡潔的API、更好的TypeScript支持和更靈活的組合式風格。本文基于實際項目代碼&#xff0c;詳細介紹Pinia在多步驟表單場景中的應用方法。 1. Pinia Store的創建與設計 1.1 基礎Store結構 在src/store/modul…

目標檢測之YOLOV11的環境搭建

1 創建虛擬環境 conda create -n yolov11 python3.9 conda activate yolov112 安裝ultralytics 默認是有cuda的情況下 # Install all packages together using conda conda install pytorch torchvision conda 還不能直接安裝ultralytics&#xff0c;需要通過pip進行安裝 …

Android 構建配置中的變量(通常在設備制造商或定制 ROM 的 AndroidProducts.mk 或產品配置文件中定義)

以下是 Android 構建系統中常見的用于產品配置、資源復制和構建規則的變量 1. PRODUCT_COPY_FILES 作用&#xff1a;指定需要從源碼樹復制到鏡像的文件。示例&#xff1a;PRODUCT_COPY_FILES \device/manufacturer/device_name/file.conf:$(TARGET_COPY_OUT_VENDOR)/etc/file…

火山引擎項亮:機器學習與智能推薦平臺多云部署解決方案正式發布

資料來源&#xff1a;火山引擎-開發者社區 2022年7月20日&#xff0c;火山引擎2022 Force原動力大會在北京諾金酒店成功舉辦。在上午的議程中&#xff0c;《推薦系統實踐》一書的作者、同時也是火山引擎機器學習系統負責人——項亮&#xff0c;展開了題目為《開放AI基建&#x…

NVR的方法多種取決于應用場景

攝像頭接入NVR&#xff08;網絡視頻錄像機&#xff09;的方法通常取決于具體的應用場景和設備支持的功能。 一、通過局域網接入 設備連接 &#xff1a; 將攝像機通過網絡線纜連接到NVR的對應端口&#xff0c;或者將攝像機和NVR都連接到同一個路由器/交換機上&#xff0c;確保它…

JAVA從入門到精通一文搞定

博主介紹&#xff1a; 大家好&#xff0c;我是想成為Super的Yuperman&#xff0c;互聯網宇宙廠經驗&#xff0c;17年醫療健康行業的碼拉松奔跑者&#xff0c;曾擔任技術專家、架構師、研發總監負責和主導多個應用架構。 近期專注&#xff1a; DeepSeek應用&#xff0c;RPA應用研…

火山引擎發布大模型生態廣場MCP Servers,LAS MCP助力AI數據湖構建

資料來源&#xff1a;火山引擎-開發者社區 近日&#xff0c;火山引擎發布大模型生態廣場—— MCP Servers&#xff0c;借助字節跳動生態能力&#xff0c;通過“MCP Market&#xff08;工具廣場&#xff09; 火山方舟&#xff08;大模型服務&#xff09;Trae&#xff08;應用開…

NodeJS 對接 Outlook 發信服務器實現發信功能

示例代碼&#xff1a; const express require(express); const nodemailer require(nodemailer); const querystring require(querystring); const axios require(axios);const app express(); app.use(express.json());const transporter nodemailer.createTransport({…

【同聲傳譯】RealtimeSTT:超低延遲語音轉文字,支持喚醒詞與中譯英

把你說的話實時變成文字&#xff1a;RealtimeSTT 上手體驗 想找一個真正好用的語音轉文字工具嗎&#xff1f;不用等說完一整段才出結果&#xff0c;也不用反復點擊按鈕。RealtimeSTT 這個開源項目能做到??實時??轉錄&#xff0c;你說一句&#xff0c;屏幕上幾乎同時出現文…

【大模型lora微調】關于推理時如何使用 LoRA Adapter

假設你有兩部分&#xff1a; 一個是原始大模型&#xff08;base model&#xff09; 一個是保存的 LoRA Adapter&#xff08;adapter_config.json adapter_model.bin&#xff09; 不合并的情況下推理方法 你可以用 peft 的方式加載 LoRA Adapter&#xff0c;推理時這樣寫&a…

谷歌時間序列算法:零樣本預測如何重塑行業決策?

谷歌時間序列算法&#xff1a;零樣本預測如何重塑行業決策&#xff1f; TimesFM 你是否曾面臨這樣的困境&#xff1f;—— ? 需要預測新產品銷量&#xff0c;卻苦于缺乏歷史數據&#xff1b; ? 依賴傳統模型&#xff08;如ARIMA&#xff09;&#xff0c;但調參耗時且泛化能力…

國產服務器【銀河麒麟v10】【CPU鯤鵬920】部署Minio文件服務器

目錄 準備工作操作步驟1. 確認掛載點狀態2. 創建專用用戶和目錄3. 下載ARM版Minio到掛在盤4. 環境變量配置5. 更新Systemd服務配置6. 啟動、重啟7. 防火墻8. 訪問驗證9. 故障排查&#xff08;如服務未啟動&#xff09;? 結束 準備工作 環境要求&#xff1a;Linux虛擬機 操作…

解決: React Native android webview 空白頁

Android react-native-webview 之前是正常的, 升級了 react-native / react-native-webview 等 之后, 就變成了空白頁. 通過下面的修改, 可以修復, 回到正常的狀態. 來源: https://github.com/react-native-webview/react-native-webview/issues/3697 注意 ts 文件一定要改,…

高中編程教學中教師專業發展的困境與突破:基于實踐與理論的雙重審視

一、引言 1.1 研究背景 在數字化時代&#xff0c;編程已成為一項基本技能&#xff0c;其重要性日益凸顯。編程不僅是計算機科學領域的核心能力&#xff0c;更是培養學生邏輯思維、創新能力和問題解決能力的有效途徑。高中階段作為學生成長和發展的關鍵時期&#xff0c;開展編…