構建高性能異步任務引擎:FastAPI + Celery + Redis

在現代應用開發中,異步任務處理是一個常見的需求。無論是數據處理、圖像生成,還是復雜的計算任務,異步執行都能顯著提升系統的響應速度和吞吐量。今天,我們將通過一個實際項目,探索如何使用 FastAPICeleryRedis 構建一個高性能的異步任務引擎。

項目背景

技術棧介紹

  • FastAPI:一個現代、高性能的 Web 框架,基于 Python 3.7+ 的異步編程特性構建。它支持自動生成 OpenAPI 文檔和 Swagger UI,能夠快速構建 RESTful API,并且具有極低的延遲和高并發處理能力。
  • Celery:一個分布式任務隊列系統,主要用于處理異步任務和定時任務。它支持多種消息傳輸機制,能夠將任務分發到多個工作節點上并行處理,從而提高系統的吞吐量和響應速度。
  • Redis:一個高性能的鍵值存儲系統,常用于緩存、消息隊列和分布式鎖等場景。在 Celery 中,Redis 通常作為消息代理(Broker)和結果存儲(Backend),負責任務的分發和結果的持久化。

項目目標

通過 FastAPI、Celery 和 Redis 的結合,構建一個能夠高效處理用戶提交的 Python 代碼的異步任務引擎。用戶可以通過 API 提交代碼,系統會異步執行代碼,并返回任務的執行結果。

項目目錄結構

project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py

代碼功能深度解析

1. main.py:FastAPI 應用的核心

main.py 是項目的核心入口文件,負責定義 FastAPI 應用的接口和邏輯。

FastAPI 應用初始化
app = FastAPI(title="Async Task API", description="", version="1.0.0")

這里我們創建了一個 FastAPI 應用,命名為 Async Task API,版本為 1.0.0

自定義 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch

通過 Monkey Patch 的方式,我們自定義了 Swagger UI 的資源加載路徑,使用了國內的 CDN 加速資源,提升文檔加載速度。

全局異常處理
@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})

我們定義了一個全局異常處理器,捕獲所有未處理的異常,并返回一個包含錯誤信息的 JSON 響應。

HTTP 中間件:計算請求處理時間
@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response

這個中間件用于計算每個請求的處理時間,并將處理時間添加到響應頭 X-Process-Time 中,方便調試和性能優化。

創建任務的 API
@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})

用戶可以通過 /tasks 接口提交 Python 代碼,代碼會被異步執行。任務的執行結果可以通過 /tasks/{task_id} 接口查詢。

查詢任務結果的 API
@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)

用戶可以通過 /tasks/{task_id} 接口查詢任務的執行結果和狀態。

2. utils.py:任務信息獲取工具

utils.py 文件定義了一個工具函數 get_task_info,用于獲取 Celery 任務的狀態和結果。

def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result

通過 AsyncResult,我們可以獲取任務的當前狀態(如 PENDINGSUCCESSFAILURE 等)和執行結果。

3. schemas.py:數據模型定義

schemas.py 文件定義了 Pydantic 模型,用于驗證和序列化請求和響應的數據。

任務請求模型
class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None

用戶提交的任務請求包含以下字段:

  • code: 任務的 Python 代碼。
  • expires: 任務的過期時間(可選)。
  • time_limit: 任務的時間限制(可選)。
任務結果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None

任務的執行結果包含以下字段:

  • status: 任務的執行狀態(如 successfailure)。
  • output: 任務的標準輸出(可選)。
  • error: 任務的錯誤輸出(可選)。
任務結果響應模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None

任務的查詢結果包含以下字段:

  • task_id: 任務的 ID。
  • task_status: 任務的狀態(如 PENDINGSUCCESS 等)。
  • task_result: 任務的執行結果(可選)。

4. app/__init__.py:Celery 應用初始化

app/__init__.py 文件是 Celery 應用的初始化文件,主要用于配置 Celery 應用和任務的自動發現。

創建 Celery 應用
app = Celery('my_celery_project')

我們創建了一個名為 my_celery_project 的 Celery 應用。

加載配置
app.config_from_object('app.config')

app.config 文件中加載 Celery 的配置。

自動發現任務
app.autodiscover_tasks(['app.tasks'])

自動發現 app.tasks 模塊中的任務。

Worker 和 Beat 初始化
@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化開始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化開始")

定義了 Worker 和 Beat 的初始化函數,分別在 Worker 和 Beat 啟動時執行。

5. app/config.py:Celery 配置

app/config.py 文件定義了 Celery 的配置。

消息代理和結果存儲
broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'

使用 Redis 作為消息代理和結果存儲。

任務結果過期時間
result_expires = 3600

任務結果在 Redis 中保存 1 小時后過期。

序列化配置
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

使用 JSON 作為任務和結果的序列化格式。

時區配置
timezone = 'Asia/Shanghai'
enable_utc = True

設置時區為 Asia/Shanghai,并啟用 UTC 時間。

6. app/tasks/tasks.py:任務執行邏輯

app/tasks/tasks.py 文件定義了一個 Celery 任務 execute_python_code,用于執行用戶提交的 Python 代碼。

@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)

該任務將用戶提交的代碼字符串保存為臨時文件,然后使用 subprocess.run 執行該文件,捕獲標準輸出和錯誤輸出。如果執行成功,返回 success 狀態和標準輸出;如果執行失敗,返回 failure 狀態和錯誤輸出。最后,刪除臨時文件。

部署分析

version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false

在這個 Docker Compose 配置中,我們定義了三個服務:

  • fastapi:FastAPI 應用,負責接收用戶請求并分發任務。
  • celery-worker:Celery 工作節點,負責執行異步任務。
  • celery-flower:Celery 的監控工具,提供任務執行的可視化界面。
  • redis:Redis 服務,作為 Celery 的消息代理和結果存儲。

代碼的功能和價值

功能

  1. 異步任務執行

    • 用戶可以通過 /tasks 接口提交 Python 代碼,代碼會被異步執行。
    • 任務的執行結果可以通過 /tasks/{task_id} 接口查詢。
  2. 任務狀態管理

    • 任務的狀態(如 PENDINGSUCCESSFAILURE 等)可以通過 /tasks/{task_id} 接口查詢。
  3. 高性能和可擴展性

    • 使用 FastAPI 和 Celery 構建的異步任務引擎能夠處理高并發的任務請求。
    • Celery 的分布式特性使得系統可以輕松擴展以應對更多的任務。
  4. 安全性

    • 通過設置 time_limitexpires,可以限制任務的執行時間和過期時間,防止惡意代碼的長時間執行。
  5. 易用性

    • FastAPI 自動生成的 Swagger UI 使得 API 的使用和調試更加方便。
    • Pydantic 模型確保了請求和響應數據的類型安全。

價值

  1. 高效的任務處理

    • 該系統能夠高效地處理大量異步任務,適用于需要異步執行代碼的場景,如在線代碼執行、數據處理、圖像處理等。
  2. 可擴展性

    • 通過 Celery 的分布式任務隊列,系統可以輕松擴展以處理更多的任務,適合高并發場景。
  3. 安全性

    • 通過限制任務的執行時間和過期時間,系統能夠有效防止惡意代碼的濫用。
  4. 易用性

    • FastAPI 和 Pydantic 的結合使得 API 的開發和維護更加簡單,同時提供了自動生成的文檔和類型檢查。
  5. 靈活性

    • 系統支持自定義任務的執行邏輯,可以根據業務需求擴展任務類型和功能。

總結

通過 FastAPI、Celery 和 Redis 的結合,我們構建了一個高性能、可擴展的分布式異步任務引擎。它能夠高效地處理用戶提交的 Python 代碼,并提供任務狀態查詢功能。該系統適用于需要異步執行代碼的場景,具有高效、安全、易用和靈活的特點。

無論是構建一個在線代碼執行平臺,還是處理復雜的計算任務,這個項目都為你提供了一個強大的基礎。希望這篇文章能為你帶來啟發,讓你在異步任務處理的道路上走得更遠!

附圖

發送任務
在這里插入圖片描述
查詢結果
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/63682.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/63682.shtml
英文地址,請注明出處:http://en.pswp.cn/web/63682.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

介紹 Html 和 Html 5 的關系與區別

HTML(HyperText Markup Language)是構建網頁的標準標記語言,而 HTML5 是 HTML 的最新版本,包含了一些新的功能、元素、API 和屬性。HTML5 相對于早期版本的 HTML(比如 HTML4)有許多重要的改進和變化。以下是…

【win10+RAGFlow+Ollama】搭建本地大模型助手(教程+源碼)

一、RAGFlow簡介 RAGFlow是一個基于對文檔深入理解的開源RAG(Retrieval-augmented Generation,檢索增強生成)引擎。 主要作用: 讓用戶創建自有知識庫,根據設定的參數對知識庫中的文件進行切塊處理,用戶向大…

qwt 之 QwtPlotPicker

QwtPlotMarker 和 QwtPlotPicker 是 Qwt 庫中用于增強 QwtPlot 功能的兩個重要類。它們分別用于在圖中添加標記和實現交互式的選擇或拖動功能。 QwtPlotPicker 提供了交互式的選擇工具,它允許用戶通過鼠標點擊或拖動來選擇圖表中的數據點或區域。這對于實現縮放、平…

C/C++圣誕樹

系列文章 序號直達鏈接1C/C愛心代碼2C/C跳動的愛心3C/C李峋同款跳動的愛心代碼4C/C滿屏飄字表白代碼5C/C大雪紛飛代碼6C/C煙花代碼7C/C黑客帝國同款字母雨8C/C櫻花樹代碼9C/C奧特曼代碼10C/C精美圣誕樹11C/C俄羅斯方塊12C/C貪吃蛇13C/C孤單又燦爛的神-鬼怪14C/C閃爍的愛心15C…

lua dofile 傳參數

cat 1.lua arg[1] 111 arg[2] 222 dofile(./2.lua) cat 2.lua print("First argument is: " .. arg[1]) print("Second argument is: " .. arg[2]) 執行 lua 1.lua,結果為: First argument is: 111 Second argument is: 222 l…

電商數據流通的未來:API接口的智能化與自動化趨勢

在數字化時代,電子商務行業正在以前所未有的速度發展,而API(應用程序編程接口)接口作為電商領域的重要組成部分,其應用和發展趨勢也日益受到關注。API接口作為電商系統與外部服務或平臺交互的橋梁,對電商數…

投標心態:如何在“標海戰術”中保持清醒的頭腦?

在競爭激烈的市場環境下,“標海戰術”——即大規模參與投標——已經成為許多企業爭取市場份額的重要策略。然而,盲目追求投標數量可能導致資源浪費、團隊疲勞以及戰略目標的模糊化。在這種高強度的競爭模式中,如何保持清醒的頭腦,…

【蒼穹外賣】學習心得體會-隨筆

前言 寫了很久,終于可以完整運行項目了,記錄下這幾天的心得體會回顧一下知識點 第一天、Git 分布式版本控制工具 一、Git概述 定義:是分布式版本控制工具,用于管理軟件開發中的源代碼文件,像Java類、xml文件、html…

windows C#-使用構造函數

實例化類或結構時,將會調用其構造函數。 構造函數與該類或結構具有相同名稱,并且通常初始化新對象的數據成員。 在下面的示例中,通過使用簡單構造函數定義了一個名為 Taxi 的類。 然后使用 new 運算符對該類進行實例化。 在為新對象分配內存…

研發效能DevOps: Vite 使用 Element Plus

目錄 一、實驗 1.環境 2.初始化前端項目 3.安裝 vue-route 4.安裝 pinia 5.安裝 axios 6.安裝 Element Plus 7.gitee創建工程 8. 配置路由映射 9.Vite 使用 Element Plus 二、問題 1.README.md 文檔推送到gitee未自動換行 2.訪問login頁面顯示空白 3.表單輸入賬戶…

5G 模組 RG500Q常用AT命令

5G 模組 RG500Q常用AT命令 5G 模組 RG500Q常用AT命令 at ATQNWPREFCFG\"mode_pref\",nr5g && sleep 1 at ATQNWPREFCFG\"nr5g_band\",79 && sleep 1 at atqnwlock\"commo…

NVIDIA DeepStream插件之Gst-nvtracker

NVIDIA DeepStream插件之Gst-nvtracker 1. 源由2. 基礎知識3. Gst-nvtracker插件3.1 插件參數3.2 插件API接口 4. 分析問題5. 總結6. 參考資料 1. 源由 這篇的主要目的是稍微吐槽下NVIDIA的設計,當然其實他們做的還是不錯的(從系統架構設計角度看&#…

進程內存轉儲工具|內存鏡像提取-取證工具

1.內存轉儲,內存轉儲(Memory Dump)是將計算機的物理內存(RAM)內容復制到一個文件中的過程,這個文件通常被稱為“內存轉儲文件”或“核心轉儲文件”(Core Dump),內存轉儲的主要目的是…

Lua語言入門 - Lua 面向對象

Lua 面向對象 面向對象編程(Object Oriented Programming,OOP)是一種非常流行的計算機編程架構,通過創建和操作對象來設計應用程序。 以下幾種編程語言都支持面向對象編程: CJavaObjective-CSmalltalkC#Ruby Lua 是…

Pyqt6在lineEdit中輸入文件名稱并創建或刪除JSON文件

1、創建JSON文件 代碼 import osdef addModulekeyWordFile(self):if "" ! self.lineEdit_module.text():moduleFile self.lineEdit_module.text() .jsonelse:self.toolLogPrinting(請輸入模塊名稱)returnfilePath modulekeyWordFileDir moduleFileif os.path.e…

【Leetcode 熱題 100】236. 二叉樹的最近公共祖先

問題背景 給定一個二叉樹, 找到該樹中兩個指定節點的最近公共祖先。 最近公共祖先的定義為:對于有根樹 T T T 的兩個節點 p p p、 q q q,最近公共祖先表示為一個節點 x x x,滿足 x x x 是 p p p、 q q q 的祖先且 x x x 的深度盡可能大…

數據結構--堆的向上調整和向下調整

文章目錄 1.完全二叉樹2.堆向上調整3.堆向下調整4.測試代碼 1.完全二叉樹 下面的這個就是對于我們的完全二叉樹的這個邏輯結構和物理結構的說明: 邏輯結構就是我們自己認為的進行購想出來的; 但是這個物理結構卻是我們的這個數據結構在內存里面的真是…

智能掛號系統設計典范:SSM 結合 Vue 在醫院的應用實現

摘要 隨著信息技術在管理上越來越深入而廣泛的應用,管理信息系統的實施在技術上已逐步成熟。本文介紹了醫院預約掛號系統的開發全過程。通過分析醫院預約掛號系統管理的不足,創建了一個計算機管理醫院預約掛號系統的方案。文章介紹了醫院預約掛號系統的系…

“魔法糖果盒的秘密:用樸素貝葉斯算法猜糖果顏色”

想象一下,你有一個神奇的糖果盒,這個糖果盒里有兩種糖果:紅色的和藍色的。你閉上眼睛,從盒子里拿出一個糖果,然后嘗一嘗,你想知道這個糖果是紅色的還是藍色的。樸素貝葉斯算法就像是一個魔法規則&#xff0…

Transform組件的用法

文章目錄 1. 概念介紹2. 使用方法3. 示例代碼我們在上一章回中介紹了Checkbox Widget相關的內容,本章回中將介紹Transform Widget.閑話休提,讓我們一起Talk Flutter吧。 1. 概念介紹 我們在這里說的Transform是一種容器類widget,它和Container組件類似。它可以包含其它的組件…