目錄
一、代碼示例
二、執行說明
(一) 調用任務執行接口
(二) 監控任務進度
實現功能:
- 注冊后臺任務(如:郵件發送、文件處理等異步場景,不影響接口返回)
- 監控后臺任務執行進度(進度條功能)
- 支持根據任務ID查詢對應任務進度
一、代碼示例
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import random
import asynciofrom typing import List, Dict
from fastapi import FastAPI, BackgroundTasks, WebSocketapp = FastAPI()# 用于存儲連接的 WebSocket 實例
connected_websockets: Dict[int, List[WebSocket]] = {}@app.websocket("/ws/{task_id}/")
async def websocket_endpoint(websocket: WebSocket, task_id: int):"""WebSocket路由,用于接收任務進度"""await websocket.accept()connected_websockets.setdefault(task_id, []).append(websocket)try:while True:await websocket.receive_text()except:connected_websockets[task_id].remove(websocket)@app.post("/task/{task_id}/")
async def start_task(background_tasks: BackgroundTasks, task_id: int):"""注冊后臺任務"""background_tasks.add_task(process_task, task_id=task_id)return {"task_id": task_id}async def process_task(task_id):"""處理任務的后臺任務"""progress = 0while progress < 100:await asyncio.sleep(1)progress += random.randint(1, 10)progress = min(progress, 100)for ws in connected_websockets[task_id]:await ws.send_json({"task_id": task_id, "progress": progress})await asyncio.sleep(1)# 啟動應用
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
二、執行說明
(一) 調用任務執行接口
- 啟動服務后,訪問:http://127.0.0.1:8000/docs
POST
請求:http://127.0.0.1:8000/task/1/,指定任務ID為1
(二) 監控任務進度
- 安裝
websocket
請求工具:npm install -g wscat
- 終端輸入
wscat -c ws://127.0.0.1:8000/ws/1/
,監控任務ID為1
的執行進度