? 11.1 為什么要用消息隊列?在哪些場景下最適合?
? 作用:
-
削峰填谷:緩解高并發壓力,異步處理任務(如秒殺下單 → MQ → 異步扣庫存)
-
解耦服務:上下游解耦(如下單服務和短信服務)
-
異步處理:降低請求延遲(如下單后異步發短信、記錄日志)
-
廣播通知:多服務訂閱同一事件(如訂單支付成功 → 通知物流、用戶、財務)
? 典型使用場景:
-
秒殺限流、異步發送郵件/短信
-
日志收集、用戶行為打點
-
微服務事件驅動架構
-
延遲任務、定時推送
? 11.2 RabbitMQ / Kafka / Redis Stream 的區別與適用場景?
特性 | RabbitMQ | Kafka | Redis Stream |
---|---|---|---|
類型 | AMQP 協議消息隊列 | 分布式日志系統(高吞吐) | 內存消息隊列(輕量級) |
吞吐量 | 中 | 高(百萬級 TPS) | 中 |
消息順序 | 支持 | 分區內保證順序 | 支持 |
持久化 | 支持磁盤持久化 | 高效持久化 | 內存為主+RDB持久化 |
消費模式 | 推模式(push) | 拉模式(pull) | 拉(XREAD)+消費者組 |
優勢 | 易用、功能全、支持ACK/TTL等 | 高性能、大數據分析、日志采集 | Node.js 原生整合好、靈活 |
場景 | 通用業務異步、RPC、訂單消息等 | 日志分析、監控鏈路、大規模消費 | 中小型異步任務隊列 |
? 11.3 MQ 如何保證消息不丟?如何實現消息的重試機制?
? 防止消息丟失的策略:
-
生產端確認機制:
-
RabbitMQ 開啟
publisher confirm
-
Kafka 開啟
acks=all
-
Redis 使用
XADD
并手動確認消費
-
-
持久化機制:
-
RabbitMQ:設置
durable queue
+persistent message
-
Kafka:磁盤持久化 + 日志壓縮
-
Redis Stream:持久化配置 + 手動 ack
-
-
消費端手動 ack:
-
消費成功才 ack,失敗不確認可重試
-
? 消息重試機制:
-
RabbitMQ:死信隊列(DLX)+ 延遲交換器 + retry 次數記錄
-
Kafka:設置 retry/backoff,或使用重試 topic
-
Redis:失敗記錄進失敗隊列或重入主隊列,配合重試次數標記
? 11.4 如何處理消息重復消費問題?冪等性的實現方法?
? 產生原因:
-
消息重試或網絡問題,導致同一消息被多次消費
? 冪等處理策略:
-
唯一標識:每條消息帶唯一
msgId
,消費記錄保存到 Redis / DB -
操作前去重判斷:如
INSERT ... ON DUPLICATE KEY UPDATE
-
數據庫事務或鎖機制:防止并發寫入重復
-
消費端去重隊列:維護消費記錄(如 Redis Set)
? 11.5 如何做異步任務處理?Node.js 中有哪些隊列庫?
? 實現方式:
-
使用消息隊列發送異步任務,由專門的 worker 進程消費
? 常用隊列庫:
庫名 | 特點 |
---|---|
BullMQ | 基于 Redis,支持任務調度/重試/延遲/優先級 |
Bee-Queue | 簡潔快速,適合中小項目 |
Agenda | MongoDB 支持,適用于定時任務 |
Kue | 老牌 Redis 隊列,但維護較少 |
Bree | 基于原生 worker_threads 和 cron |
推薦 BullMQ(配合 Redis Stream 實現可靠任務隊列)
? 11.6 MQ 如何處理消息堆積問題?如何限速?
? 消息堆積原因:
-
消費端消費慢 / 宕機
-
消息生產過快
-
任務執行時間長
? 解決方式:
-
增加消費者并發(水平擴展 Worker)
-
限速控制生產端(令牌桶算法、漏桶算法)
-
異步分批處理任務(批量消費)
-
設置 TTL 和死信隊列:避免無效消息堆積
-
流量預估 + 指標報警(Prometheus + Grafana)
? 11.7 什么是發布/訂閱模型?和點對點模型的區別?
模型 | 描述 | 例子 |
---|---|---|
發布/訂閱(Pub/Sub) | 一個生產者 → 多個訂閱者都能收到消息 | Redis Pub/Sub,Kafka Topic |
點對點(P2P) | 每條消息只有一個消費者能收到 | RabbitMQ work queue 模式 |
Pub/Sub 更適合廣播場景(如發系統通知),點對點適合任務隊列場景(如視頻轉碼)
? 11.8 如何實現延遲消息、定時消息發送?
? 常見實現方式:
工具 | 實現方式 |
---|---|
RabbitMQ | 使用延遲插件 rabbitmq_delayed_message_exchange 或 TTL + 死信隊列 |
Kafka | 無原生支持,可通過延遲 topic + 輪詢實現 |
Redis | ZSET + 時間戳排序(如 BullMQ 的延遲任務) |
BullMQ | job.delay() 支持毫秒級延遲 |
? 11.9 如何監控消息隊列的健康狀態和消費情況?
? RabbitMQ:
-
管理面板(Web UI)
-
隊列堆積長度、連接數、消費速率
-
配合 Prometheus + Grafana 面板
? Kafka:
-
Kafka Manager、Kafka UI
-
消費組 lag(滯后)監控
-
消息發送/消費 TPS
? Redis:
-
RedisInsight、命令行查看 stream length
-
BullMQ 提供 UI 面板(bull-board、arena)
? 11.10 NestJS 如何使用 RabbitMQ / Kafka?使用過 @EventPattern() 嗎?
NestJS 提供微服務模塊,支持多種 MQ 協議。
? 使用方式(以 RabbitMQ 為例):
// main.ts
const app = await NestFactory.createMicroservice(AppModule, {transport: Transport.RMQ,options: {urls: ['amqp://localhost:5672'],queue: 'tasks_queue',queueOptions: { durable: true },},
});
await app.listen();
? 消費消息(使用 @EventPattern):
import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx, RmqContext } from '@nestjs/microservices';@Controller()
export class TaskConsumer {@EventPattern('user.created')handleUserCreated(@Payload() data: any, @Ctx() context: RmqContext) {const channel = context.getChannelRef();const originalMsg = context.getMessage();console.log('Received user event:', data);// ack 消息channel.ack(originalMsg);}
}
@EventPattern()
用于監聽 MQ 的某個 routing key 或 topic 事件,適用于異步事件處理。
? 總結表格
編號 | 關鍵知識點 | 核心點 |
---|---|---|
11.1 | 為什么使用 MQ | 解耦、異步、削峰填谷 |
11.2 | MQ 類型對比 | RabbitMQ / Kafka / Redis Stream 區別 |
11.3 | 消息不丟 + 重試 | ACK + 持久化 + 死信隊列 |
11.4 | 消息冪等性 | msgId 去重、Redis 標記 |
11.5 | 異步任務庫 | BullMQ / Bee-Queue / Agenda |
11.6 | 消息堆積限速 | 擴容 + 限流算法 + TTL |
11.7 | Pub/Sub vs P2P | 廣播 vs 分發 |
11.8 | 延遲消息 | TTL + 死信 + ZSET + delay job |
11.9 | 隊列監控工具 | Prometheus / RabbitMQ UI / Kafka Manager |
11.10 | NestJS 消費 MQ | @EventPattern + Transport 設置 |