流式輸出不懂可看這篇文章:流式輸出:概念、技巧與常見問題
正常情況,如下代碼所示:
async def event_generator():# 先輸出數字1yield "data: 1\n\n"# 然后每隔2秒輸出數字2,共輸出10次for i in range(10):await asyncio.sleep(2) # 等待2秒yield "data: 2\n\n"return StreamingResponse(event_generator(),media_type="text/event-stream",)
一、網絡緩存
在FastAPI中實現Server-Sent Events (SSE)流式響應時,經常遇到響應數據不是實時發送給客戶端,而是累積到一定程度后一次性發送的問題。這主要由以下幾個原因造成:
- 網絡層緩沖機制:網絡協議棧為了提高傳輸效率,會將小的數據包進行緩沖合并,這種現象稱為Nagle算法或TCP延遲確認。
- 應用層緩沖:
Web服務器(如uvicorn)和ASGI應用本身可能對響應內容進行緩沖
Python的異步框架在處理異步生成器時,可能會將多個yield值緩存起來批量處理 - 事件循環調度:
在同一個異步函數中連續調用yield時,事件循環可能不會立即切換到發送數據的協程
需要顯式地讓出控制權,使事件循環有機會處理數據發送
解決方法:
-
事件循環控制權讓出:
await asyncio.sleep(0)會讓當前協程暫停執行,并將控制權交還給事件循環
事件循環會處理其他待處理的任務,包括將已生成的數據發送給客戶端 -
打破緩沖機制:
通過在每次yield后添加await asyncio.sleep(0),強制中斷當前協程的連續執行
這給底層傳輸機制一個機會來刷新緩沖區并發送數據
async def event_generator():# 先輸出數字1yield "data: 1\n\n"# 如果是下面這種就會產生阻塞,需要使用 await asyncio.sleep(0)for event in coze.chat.stream(bot_id=bot_id,user_id=user_id,additional_messages=user_messages,):if event.event == ChatEventType.CONVERSATION_MESSAGE_DELTA:yield f"data: {event.message.content}\n\n"# 強制刷新緩沖區,確保數據立即發送await asyncio.sleep(0)return StreamingResponse(event_generator(),media_type="text/event-stream",)
二、Nginx 緩存
Nginx 作為反向代理時,流式輸出出現問題是很常見的問題。Nginx 默認會緩沖后端的響應,這會導致流式傳輸無法正常工作,主要原因包括:
- Nginx默認啟用緩沖,會等待后端響應完成后再轉發給客戶端
- HTTP/1.0的連接行為可能中斷流式傳輸
- 超時設置過短導致連接被提前關閉
第一步:在返回方法處,添加 headers 參數,如下
return StreamingResponse(generate_stream(agent_id, BOT_ID, file_id, content, background_tasks),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive","X-Accel-Buffering": "no" # 禁用Nginx緩沖})
第二步:如果第一步未解決,則修改Nginx配置文件,添加關鍵配置下面幾行
location /api/ {proxy_pass http://127.0.0.1:8051/;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;# 關鍵配置:禁用緩沖以支持流式傳輸proxy_buffering off;proxy_cache off;proxy_read_timeout 86400s;proxy_send_timeout 86400s;}