說明
今天偶然在別的文章里看到這個功能,突然覺得正好。
CeleryWorker已經搭好了,但是我一直想在用戶請求時進行額外的處理會比較影響處理時間,用這個正好可以搭配上。
我設想的一個場景:
- 1 用戶發起請求
- 2 接口中進行關鍵信息的日志記錄 logger.info這種,直接記錄在文本文件里
- 3 影響請求后在后臺進行相關操作:在redis中計數,將數據轉發kafka等
這樣既不會影響請求時間,又可以把需要做的操作,例如在內存中記錄,修改狀態;采樣數據發送做完。
一個比較笨(現在有在用)的方法是在處理時將數據采樣發送到kafka,然后擔心kafka服務出問題,又做了try…except。當然,最壞的情況也還好,因為調用大模型通常都是數秒。
當然,在接口中logging和redis這樣的操作倒是沒關系,因為時間足夠短。某種程度上來說,logging+ logstash可能是更好的方案,redis都還有可能掛。
還有一個相對好一點的方法(準備好了還沒有啟用)。使用WCelery發送任務(甚至可以是復雜任務),并且可以再封裝一層異步(async httpx),這樣也只是多花一個請求時間。
當然這些都不如直接用FastAPI自帶的BackgroundTasks方法,這種服務啟動嵌入的方法應該更可靠。(其實在flask時代,就有before和after request裝飾器)
以下是一個實驗代碼(主要 by deepseek)
server.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
import time
from typing import Optional
import loggingapp = FastAPI()# 配置日志
logging.basicConfig(filename='app.log', level=logging.INFO)# def write_log(message: str):
# """記錄日志到文件(模擬耗時操作)"""
# time.sleep(2) # 模擬耗時操作
# with open("log.txt", mode="a") as f:
# f.write(f"{time.ctime()}: {message}\n")
# logging.info(f"日志已記錄: {message}")import aiofiles # 需要先安裝: pip3 install aiofiles
import asyncioasync def write_log(message: str):"""真正的異步日志寫入"""await asyncio.sleep(2) # 正確使用awaitasync with aiofiles.open("log.txt", mode="a") as f:await f.write(f"{time.ctime()}: {message}\n")logging.info(f"日志已記錄: {message}") # logging默認是同步的
def send_email(to: str, subject: str, body: Optional[str] = None):"""模擬發送郵件(帶錯誤處理)"""try:time.sleep(3) # 模擬網絡延遲with open("email_logs.txt", "a") as f:content = f"""Time: {time.ctime()}To: {to}Subject: {subject}Body: {body or 'No content'}{'-'*30}"""f.write(content)print(f"郵件已發送給 {to}")except Exception as e:logging.error(f"郵件發送失敗: {str(e)}")def cleanup_temp_files():# aa"""模擬清理臨時文件"""time.sleep(1)print("臨時文件清理完成")from pydantic import BaseModelclass RegisterInput(BaseModel):username: stremail: str@app.post("/register")
async def register_user(user_input:RegisterInput , background_tasks: BackgroundTasks):"""用戶注冊接口(演示多個后臺任務)"""if not user_input.email.endswith("@example.com"):raise HTTPException(400, "僅支持 example.com 域名")# 添加多個后臺任務background_tasks.add_task(write_log,f"新用戶注冊: {user_input.username}, 郵箱: {user_input.email}")background_tasks.add_task(send_email,to=user_input.email,subject="歡迎注冊",body=f"尊敬的 {user_input.username},感謝您的注冊!")background_tasks.add_task(cleanup_temp_files)return {"message": "注冊成功","details": "激活郵件和日志記錄正在后臺處理"}@app.get("/stats")
async def get_stats(background_tasks: BackgroundTasks):"""獲取統計信息(演示快速響應+后臺處理)"""background_tasks.add_task(write_log,"用戶查看了統計信息")# 立即返回的簡單數據return {"active_users": 42,"note": "詳細日志正在后臺記錄"}if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
test.py
resp = httpx.post('http://127.0.0.1:8000/register', json = {'username':'andy', 'email':'andy@example.com'})
對應產生的幾個文件,如log.txt
Fri Apr 18 23:27:02 2025: 新用戶注冊: andy, 郵箱: andy@example.com
Fri Apr 18 23:28:08 2025: 新用戶注冊: andy, 郵箱: andy@example.com
Fri Apr 18 23:29:52 2025: 新用戶注冊: andy, 郵箱: andy@example.com
Fri Apr 18 23:30:33 2025: 新用戶注冊: andy, 郵箱: andy@example.com
Fri Apr 18 23:39:40 2025: 新用戶注冊: andy001, 郵箱: andy001@example.com
實驗成功,感覺還挺好的。
原文有一些錯誤,說background_tasks只能執行同步任務,事實證明是錯誤的。某種程度上說,異步的才符合FastAPI的特點。
另外,如果有些同步操作時間特別短是可以不用異步的。例如redis操作。
以上對我有用,希望對你也有用。