在現代應用開發中,異步任務處理是一個常見的需求。無論是數據處理、圖像生成,還是復雜的計算任務,異步執行都能顯著提升系統的響應速度和吞吐量。今天,我們將通過一個實際項目,探索如何使用 FastAPI、Celery 和 Redis 構建一個高性能的異步任務引擎。
項目背景
技術棧介紹
- FastAPI:一個現代、高性能的 Web 框架,基于 Python 3.7+ 的異步編程特性構建。它支持自動生成 OpenAPI 文檔和 Swagger UI,能夠快速構建 RESTful API,并且具有極低的延遲和高并發處理能力。
- Celery:一個分布式任務隊列系統,主要用于處理異步任務和定時任務。它支持多種消息傳輸機制,能夠將任務分發到多個工作節點上并行處理,從而提高系統的吞吐量和響應速度。
- Redis:一個高性能的鍵值存儲系統,常用于緩存、消息隊列和分布式鎖等場景。在 Celery 中,Redis 通常作為消息代理(Broker)和結果存儲(Backend),負責任務的分發和結果的持久化。
項目目標
通過 FastAPI、Celery 和 Redis 的結合,構建一個能夠高效處理用戶提交的 Python 代碼的異步任務引擎。用戶可以通過 API 提交代碼,系統會異步執行代碼,并返回任務的執行結果。
項目目錄結構
project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py
代碼功能深度解析
1. main.py
:FastAPI 應用的核心
main.py
是項目的核心入口文件,負責定義 FastAPI 應用的接口和邏輯。
FastAPI 應用初始化
app = FastAPI(title="Async Task API", description="", version="1.0.0")
這里我們創建了一個 FastAPI 應用,命名為 Async Task API
,版本為 1.0.0
。
自定義 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch
通過 Monkey Patch 的方式,我們自定義了 Swagger UI 的資源加載路徑,使用了國內的 CDN 加速資源,提升文檔加載速度。
全局異常處理
@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})
我們定義了一個全局異常處理器,捕獲所有未處理的異常,并返回一個包含錯誤信息的 JSON 響應。
HTTP 中間件:計算請求處理時間
@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response
這個中間件用于計算每個請求的處理時間,并將處理時間添加到響應頭 X-Process-Time
中,方便調試和性能優化。
創建任務的 API
@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})
用戶可以通過 /tasks
接口提交 Python 代碼,代碼會被異步執行。任務的執行結果可以通過 /tasks/{task_id}
接口查詢。
查詢任務結果的 API
@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)
用戶可以通過 /tasks/{task_id}
接口查詢任務的執行結果和狀態。
2. utils.py
:任務信息獲取工具
utils.py
文件定義了一個工具函數 get_task_info
,用于獲取 Celery 任務的狀態和結果。
def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result
通過 AsyncResult
,我們可以獲取任務的當前狀態(如 PENDING
、SUCCESS
、FAILURE
等)和執行結果。
3. schemas.py
:數據模型定義
schemas.py
文件定義了 Pydantic 模型,用于驗證和序列化請求和響應的數據。
任務請求模型
class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None
用戶提交的任務請求包含以下字段:
code
: 任務的 Python 代碼。expires
: 任務的過期時間(可選)。time_limit
: 任務的時間限制(可選)。
任務結果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None
任務的執行結果包含以下字段:
status
: 任務的執行狀態(如success
或failure
)。output
: 任務的標準輸出(可選)。error
: 任務的錯誤輸出(可選)。
任務結果響應模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None
任務的查詢結果包含以下字段:
task_id
: 任務的 ID。task_status
: 任務的狀態(如PENDING
、SUCCESS
等)。task_result
: 任務的執行結果(可選)。
4. app/__init__.py
:Celery 應用初始化
app/__init__.py
文件是 Celery 應用的初始化文件,主要用于配置 Celery 應用和任務的自動發現。
創建 Celery 應用
app = Celery('my_celery_project')
我們創建了一個名為 my_celery_project
的 Celery 應用。
加載配置
app.config_from_object('app.config')
從 app.config
文件中加載 Celery 的配置。
自動發現任務
app.autodiscover_tasks(['app.tasks'])
自動發現 app.tasks
模塊中的任務。
Worker 和 Beat 初始化
@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化開始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化開始")
定義了 Worker 和 Beat 的初始化函數,分別在 Worker 和 Beat 啟動時執行。
5. app/config.py
:Celery 配置
app/config.py
文件定義了 Celery 的配置。
消息代理和結果存儲
broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'
使用 Redis 作為消息代理和結果存儲。
任務結果過期時間
result_expires = 3600
任務結果在 Redis 中保存 1 小時后過期。
序列化配置
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
使用 JSON 作為任務和結果的序列化格式。
時區配置
timezone = 'Asia/Shanghai'
enable_utc = True
設置時區為 Asia/Shanghai
,并啟用 UTC 時間。
6. app/tasks/tasks.py
:任務執行邏輯
app/tasks/tasks.py
文件定義了一個 Celery 任務 execute_python_code
,用于執行用戶提交的 Python 代碼。
@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)
該任務將用戶提交的代碼字符串保存為臨時文件,然后使用 subprocess.run
執行該文件,捕獲標準輸出和錯誤輸出。如果執行成功,返回 success
狀態和標準輸出;如果執行失敗,返回 failure
狀態和錯誤輸出。最后,刪除臨時文件。
部署分析
version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false
在這個 Docker Compose 配置中,我們定義了三個服務:
- fastapi:FastAPI 應用,負責接收用戶請求并分發任務。
- celery-worker:Celery 工作節點,負責執行異步任務。
- celery-flower:Celery 的監控工具,提供任務執行的可視化界面。
- redis:Redis 服務,作為 Celery 的消息代理和結果存儲。
代碼的功能和價值
功能
-
異步任務執行:
- 用戶可以通過
/tasks
接口提交 Python 代碼,代碼會被異步執行。 - 任務的執行結果可以通過
/tasks/{task_id}
接口查詢。
- 用戶可以通過
-
任務狀態管理:
- 任務的狀態(如
PENDING
、SUCCESS
、FAILURE
等)可以通過/tasks/{task_id}
接口查詢。
- 任務的狀態(如
-
高性能和可擴展性:
- 使用 FastAPI 和 Celery 構建的異步任務引擎能夠處理高并發的任務請求。
- Celery 的分布式特性使得系統可以輕松擴展以應對更多的任務。
-
安全性:
- 通過設置
time_limit
和expires
,可以限制任務的執行時間和過期時間,防止惡意代碼的長時間執行。
- 通過設置
-
易用性:
- FastAPI 自動生成的 Swagger UI 使得 API 的使用和調試更加方便。
- Pydantic 模型確保了請求和響應數據的類型安全。
價值
-
高效的任務處理:
- 該系統能夠高效地處理大量異步任務,適用于需要異步執行代碼的場景,如在線代碼執行、數據處理、圖像處理等。
-
可擴展性:
- 通過 Celery 的分布式任務隊列,系統可以輕松擴展以處理更多的任務,適合高并發場景。
-
安全性:
- 通過限制任務的執行時間和過期時間,系統能夠有效防止惡意代碼的濫用。
-
易用性:
- FastAPI 和 Pydantic 的結合使得 API 的開發和維護更加簡單,同時提供了自動生成的文檔和類型檢查。
-
靈活性:
- 系統支持自定義任務的執行邏輯,可以根據業務需求擴展任務類型和功能。
總結
通過 FastAPI、Celery 和 Redis 的結合,我們構建了一個高性能、可擴展的分布式異步任務引擎。它能夠高效地處理用戶提交的 Python 代碼,并提供任務狀態查詢功能。該系統適用于需要異步執行代碼的場景,具有高效、安全、易用和靈活的特點。
無論是構建一個在線代碼執行平臺,還是處理復雜的計算任務,這個項目都為你提供了一個強大的基礎。希望這篇文章能為你帶來啟發,讓你在異步任務處理的道路上走得更遠!
附圖
發送任務
查詢結果