一、前言
? ? ?再JAVA系統開發中,再高并發的場景經常需要使用到消息隊列,有時候是不得不使用到消息對了。特別是大數據量的并發處理。對數據實時性要求又沒那么高的情況下。
用戶請求 → 接入層(Nginx) → 限流 → 消息隊列 → 訂單服務 → 庫存服務 → 支付服務
↑ ? ? ? ? ? ? ? ? ? ? ↓
結果緩存 ←───────────────┘
在高并發場景下,消息隊列(MQ)作為系統解耦、流量削峰和異步處理的核心組件,其性能優化和穩定性保障至關重要。下面我將從架構設計、性能優化、可靠性保障等方面詳細解析高并發場景下的MQ使用策略。
高并發場景下的MQ選型策略
1. 主流MQ性能對比
特性 | RabbitMQ | Kafka | RocketMQ | Pulsar |
---|---|---|---|---|
吞吐量 | 萬級 | 百萬級 | 十萬級 | 百萬級 |
延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒級 |
持久化 | 支持 | 支持 | 支持 | 支持 |
事務消息 | 支持 | 支持(0.11+) | 支持 | 支持 |
高可用 | 鏡像隊列 | 分區復制 | 主從復制 | 分層存儲 |
適用場景 | 業務解耦/復雜路由 | 日志/流處理 | 訂單/交易 | 多租戶/流計算 |
2. 選型建議
電商秒殺:RocketMQ(事務消息+高吞吐)
日志收集:Kafka(超高吞吐+分區存儲)
金融支付:RabbitMQ(強一致性+復雜路由)
物聯網IoT:Pulsar(多租戶+低延遲)
二、MQ主要使用,一是數據產生,第二是消費
消息隊列(MQ)是分布式系統中常用的異步通信機制,Java中常用的MQ實現包括RabbitMQ、Apache Kafka、ActiveMQ、RocketMQ等。下面我將介紹這些MQ在Java中的基本使用方法。
三、直接上代碼示例
生產者:
/*** 消息隊列的生產者*/ package cn.xxx.module.member.mq.producer;
@Slf4j
@Component
public class MemberUserProducer {@Resourceprivate ApplicationContext applicationContext;/*** 發送 {@link MemberUserCreateMessage} 消息** @param userId 用戶編號*/public void sendUserCreateMessage(Long userId) {applicationContext.publishEvent(new MemberUserCreateMessage().setUserId(userId));}}
消費者:
/*** 消息隊列的消費者*/ package cn.xxx.module.member.mq.consumer;
@Slf4j
@Component
public class MemberRegisterPointIssueConsumer implements ApplicationRunner {@Resourceprivate RocketTXMqService rocketTXMqService;@Resourceprivate MemberPointIssueApi memberPointIssueApi;@Value("${rocketmq.producer2.topic}")private String memberRegisterPointIssueTopic;@Overridepublic void run(ApplicationArguments args) throws Exception {DefaultMQPushConsumer pushConsumer = rocketTXMqService.getPushConsumer2();if (null != pushConsumer) {try {pushConsumer.subscribe(memberRegisterPointIssueTopic, "*");// 注冊回調實現類來處理從broker拉取回來的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息處理邏輯IssueRegisterTaskPointReqVO issueRegisterTaskPointReqVO = JSONObject.parseObject(msgs.get(0).getBody(), IssueRegisterTaskPointReqVO.class);log.info("%s Receive Topic %s New Messages: %s issueRegisterTaskPointReqVO: %s %n", Thread.currentThread().getName(), memberRegisterPointIssueTopic, msgs, JSONObject.toJSONString(issueRegisterTaskPointReqVO));memberPointIssueApi.issueRegisterTaskPoint(issueRegisterTaskPointReqVO);// 標記該消息已經被成功消費, 根據消費情況,返回處理狀態return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 啟動消費者實例pushConsumer.start();log.info("push topic{} consumer start success", memberRegisterPointIssueTopic);} catch (MQClientException e) {log.error("push topic{} MQClientException:{}", memberRegisterPointIssueTopic, e.getMessage());}}}
}
可靠性保障機制
1. 消息不丟失設計
環節 | 保障措施 |
---|---|
生產者 | 事務消息/confirm機制+本地消息表+定時任務補償 |
Broker | 同步刷盤+多副本同步復制(ISR)+ RAID磁盤陣列 |
消費者 | 手動ACK+消費冪等設計+死信隊列+消息軌跡追蹤 |
?消息積壓處理方案
// 動態擴容消費者實例(Kafka示例)
// 1. 監控積壓量
long lag = getConsumerLag("order-topic", "consumer-group");
// 2. 自動擴容規則
if (lag > 100000) { // 積壓超過10萬scaleConsumerInstances(2); // 雙倍擴容
} else if (lag < 1000) {scaleConsumerInstances(0.5); // 縮容50%
}// 3. 緊急處理方案
if (lag > 500000) {// 啟動降級處理服務startDegradeService();// 消息轉存至冷存儲transferToColdStorage();
}