文章目錄
- 示例:電商秒殺系統中的流量削峰
- 1. 依賴引入(Maven)
- 2. 消息隊列配置(RabbitMQ)
- 3. 生產者:訂單服務(接收高并發請求)
- 4. 消費者:庫存服務(按系統容量處理訂單)
- 5. 模擬高并發測試
- 關鍵技術點解析
- 1. 流量削峰的實現
- 2. 消息可靠性保障
- 3. 削峰前后對比
- 生產環境優化建議
- 其他 MQ 選型參考
以下是一個基于 Java 和 RabbitMQ 實現流量削峰的示例,展示如何通過消息隊列處理高并發下單請求:
示例:電商秒殺系統中的流量削峰
1. 依賴引入(Maven)
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version>
</dependency>
2. 消息隊列配置(RabbitMQ)
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MQConfig {private static final String HOST = "localhost";private static final int PORT = 5672;private static final String USERNAME = "guest";private static final String PASSWORD = "guest";public static final String QUEUE_NAME = "order_queue";// 創建連接工廠public static ConnectionFactory getConnectionFactory() {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory;}
}
3. 生產者:訂單服務(接收高并發請求)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class OrderService {private final Connection connection;public OrderService() throws IOException, TimeoutException {this.connection = MQConfig.getConnectionFactory().newConnection();}// 處理下單請求(削峰前)public void createOrderDirectly(Long productId, Integer count) {// 傳統模式:直接處理訂單(高并發時會壓垮數據庫)System.out.println("直接處理訂單:商品ID=" + productId + ", 數量=" + count);// 模擬數據庫操作try {Thread.sleep(200); // 假設處理一個訂單需要200ms} catch (InterruptedException e) {e.printStackTrace();}}// 處理下單請求(削峰后)public void createOrderWithMQ(Long productId, Integer count) throws IOException {try (Channel channel = connection.createChannel()) {// 聲明隊列(如果不存在則創建)channel.queueDeclare(MQConfig.QUEUE_NAME, false, false, false, null);// 封裝訂單信息為JSONString orderInfo = "{\"productId\":" + productId + ",\"count\":" + count + "}";// 發送消息到隊列channel.basicPublish("", MQConfig.QUEUE_NAME, null, orderInfo.getBytes());System.out.println("訂單已放入隊列:" + orderInfo);}}
}
4. 消費者:庫存服務(按系統容量處理訂單)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class InventoryService {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = MQConfig.getConnectionFactory();try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 聲明隊列channel.queueDeclare(MQConfig.QUEUE_NAME, false, false, false, null);// 設置消費者每次只處理1條消息(限流)channel.basicQos(1);System.out.println("庫存服務已啟動,等待訂單消息...");// 創建消費者DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到訂單:" + message);try {// 模擬處理訂單(扣庫存、更新數據庫等)processOrder(message);// 手動確認消息已處理channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();// 處理失敗,拒絕消息并重新入隊channel.basicNack(envelope.getDeliveryTag(), false, true);}}};// 啟動消費者(手動確認模式)channel.basicConsume(MQConfig.QUEUE_NAME, false, consumer);}}private static void processOrder(String orderInfo) {try {// 模擬處理訂單耗時(如扣減庫存、寫入訂單表)Thread.sleep(500);System.out.println("訂單處理完成:" + orderInfo);} catch (InterruptedException e) {e.printStackTrace();}}
}
5. 模擬高并發測試
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;public class TrafficPeakTest {public static void main(String[] args) throws IOException, TimeoutException {OrderService orderService = new OrderService();ExecutorService executor = Executors.newFixedThreadPool(100); // 模擬100個并發用戶// 模擬1000個并發下單請求(流量峰值)for (int i = 0; i < 1000; i++) {final int orderId = i;executor.submit(() -> {try {// 削峰前:直接處理訂單(可能導致系統崩潰)// orderService.createOrderDirectly(1001L, 1);// 削峰后:通過MQ異步處理訂單orderService.createOrderWithMQ(1001L, 1);} catch (Exception e) {e.printStackTrace();}});}executor.shutdown();}
}
關鍵技術點解析
1. 流量削峰的實現
- 生產者端:將訂單請求快速放入隊列后立即返回,避免請求堆積
- 消費者端:
- 通過
channel.basicQos(1)
限制每次只處理 1 條消息 - 單線程消費(可擴展為多線程),每秒處理約 2 個訂單(500ms / 訂單)
- 通過
- 效果:1000 個并發請求被隊列緩沖,系統按自身容量(2TPS)平穩處理
2. 消息可靠性保障
- 持久化:RabbitMQ 默認將消息存儲在內存中,可配置持久化到磁盤
- 手動確認:消費者處理完成后手動
basicAck
,失敗則basicNack
并重試 - 死信隊列:可配置死信隊列存儲多次處理失敗的消息
3. 削峰前后對比
指標 | 無 MQ(傳統模式) | 有 MQ(流量削峰) |
---|---|---|
最大并發處理量 | 受數據庫連接數限制(如 100) | 隊列可緩沖無限量請求 |
響應時間 | 平均 200ms(直接處理) | 立即返回(<10ms) |
系統穩定性 | 峰值時易崩潰 | 平穩處理,無崩潰風險 |
資源利用率 | 峰值時資源耗盡,平時閑置 | 按固定速率使用資源 |
生產環境優化建議
-
隊列監控:
- 監控隊列長度,設置告警閾值(如超過 10 萬條未處理消息)
- 使用 RabbitMQ Management 插件或 Prometheus + Grafana 監控
-
消費者擴容:
- 垂直擴容:增加消費者機器配置
- 水平擴容:增加消費者實例數(需注意冪等性)
-
降級策略:
- 隊列過長時,拒絕新請求并返回 “系統繁忙”
- 非核心業務降級(如暫時關閉短信通知)
-
持久化配置:
// 設置隊列持久化 boolean durable = true; channel.queueDeclare(MQConfig.QUEUE_NAME, durable, false, false, null);// 設置消息持久化 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2表示持久化.build(); channel.basicPublish("", MQConfig.QUEUE_NAME, properties, message.getBytes());
其他 MQ 選型參考
消息隊列 | 吞吐量 | 優勢場景 | 示例項目 |
---|---|---|---|
RabbitMQ | 萬級 TPS | 強一致性、支持事務、靈活路由 | 金融系統訂單處理 |
Kafka | 百萬級 TPS | 大數據實時處理 | 日志收集、實時數據流處理 |
RocketMQ | 十萬級 TPS | 高可用、順序消息 | 電商訂單、物流系統 |
Pulsar | 百萬級 TPS | 云原生、多租戶 | 分布式微服務架構 |
根據業務場景選擇合適的 MQ,本例使用 RabbitMQ 是因其易用性和可靠性。