??大家好,我是 展菲,目前在上市企業從事人工智能項目研發管理工作,平時熱衷于分享各種編程領域的軟硬技能知識以及前沿技術,包括iOS、前端、Harmony OS、Java、Python等方向。在移動端開發、鴻蒙開發、物聯網、嵌入式、云原生、開源等領域有深厚造詣。
圖書作者:《ESP32-C3 物聯網工程開發實戰》
圖書作者:《SwiftUI 入門,進階與實戰》
超級個體:COC上海社區主理人
特約講師:大學講師,谷歌亞馬遜分享嘉賓
科技博主:華為HDE/HDG
我的博客內容涵蓋廣泛,主要分享技術教程、Bug解決方案、開發工具使用、前沿科技資訊、產品評測與使用體驗。我特別關注云服務產品評測、AI 產品對比、開發板性能測試以及技術報告,同時也會提供產品優缺點分析、橫向對比,并分享技術沙龍與行業大會的參會體驗。我的目標是為讀者提供有深度、有實用價值的技術洞察與分析。
展菲:您的前沿技術領航員
👋 大家好,我是展菲!
📱 全網搜索“展菲”,即可縱覽我在各大平臺的知識足跡。
📣 公眾號“Swift社區”,每周定時推送干貨滿滿的技術長文,從新興框架的剖析到運維實戰的復盤,助您技術進階之路暢通無阻。
💬 微信端添加好友“fzhanfei”,與我直接交流,不管是項目瓶頸的求助,還是行業趨勢的探討,隨時暢所欲言。
📅 最新動態:2025 年 3 月 17 日
快來加入技術社區,一起挖掘技術的無限潛能,攜手邁向數字化新征程!
文章目錄
- 摘要
- 引言
- Dynamo 推理的關鍵模塊
- 計算面:引擎與并行
- 服務面:模型服務與動態批處理
- 調度面:Dynamo 風格的連續批處理
- 存儲面:KV Cache 管理
- 觀測面:可視化與自愈
- Dynamo 推理架構解析
- Batch 合并策略:怎么既不拖慢人,又讓 GPU 忙起來
- Token 流式輸出:既要快,又要穩定
- KV Cache 管理:讓顯存一直夠用
- 最小可運行的 Dynamo 風格推理原型
- 代碼講解(抓要點就好)
- 把 Demo 搬進生產:Triton 動態批處理與多實例
- Triton 動態批處理配置示例
- Batch 合并策略、流式輸出與 KV Cache
- Batch 合并策略的取舍
- Token 級流式輸出
- KV Cache 管理的小技巧
- 把它用在真實業務里
- 場景一:聊天問答 API 網關
- 場景二:RAG 檢索問答
- 場景三:多租戶 SLA 與限流
- QA 環節
- 總結
摘要
大模型上到生產之后,最先撞墻的往往不是“精度”,而是“吞吐”和“時延”。聊天問答、RAG 檢索問答、智能客服這類在線場景,并發高、延遲敏感,常規“一次性 batch + 逐條生成”的做法很快就遇到性能瓶頸:GPU 吃不滿、排隊時間長、波動大。本文聚焦一套基于 NVIDIA 生態的“Dynamo 風格”分布式 LLM 推理架構(文中簡稱 Dynamo 架構):核心是連續批處理(continuous batching / dynamic batching)、Prefill/Decode 分離、Token 級流式輸出和KV Cache 有效管理,并結合多卡通信(NCCL)、TensorRT-LLM、Triton Inference Server(或 NIM)的工程化能力,給出從原理到可運行 Demo 的完整路徑。
引言
現在主流推理框架都在往兩個方向發力:
一是單卡效率,靠算子融合、CUDA Graph、FP8/INT8、Paged KV 等技術把單次 kernel 做大做滿;
二是調度層效率,靠更聰明的批合并、流式解碼、緩存復用讓“GPU 忙起來”。
實際落地時,大家經常遇到這些痛點:
- 吞吐/延遲的拉扯:批大一點吞吐好了,但最慢的那個請求拖著整批人一起慢;批小一點時延低了,但 GPU 忙閑不均。
- Prompt 長度差異大:Prefill 階段(吃 prompt)是長序列 GEMM,Decode 階段(逐 token)是短序列/單步計算,混一起做常常低效。
- KV Cache 爆內存:并發高的時候,KV Cache 占滿顯存;請求又長又多時還會碎片化,OOM 很常見。
- 在線流式輸出:用戶想“邊生成邊看”,但你要一邊發 token,一邊還能繼續合批,工程上不太好寫。
Dynamo 架構就是在這堆現實問題上“摳細節”:分階段合批、持續注入新請求、Token 級別出結果、按頁管理 KV。下面我們把它拆開說,并給出一個能直接跑起來的最小可用 Demo(CPU/GPU 都能跑),幫你把核心機制吃透,再對接到 Triton / TensorRT-LLM 時會順手很多。
Dynamo 推理的關鍵模塊
計算面:引擎與并行
- 引擎層用 TensorRT-LLM(或 PyTorch + CUDA Graph)承載模型執行;
- 結合 TP(張量并行)/PP(流水線并行) 和 NCCL 做多卡擴展;
- Decode 路徑啟用 Paged KV、Flash Attention、FP8/INT8 等優化。
服務面:模型服務與動態批處理
- 用 Triton Inference Server(或 NVIDIA NIM 微服務)做在線服務;
- 啟用 dynamic batching / decoupled transaction policy,讓請求在毫秒級延遲預算內合批。
調度面:Dynamo 風格的連續批處理
- 請求進入“接入隊列”后不必等整批完成,在 token 邊界持續并入;
- Prefill 與 Decode 分離:新請求先做 Prefill(長序列計算),再加入 Decode 批次(短序列 token-by-token)。
存儲面:KV Cache 管理
- 按頁管理(Paged KV),回收和復用更容易;
- 配額與水位:限制單租戶最大并發、觸發降級或反壓;
- 冷熱分層:長上下文或“掛起請求”的 KV 優先換出。
觀測面:可視化與自愈
- 關鍵指標:GPU 使用率、合批效率、Prefill/Decode 時間、KV 命中率/占用、p95/p99 時延;
- 超閾值觸發自動降采樣、降精度、切換更激進的合批策略。
Dynamo 推理架構解析
Batch 合并策略:怎么既不拖慢人,又讓 GPU 忙起來
一個好用的合批器至少考慮這幾條:
- 時間上限:最大等待時間,比如 1~3ms,過時就直接發,保證尾時延。
- 容量上限:
max_batch_size
、max_tokens_per_batch
雙限同時生效,避免單批太重。 - Prefill / Decode 拆分:Prefill 先按Prompt token 總量控重;Decode 按活躍會話數控重。
- 公平性:優先級隊列(SLA),避免長請求餓死短請求,或多租戶之間互相影響。
- 可搶占:Decode 每一步都是天然切片點,可以在 token 邊界調度,讓新請求盡快插隊進入 Prefill。
Token 流式輸出:既要快,又要穩定
- 解碼步進:每次只向前生成一個 token(或小于等于 N 個),立刻把該 token 推給客戶端;
- 解耦 I/O:推流線程和計算線程分離,避免阻塞合批;
- 回壓控制:客戶端慢時不要拖死計算,必要時丟棄部分中間增量,或聚合為“句子級片段”再發。
KV Cache 管理:讓顯存一直夠用
- 按頁管理:每個序列的 KV 占用被切成固定大小的 page,釋放與復用都以 page 為單位;
- 配額&回收:超出配額的會話拒絕接入、掛起或降級;
- 前綴復用:命中相同系統提示或檢索段時,KV 前綴可復用,Prefill 直接省掉一大截。
最小可運行的 Dynamo 風格推理原型
下面這段 Python 代碼用 HuggingFace 的 transformers
跑一個可連續合批 + 流式輸出 + KV 復用的最小原型。為了容易跑通,默認用 CPU + distilgpt2
,也支持 GPU(裝好 PyTorch CUDA 即可)。
運行前安裝依賴:
pip install torch transformers
# file: toy_dynamo_engine.py
import time
import threading
import queue
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Anyimport torch
from transformers import AutoModelForCausalLM, AutoTokenizer# --------- 配置 ----------
MODEL_NAME = "distilgpt2" # 體積小,CPU 也能跑
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"MAX_BATCH_DECODE = 16 # 每步 decode 合批的最大會話數
MAX_PREFILL_TOKENS = 8192 # 單次 prefill 的總 token 上限(防止過大序列)
MAX_QUEUE_DELAY_MS = 3 # 合批最大等待時間
MAX_NEW_TOKENS = 64 # 每個請求最多生成多少 token# --------- 數據結構 ----------
@dataclass
class Request:req_id: intprompt: strmax_new_tokens: int = MAX_NEW_TOKENScreated_at: float = field(default_factory=time.time)output_tokens: List[int] = field(default_factory=list)done: bool = False# 由調度器填充input_ids: Optional[torch.Tensor] = Nonepast_key_values: Optional[Any] = Nonelast_token: Optional[torch.Tensor] = None# --------- 引擎 ----------
class ToyDynamoEngine:def __init__(self):self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)self.model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE)self.model.eval()if self.tokenizer.pad_token is None:self.tokenizer.pad_token = self.tokenizer.eos_token@torch.no_grad()def prefill(self, reqs: List[Request]):""" 對新來的請求做 prefill,得到初始 past_key_values 和第一個解碼 token """inputs = self.tokenizer([r.prompt for r in reqs], return_tensors="pt", padding=True).to(DEVICE)# 控制總 token,避免超大 batchtotal_tokens = inputs["input_ids"].numel()if total_tokens > MAX_PREFILL_TOKENS:raise RuntimeError(f"Prefill tokens {total_tokens} exceed limit {MAX_PREFILL_TOKENS}")outputs = self.model(**inputs, use_cache=True)# 取每個序列最后一個位置的 logits,采樣一個 token 作為第一步輸出last_logits = outputs.logits[:, -1, :]next_tokens = torch.argmax(last_logits, dim=-1) # 為簡單演示,用 argmax;生產建議用采樣或溫度控制# 更新請求狀態for i, r in enumerate(reqs):r.input_ids = inputs["input_ids"][i:i+1] # 保存原始 prompt(可選)r.past_key_values = outputs.past_key_valuesr.last_token = next_tokens[i:i+1].unsqueeze(0) # [1,1]r.output_tokens.append(int(next_tokens[i].item()))return reqs@torch.no_grad()def decode_step(self, active: List[Request]):""" 對活躍會話做一步 decode:輸入 last_token + past,輸出下一個 token """if not active:return# 合批 last_tokeninput_ids = torch.cat([r.last_token for r in active], dim=0).to(DEVICE) # [B,1]past = active[0].past_key_values # 這里簡單起見假設結構一致outputs = self.model(input_ids=input_ids, past_key_values=past, use_cache=True)logits = outputs.logits[:, -1, :]next_tokens = torch.argmax(logits, dim=-1)# 更新每個請求的狀態for i, r in enumerate(active):r.past_key_values = outputs.past_key_valuesr.last_token = next_tokens[i:i+1].unsqueeze(0) # [1,1]r.output_tokens.append(int(next_tokens[i].item()))if len(r.output_tokens) >= r.max_new_tokens or int(next_tokens[i].item()) == self.tokenizer.eos_token_id:r.done = Truedef decode_text(self, token_ids: List[int]) -> str:return self.tokenizer.decode(token_ids, skip_special_tokens=True)# --------- 調度器(連續批處理) ----------
class DynamoScheduler:def __init__(self, engine: ToyDynamoEngine):self.engine = engineself.waiting_q: "queue.Queue[Request]" = queue.Queue()self.active: List[Request] = []self.lock = threading.Lock()self.stop_flag = Falsedef submit(self, req: Request):self.waiting_q.put(req)def run_forever(self):""" 主循環:小延遲等一會兒,聚合 prefill;隨后進入 decode 步進,期間持續接納新請求 """while not self.stop_flag:# 1) 聚合 prefillnewcomers = self._gather_newcomers(MAX_QUEUE_DELAY_MS / 1000)if newcomers:try:self.engine.prefill(newcomers)with self.lock:self.active.extend(newcomers)except Exception as e:for r in newcomers:r.done = Trueprint("Prefill error:", e)# 2) 一步 decodeactives = [r for r in self.active if not r.done]if not actives and self.waiting_q.empty():time.sleep(0.001)continue# 控制 decode 批大小step_batch = actives[:MAX_BATCH_DECODE]if step_batch:self.engine.decode_step(step_batch)# 3) 輸出增量(模擬流式)self._flush_streaming(step_batch)# 4) 清理已完成請求with self.lock:self.active = [r for r in self.active if not r.done]def _gather_newcomers(self, wait_seconds: float) -> List[Request]:""" 在一個很短的時間窗內收集新請求,做 prefill 批 """start = time.time()newcomers = []while time.time() - start < wait_seconds:try:r = self.waiting_q.get_nowait()newcomers.append(r)except queue.Empty:time.sleep(0.001)return newcomersdef _flush_streaming(self, batch: List[Request]):""" 這里簡單打印每一步的增量,你可以改成 WebSocket/SSE 推流 """for r in batch:if r.output_tokens:text = self.engine.decode_text([r.output_tokens[-1]])print(f"[req#{r.req_id}] +{repr(text)}", flush=True)# --------- 演示入口 ----------
def demo():eng = ToyDynamoEngine()sch = DynamoScheduler(eng)t = threading.Thread(target=sch.run_forever, daemon=True)t.start()# 模擬高并發提交prompts = ["Explain the significance of GPU memory bandwidth in LLM inference.","Write a short poem about distributed systems.","What is continuous batching and why does it help?","Summarize the benefits of KV cache paging in two sentences.",]reqs: Dict[int, Request] = {}for i, p in enumerate(prompts, 1):r = Request(req_id=i, prompt=p, max_new_tokens=40)reqs[i] = rsch.submit(r)# 等待全部完成while any(not r.done for r in reqs.values()):time.sleep(0.05)# 匯總輸出for i, r in reqs.items():whole_text = eng.decode_text(r.output_tokens)print(f"\n=== Request #{i} Final ===\n{whole_text}\n")sch.stop_flag = Truet.join(timeout=1)if __name__ == "__main__":demo()
代碼講解(抓要點就好)
- Prefill:把新請求在一個很短的時間窗(默認 3ms)內聚合起來,一次性做長序列前向,得到
past_key_values
和第一個 token。 - Decode:把活躍會話的
last_token
合起來做一步解碼,拿到下一個 token 并更新 KV。 - 連續并入:主循環每一輪都嘗試把“剛剛到達的新請求”做 prefill,然后繼續 decode,不會等整批結束。
- 流式輸出:每步 decode 后把本步新 token 打印出來,實際工程里換成 WebSocket/SSE 推給客戶端即可。
- KV 管理:示例里把
past_key_values
掛在 Request 上。真實系統會做 page 化和內存池復用,這里保留了思路。
把 Demo 搬進生產:Triton 動態批處理與多實例
Triton 動態批處理配置示例
如果你用 Triton Inference Server 托管 TensorRT-LLM 引擎,可以在 config.pbtxt
開啟動態批處理與解耦響應,達到和上面 Demo 相同的調度思路:
name: "llm-trtllm"
backend: "tensorrtllm"
max_batch_size: 128instance_group [{ kind: KIND_GPU count: 1 gpus: [0] },{ kind: KIND_GPU count: 1 gpus: [1] }
]dynamic_batching {preferred_batch_size: [ 4, 8, 16, 32 ]max_queue_delay_microseconds: 3000 # 3ms 等待窗口
}# 解耦請求-響應,便于流式輸出
model_transaction_policy { decoupled: true }# 打開內置緩存(如啟用可用)
response_cache { enable: true }
要點:
max_queue_delay_microseconds
就是我們的短窗合批;decoupled: true
允許多次響應,對應 token 級流式;- 多
instance_group
可以把一個模型同時開多個副本,利用大卡/MIG 做隔離和擴容。
Batch 合并策略、流式輸出與 KV Cache
Batch 合并策略的取舍
- 吞吐優先:把
preferred_batch_size
設大一些,max_queue_delay
放寬到 4~6ms,能大幅提升 GPU 利用; - 時延優先:把
preferred_batch_size
收緊,max_queue_delay
控制在 1~2ms,犧牲部分吞吐; - 混合策略:對不同 SLA 的租戶使用不同隊列,不同的
max_queue_delay
,高優先級走低延遲策略。
Token 級流式輸出
- Triton/NIM 用解耦事務,一條請求可以多次
send
; - 如果你自己寫服務,SSE/WebSocket 都行。關鍵是I/O 解耦:推流線程不要阻塞 decode。
KV Cache 管理的小技巧
- 按頁管理:統一 page 大小(比如 2MB),回收/復用都用 page 計數;
- 復用前綴:RAG/系統提示經常重復,前綴 KV 命中能省掉一大段 Prefill;
- 閾值&回收:高水位觸發回收策略:暫停超長請求、降低并發、切浮點精度。
把它用在真實業務里
下面給 3 個常見場景,順便給出必要的代碼/配置片段。
場景一:聊天問答 API 網關
需求:超高 QPS,延遲敏感,用戶要“邊打字邊看結果”。
做法:
- API 網關把請求丟進短窗隊列,每 1~3ms 聚合一次做 Prefill;
- 進入 Decode 后每步出 token,SSE 推給前端;
- Triton 配置
decoupled: true
,業務代碼里每步send()
。
SSE 端點偽代碼(Python/FastAPI):
from fastapi import FastAPI
from fastapi.responses import StreamingResponseapp = FastAPI()@app.post("/chat/stream")
def chat_stream(prompt: str):def token_stream():req = Request(req_id=int(time.time()*1000)%100000, prompt=prompt, max_new_tokens=128)scheduler.submit(req)last_len = 0while not req.done:if len(req.output_tokens) > last_len:text = engine.decode_text(req.output_tokens[last_len:])last_len = len(req.output_tokens)yield f"data: {text}\n\n"time.sleep(0.01)yield "data: [DONE]\n\n"return StreamingResponse(token_stream(), media_type="text/event-stream")
場景二:RAG 檢索問答
痛點:Prompt 很長(檢索段拼進去),Prefill 特別重。
做法:
- 把“系統提示 + 領域說明 + 模板”作為前綴 KV緩存,檢索段變化時只拼后綴;
- 命中前綴直接跳過大段 Prefill;
- 合批時對 Prefill 設
MAX_PREFILL_TOKENS
,避免偶發超長請求拖全局。
前綴 KV 復用思路(示意):
# 假設 prefix_kv_map 緩存了若干常見前綴的 KV
prefix = build_prefix(system_prompt, domain_hint, template) # 只包括穩定部分
key = hash(prefix)
if key in prefix_kv_map:r.past_key_values = clone(prefix_kv_map[key]) # 直接拿到 KVr.input_ids = tokenizer(rag_snippets + question) # 只對新增部分做 prefill
else:# 第一次命中:先把 prefix 做一遍 prefill,然后緩存 KVprefix_req = Request(req_id=-1, prompt=prefix, max_new_tokens=1)engine.prefill([prefix_req])prefix_kv_map[key] = clone(prefix_req.past_key_values)
場景三:多租戶 SLA 與限流
痛點:不同租戶對延遲/吞吐訴求不一樣,容易互相影響。
做法:
- 維護多優先級隊列,高優先級
max_queue_delay
更小; - 租戶配額:限制“活躍會話數”和“總 KV page 數”;
- 過載時對低優先級租戶降級(減小
max_new_tokens
、降低溫度/核采樣)。
多優先級合批偽代碼:
def gather_newcomers():high, normal = [], []start = time.time()while time.time() - start < MAX_QUEUE_DELAY_MS/1000:r = try_pop_high_or_normal_queue()if not r:time.sleep(0.001); continue(high if r.priority == "high" else normal).append(r)# 先吃高優先級,空余再放普通return (high + normal)[:MAX_BATCH_DECODE]
QA 環節
Q1:Prefill/Decode 為什么要分開?
Prefill 是長序列 GEMM,Decode 是短序列、token 級前向。把兩種負載硬拼一起,顯卡經常“忽忙忽閑”。分開后,你可以對 Prefill 限制總 token,對 Decode 限制活躍會話數,各自吃滿各自的“甜點區”。
Q2:連續批處理會不會餓死長請求?
會,如果不做公平性。用優先級隊列+輪轉,從不同桶里按比例取請求進入 decode 批。長請求也能穩定推進。
Q3:KV Cache 是不是一直增?
不是。按頁管理后,請求結束立刻歸還 page。高水位觸發回收策略,先清理低優先級/掛起的會話。
Q4:流式輸出影響吞吐嗎?
I/O 線程要解耦,輸出聚合到句子級再發,或者使用零拷貝隊列,就不會拖慢計算。Triton 的 decoupled transaction 可以放心用。
Q5:TensorRT-LLM 和 PyTorch 選擇哪個?
追求極致延遲/吞吐、模型相對穩定用 TensorRT-LLM;需要快速迭代/頻繁改模型,用 PyTorch + CUDA Graph 也能做得很強。生產常見做法是兩者并行:驗證在 PyTorch,穩定后下沉到 TensorRT-LLM。
Q6:多卡如何劃分并行?
長上下文/大模型先考慮 TP(張量并行),很長序列或需要拉長流水線時再加 PP。跨機房的通信延遲會顯著影響效果,盡量同機架內聚合。
總結
把 LLM 真正跑上線,關鍵不是“一個模型多準”,而是“一個 GPU 多忙、一個請求多穩”。Dynamo 風格的分布式推理架構抓住了生產里的三件事:連續批處理把吞吐提起來;流式輸出把體驗做順;KV Cache 管理把顯存穩住。
本文先用一個能跑起來的最小 Demo 把思路講清楚,再落到 Triton/TensorRT-LLM 的配置與實踐,最后結合聊天問答、RAG、多租戶三類常見場景給出實操建議。你可以先直接跑 Demo 感受“連續批”的節奏,再把批策略、流式通道和 KV 管理逐步換成你線上框架的等價能力。這樣推進,既能盡快見到收益,也能把復雜度壓在你可控的范圍里。