說明:本文僅展示架構思路與安全片段,所有敏感字段已用占位符;不含可直接復刻的生產細節。數據與接口均為演示/虛擬。
0. 背景與目標
長耗時/不確定接口(如對接第三方機器人平臺)的同步阻塞,容易造成請求堆積與峰值雪崩。本次改造目標:
接口 202 Accepted + 任務輪詢,釋放前端/網關壓力
消息隊列削峰:生產入隊 → 手動 ack 消費 → 失敗入 DLQ
冪等鍵(
X-Request-Id
)與任務態存儲,防重下發、可追蹤動態配置(示例以配置中心為主),便于多環境切換
結果:寫請求在峰值時平穩可控,消費端有序處理;失敗統一入 DLQ 供排障復盤。
1. 輕量數據流
Client --POST--> Gateway --→ API| ||<--202 + taskId----------||--GET /tasks/{id} 輪詢-->|API --(Produce)--> RabbitMQ(Topic) --→ Consumer(手動 ack) --→ 第三方平臺|成功/失敗|狀態落庫/緩存 <--+失敗 → DLQ
注意:僅示意關鍵節點,隱藏了內部服務名、完整路由與業務字段。
2. 動態配置
# DataId: robot-mq-public.yml
spring:rabbitmq:host: ${ENV_MQ_HOST}port: ${ENV_MQ_PORT:5672}username: ${ENV_MQ_USER}password: ${ENV_MQ_PASS}virtual-host: /publisher-confirm-type: correlatedpublisher-returns: truemq:exchange: robot.task.topic.public # 非生產命名queue: robot.task.q.public # 非生產命名rk: robot.task.dispatch.public # 非生產命名dlx: robot.task.dlx.publicdlq: robot.task.dlq.publicdlrk: "#"async:idem-ttl-seconds: 3600
說明:占位符 ${ENV_*}
僅示意變量化做法;真實值不要寫進文章或倉庫。
3. 關鍵代碼片段
以下為骨架式片段,只展示接口與關鍵點,隱藏了內部實現、異常策略、業務字段映射等細節。
3.1 交換機/隊列與手動 ack
@Configuration
public class MqBasicsConfig {@Value("${mq.exchange}") private String exchange;@Value("${mq.queue}") private String queue;@Value("${mq.rk}") private String rk;@Value("${mq.dlx}") private String dlx;@Value("${mq.dlq}") private String dlq;@Value("${mq.dlrk}") private String dlrk;@Beanpublic Declarables mqDeclarables() {Queue q = QueueBuilder.durable(queue).withArgument("x-dead-letter-exchange", dlx).withArgument("x-dead-letter-routing-key", dlrk).build();TopicExchange biz = ExchangeBuilder.topicExchange(exchange).durable(true).build();Binding b = BindingBuilder.bind(q).to(biz).with(rk);TopicExchange dead = ExchangeBuilder.topicExchange(dlx).durable(true).build();Queue deadQ = QueueBuilder.durable(dlq).build();Binding db = BindingBuilder.bind(deadQ).to(dead).with(dlrk);return new Declarables(biz, q, b, dead, deadQ, db);}@Beanpublic SimpleRabbitListenerContainerFactory manualAckFactory(ConnectionFactory cf,MessageConverter converter) {var f = new SimpleRabbitListenerContainerFactory();f.setConnectionFactory(cf);f.setMessageConverter(converter);f.setAcknowledgeMode(AcknowledgeMode.MANUAL);return f;}
}
3.2 生產者(骨架)
@Component
public class TaskProducer {private final RabbitTemplate tpl;@Value("${mq.exchange}") private String exchange;@Value("${mq.rk}") private String rk;public TaskProducer(RabbitTemplate tpl) { this.tpl = tpl; }/** 僅演示:設置 CorrelationData 綁定 taskId,方便確認回調 */public void send(String taskId, Object payload, @Nullable String requestId) {MessagePostProcessor mpp = msg -> {msg.getMessageProperties().setMessageId(taskId);if (requestId != null) msg.getMessageProperties().setHeader("X-Request-Id", requestId);return msg;};tpl.convertAndSend(exchange, rk, payload, mpp, new CorrelationData(taskId));}
}
3.3 消費者(骨架,手動 ack + 冪等占位)
@Slf4j
@Component
public class TaskConsumer {@RabbitListener(queues = "${mq.queue}", containerFactory = "manualAckFactory")public void onMessage(Message message, Channel channel) throws Exception {String taskId = message.getMessageProperties().getMessageId();String reqId = (String) message.getMessageProperties().getHeaders().get("X-Request-Id");try {// TODO 冪等占位:檢查 reqId / taskId 是否已處理// TODO 業務占位:調用第三方平臺(已脫敏),記錄狀態channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {log.error("consume failed, taskId={}", taskId, ex);// 直接拒絕入 DLQ:公開文章不貼重試/回退策略細節channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}
3.4 接口契約(202 + 輪詢)(骨架)
@RestController
@RequestMapping("/external/async")
public class AsyncController {// POST 接口:只返回 202 + taskId(隱藏業務入參/第三方字段)@PostMapping("/command")public ResponseEntity<Map<String, Object>> postCommand(@RequestHeader(value="X-Request-Id", required=false) String reqId) {String taskId = UUID.randomUUID().toString();// TODO 寫入 PENDING 狀態(緩存/DB),并生產 MQ 消息return ResponseEntity.accepted().body(Map.of("taskId", taskId));}// 輪詢接口:返回 PENDING / DONE / FAILED(隱藏具體結果結構)@GetMapping("/tasks/{taskId}")public Map<String, Object> query(@PathVariable String taskId) {// TODO 讀取存儲的任務態與簡要結果return Map.of("taskId", taskId, "status", "PENDING");}
}
骨架片段僅演示“怎么做”,刻意省略:完整領域模型、重試/退避參數、冪等鍵落盤、第三方錯誤分型、限流灰度策略等生產細節。
4. 測試與驗收要點
壓測入口:僅關注POST→202 延遲與消費者側消費速率,而非業務同步 RT
人為制造失敗,確認DLQ 入列與排障鏈路可達
冪等:同
X-Request-Id
重復請求不導致重復外呼限流:網關/服務內雙層策略命中時,優先保證系統穩定
監控:最少監控入隊速率、堆積深度、消費速率、DLQ 數量
5. 常見坑位
回調未開啟:生產者沒開 confirm/return,消息“丟哪兒了”難定位
死信配置缺失:Nack(false,false) 卻沒有 DLX/DLQ
冪等只在接口層:消費者未做冪等,仍可能二次下游外呼