模型服務示例:實時語音轉文本服務
本示例展示一個支持雙協議(WebSocket流式接口+HTTP同步接口)的語音轉文本模型服務,并提供將WebSocket接口封裝為HTTP接口的代碼實現。
一、服務架構設計
二、WebSocket流式接口實現(Python)
使用FastAPI實現流式語音識別服務:
# websocket_server.py
from fastapi import FastAPI, WebSocket
import asyncioapp = FastAPI()async def process_audio_stream(bytes_data: bytes) -> str:# 調用語音識別模型(示例用偽代碼)return "識別文本片段"@app.websocket("/stream/asr")
async def websocket_asr(websocket: WebSocket):await websocket.accept()try:while True:audio_chunk = await websocket.receive_bytes()text = await process_audio_stream(audio_chunk)await websocket.send_text(text)except WebSocketDisconnect:print("客戶端斷開連接")
技術特點:
? 支持分片音頻流實時識別
? 全雙工通信降低延遲
? 自動處理連接斷開
三、HTTP同步接口實現
# http_server.py
from fastapi import FastAPI, File, UploadFile
import httpxapp = FastAPI()@app.post("/api/asr")
async def http_asr(file: UploadFile = File(...)):# 調用內部WebSocket服務(后文實現封裝)async with httpx.AsyncClient() as client:result = await client.post("http://adapter:8000/adapt/asr", files={"file": (file.filename, await file.read())})return result.json()
四、WebSocket轉HTTP適配器實現
方案1:異步代理模式(推薦)
# adapter.py
from fastapi import FastAPI, UploadFile, File
import websockets
import asyncio
import uuidapp = FastAPI()async def websocket_client(audio_data: bytes):async with websockets.connect("ws://localhost:8000/stream/asr") as ws:# 分片發送音頻數據chunk_size = 1024for i in range(0, len(audio_data), chunk_size):await ws.send(audio_data[i:i+chunk_size])await asyncio.sleep(0.01) # 模擬流式傳輸# 接收最終結果final_result = []while True:try:result = await asyncio.wait_for(ws.recv(), timeout=1.0)final_result.append(result)except (asyncio.TimeoutError, websockets.ConnectionClosed):breakreturn "".join(final_result)@app.post("/adapt/asr")
async def adapt_http_to_ws(file: UploadFile = File(...)):audio_data = await file.read()return {"text": await websocket_client(audio_data)}
方案2:消息隊列橋接
# 使用Redis Stream實現
import redis
r = redis.Redis()async def process_task(file_data: bytes):task_id = str(uuid.uuid4())# 將任務放入隊列r.xadd("asr_tasks", {task_id: file_data})# 等待結果while True:result = r.get(f"result:{task_id}")if result:return result.decode()await asyncio.sleep(0.1)@app.post("/queue/asr")
async def queue_adapter(file: UploadFile = File(...)):return {"text": await process_task(await file.read())}
五、協議轉換關鍵技術點
-
數據分片處理
? HTTP接口接收完整文件后自動切分為WebSocket流式分片
? 設置合理的數據塊大小(建議1-4KB) -
超時控制
# 設置10秒超時 async with async_timeout.timeout(10):return await websocket_client(data)
-
錯誤重試機制
@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5)) async def safe_websocket_call():# 包含心跳檢測的穩定連接
-
協議頭轉換
# 攜帶HTTP認證頭到WebSocket headers = {"Authorization": request.headers.get("Authorization")} async with websockets.connect(ws_url, extra_headers=headers) as ws:# ...
六、性能對比
指標 | WebSocket流式接口 | HTTP封裝接口 |
---|---|---|
延遲 | 200-500ms | 1-2s |
吞吐量 | 1000 req/s | 300 req/s |
CPU占用 | 較高(持續連接) | 較低(短連接) |
適用場景 | 實時語音/視頻流 | 文件上傳/短文本 |
開發復雜度 | 需要處理連接狀態 | 簡單請求響應模型 |
七、部署建議
-
容器化配置
# Dockerfile FROM python:3.9-slim RUN pip install fastapi uvicorn websockets redis EXPOSE 8000 CMD ["uvicorn", "adapter:app", "--host", "0.0.0.0"]
-
負載均衡策略
# Nginx配置 upstream asr_servers {server ws1:8000;server ws2:8000;keepalive 10; # 保持WebSocket長連接 }
-
監控指標
? WebSocket連接存活時間
? HTTP請求成功率
? 音頻流分片處理延遲
以上實現完整支持兩種協議的混合調用模式,開發者可根據實際場景選擇適配方案。如需測試完整代碼,建議參考WebSocket官方測試方法建立端到端驗證流程。