要實現“邊聽邊想、邊說”,核心是把整條鏈路做成全雙工、分片流式、可中斷的流水線:
ASR 連續吐字 →(短緩沖)→ LLM 連續出 token(可搶斷)→ TTS 連續合成并播放(可打斷/續播)。
下面給你一份可落地的技術設計清單 + 關鍵參數 + 事件流示例 + 最小骨架。
?
- 架構總覽(低時延流水線)
I/O 通道(推薦)
? 端到端:WebSocket / WebRTC(瀏覽器端)
? 服務間:FastMCP 2.0(工具/模型調用),內部可選 NATS 作為事件總線
? 控制流:MCP 事件(session、tool.call、response.delta…)
? 數據流:音頻與文本分道并行(audio.* 與 text.* 兩條流)
模塊流水線(并行且可搶斷)
1. VAD/端點檢測(前端或邊緣):10–30ms 幀
2. 流式 ASR(CTC/RNNT):每 50–200ms 輸出 partial 文本
3. 意圖/路由器:對 partial 增量判斷(關鍵詞/小模型)
4. LLM 推理:
? 接受 prefix 的增量 prompt
? 流式 token 輸出(server-side generation with streaming)
? 支持搶斷(barge-in)與提前輸出“開場 token”
? 可加推測式解碼(spec decoding)與prefix 緩存
5. 流式 TTS:
? 子句粒度 150–300ms 合成/播放塊
? 回放端抖動緩沖 100–200ms + 無損打斷(丟棄未播塊)
6. 回聲消除/自回授(AEC + Ducking):邊播邊錄,避免自激
?
- 關鍵機制(必須具備)
A. 增量傳遞(partial → commit)
? ASR:持續發送 asr.partial,端點后補發 asr.final(帶穩定時間戳)
? LLM:對 asr.partial 采用 防抖(debounce 80–150ms)+ 最小片段長度(>= 3–6 個字 / 2–4 個詞)
? TTS:只對 llm.delta 的已成句或子句(中文逗號/句號、英文 punctuation)做合成,減少回退
B. 搶斷(barge-in)
? 前端:一旦檢測到用戶講話(VAD=active),立刻:
1. 發送 barge_in.start
2. 暫停播放;3) 服務端取消當前 LLM/TTS(cancel token)
? 服務端:
? LLM/ TTS 監聽 cancel_id,立即停止/丟棄隊尾片段
? 維護會話版本號(gen_id);僅播放/回答最新 gen_id
C. 遲滯控制(latency budget)
? 端到端可感延遲目標 ≤ 300–500ms
? ASR 輸出間隔:50–150ms(多一層 80–150ms 防抖)
? LLM 首 token:150–350ms(可用模板熱身 & prefix cache)
? TTS 首音:120–250ms(短句優先,文本分塊長度 20–40 字)
? 尾音裁剪:TTS 播放塊≤ 300ms,避免“長塊堵塞”
D. 可靠性與回退
? 弱網:WebRTC/HTTP3(QUIC)優先;WebSocket 備用
? 重連:冪等 session_id + offset 恢復
? 丟包/亂序:塊序號(seq),播放端按 seq 重排
? 回聲:前端 AEC,或服務端播放音軌鏡像做自適應回聲抵消
?
- 事件流(FastMCP 2.0 語義示例)
session.created {session_id}
audio.input.delta {pcm_chunk, ts}
audio.input.vad {active=true/false}
asr.partial {text: "我想…", range:{start,end}}
router.intent.partial {name: "qa", conf:0.62}
llm.output.delta {text: "好的,", gen_id:12, seq:1}
tts.output.delta {audio_chunk, gen_id:12, seq:1}
barge_in.start {reason:"user_speaking", gen_id:12}
cancel {target_gen_id:12}
asr.final {text:"我想訂明天早上的票", ...}
llm.output.delta {text:"已經為你查到…", gen_id:13, seq:1}
tts.output.delta {audio_chunk, gen_id:13, seq:1}
要點:所有可播放/可回復內容都帶 gen_id;前端只消費最大 gen_id 的流。
?
- 具體參數建議(起步即用)
? ASR:RNNT/CTC 流式,分幀 20–30ms,合幀 200–400ms 出 partial
? 防抖:ASR→LLM 文本增量 debounce=120ms,最小增量=6–10字
? LLM:
? system prompt 固定模板,prefix-caching
? 首 token 預算 200ms(服務端熱身、就近副本、推測式解碼)
? 工具調用:遇到明顯意圖 并行預取(RAG/DB)
? TTS:
? 句法切分:逗號/頓號/句號;英文用 spaCy/sentencizer
? 合成塊:200–300ms;播放緩沖 120–200ms
? 無點擊音打斷:跨塊零交叉點截斷 + 短淡出(5–15ms)
? 前端音頻:
? 采樣 16k/24k 單聲道;AEC/NS/AGC 開啟;
? 抖動緩沖 120–180ms;播放速率 1.0–1.05 自適應
?
- 最小骨架(Python/asyncio,偽代碼)
說明:示意“邊聽邊想、邊說”的并行與可搶斷。實際對接你的 ASR/LLM/TTS/MCP。
import asyncio
from contextlib import suppressclass GenState:def __init__(self):self.gen_id = 0self.cancel = asyncio.Event()state = GenState()async def asr_stream(audio_in):async for partial in audio_in.partials(): # 50-150ms一段yield {"type":"asr.partial", "text": partial.text}yield {"type":"asr.final", "text": audio_in.final_text()}async def llm_stream(text_iter):async for chunk in llm.generate_stream(text_iter): # 首token~200msif state.cancel.is_set(): breakyield {"type":"llm.delta", "text": chunk}async def tts_stream(text_iter, cur_gen):async for seg in tts.synthesize_segments(text_iter, max_ms=300):if state.gen_id != cur_gen: breakyield {"type":"tts.delta", "audio": seg.audio}async def handle_session(ws):while True:# 1) 接收音頻 → ASR partialasr_iter = asr_stream(ws.audio())# 2) 組裝增量文本(防抖 + 最小增量)text_iter = debounce_and_chunk(asr_iter, debounce_ms=120, min_chars=8)# 3) 新一輪生成:版本號+取消state.gen_id += 1cur_gen = state.gen_idstate.cancel.clear()# 4) 并行LLM與TTSllm_iter = llm_stream(text_iter)tts_iter = tts_stream(sentencize(llm_iter), cur_gen)async def pump_llm():async for m in llm_iter:await ws.send_json(m)async def pump_tts():async for a in tts_iter:await ws.send_audio(a["audio"])# 5) 監聽barge-in:用戶再講話→取消當前生成async def watch_barge_in():async for e in ws.events("barge_in.start"):state.cancel.set()tasks = [asyncio.create_task(p) for p in (pump_llm(), pump_tts(), watch_barge_in())]done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)for t in pending:with suppress(asyncio.CancelledError): t.cancel()
要點:
? 版本號 gen_id + 取消事件確保只播放最新一輪的語音
? ASR→LLM 有防抖+最小增量,減少“回退”
? LLM→TTS 先句法切分再合成,塊更自然;任一處觸發 barge-in 立刻打斷
?
- 體驗優化招式(錦上添花)
? “先答框架,后補細節”:LLM先給 1–2 個開場 token(例如“好的,”/“Okay,”),TTS馬上播;隨后細節邊出邊播
? 并行預取:一旦意圖>閾值,并行啟動 RAG/DB 查詢,LLM側采用工具結果晚綁定
? 短回應策略:長句拆分為短子句播報,每句≤1.5s,讓用戶隨時能插話
? 音量壓制(ducking):用戶說話瞬時,把播放音量降到 10–20%
? 多模態提示:UI 上對“正在思考/說話/可插話”的狀態做清晰指示
?
- 觀測與SLO(必做)
? 指標:ASR_partial_latency、LLM_first_token_latency、TTS_first_packet_latency、BargeIn_to_Silence、Cancel_to_StopPlayback
? 目標:端到端可感延遲 ≤ 400ms、barge-in 靜音 ≤ 120ms、取消到停止播報 ≤ 80ms
?
- FastMCP 2.0 如何串起來
? 統一事件:audio.input.delta、asr.partial/final、llm.delta、tts.delta、barge_in.start、cancel
? 統一工具:ASR/TTS/RAG/DB 都通過 MCP tool schema 暴露;Agent 只認協議與事件
? 統一流控:MCP 層攜帶 gen_id、seq、cancel_id、ts,服務間無感替換底層實現(Qwen→GPT、不同 TTS/ASR)