在 Dify 項目中的 Celery:異步任務的實現與集成

Celery 是一個強大而靈活的分布式任務隊列系統,旨在幫助應用程序在后臺異步運行耗時的任務,提高系統的響應速度和性能。在 Dify 項目中,Celery 被廣泛用于處理異步任務和定時任務,并與其他工具(如 Sentry、OpenTelemetry)集成,實現了任務的監控和追蹤。

本文將詳細介紹 Celery 在 Dify 項目中的應用,包括其集成方式、使用方法,以及涉及的關鍵文件和代碼片段。


概述

Dify 項目采用 Flask 作為 Web 框架,為了提升系統性能和用戶體驗,引入了 Celery 來處理耗時的后臺任務。通過將任務分配到不同的隊列,并使用 Celery 的 Worker 進行異步執行,Dify 實現了任務的解耦和并發處理。同時,項目還集成了 SentryOpenTelemetry,對任務執行進行實時監控和性能追蹤。


Celery 的集成與配置

1. 應用工廠 app_factory.py

文件路徑:

  • ./api/app_factory.py

內容分析:

# 將 Celery 擴展導入應用工廠
ext_celery,

在 Flask 應用工廠中,Celery 被作為擴展(ext_celery)引入。這意味著在創建 Flask 應用實例時,Celery 也會被初始化并與應用集成。這種方式常用于 Flask 框架,便于統一管理應用的各個部分。


2. Celery 擴展 ext_celery.py

文件路徑:

  • ./api/extensions/ext_celery.py

內容分析:

from celery import Celery, Task
from celery.schedules import crontabdef init_celery_app(app):celery_app = Celery(app.import_name)# 更新 Celery 配置celery_app.conf.update(broker_url=app.config['CELERY_BROKER_URL'],result_backend=app.config['CELERY_RESULT_BACKEND'],task_serializer='json',accept_content=['json'],timezone='UTC',enable_utc=True,)# 注冊定時任務和導入任務模塊celery_app.conf.update(beat_schedule=beat_schedule,imports=imports,)# 將 Celery 實例注冊到 Flask 應用app.extensions["celery"] = celery_appreturn celery_app

解釋:

  • 初始化 Celery 應用實例:使用 Celery(app.import_name) 創建 Celery 實例。
  • 配置更新:通過 celery_app.conf.update() 設置消息代理、結果后端、任務序列化等配置。
  • 注冊到應用擴展:將 Celery 實例添加到 Flask 應用的擴展中,便于全局訪問和使用。

3. 配置文件 .envpyproject.toml

文件路徑:

  • ./docker/.env
  • ./api/pyproject.toml

內容分析:

# .env 配置
# 使用 Redis 作為 Celery 的消息代理,數據庫索引為 1
CELERY_BROKER_URL=redis://localhost:6379/1
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# pyproject.toml 中的依賴
"celery~=5.5.2",
"opentelemetry-instrumentation-celery==0.48b0",

解釋:

  • 消息代理與結果后端:在 .env 文件中,指定了 Celery 使用 Redis 作為消息代理和結果后端。
  • 依賴管理:在 pyproject.toml 中,明確了項目對 Celery 及其相關監控工具的依賴,確保了環境的一致性。

任務的定義與處理

1. 任務定義文件 tasks/

文件路徑:

  • ./api/tasks/

內容分析:

from celery import shared_task@shared_task
def some_task(args):# 任務的具體實現pass

解釋:

  • 使用 shared_task 裝飾器:在任務模塊中,使用 @shared_task 裝飾器定義任務。這種方式無需直接引用 Celery 應用實例,避免了循環導入的問題,提高了模塊的獨立性和可重用性。
  • 任務類型:任務包括數據處理、郵件發送、操作追蹤等,滿足項目的多種業務需求。

2. 定時任務 schedule/

文件路徑:

  • ./api/schedule/

內容分析:

@app.celery.task(queue="dataset")
def scheduled_task():# 定時任務的邏輯實現pass

解釋:

  • 使用 @app.celery.task 裝飾器:在定時任務中,直接引用了 Celery 應用實例,便于指定任務的隊列和其他配置。
  • 任務調度:定時任務通過 Celery Beat 進行調度,按照預定義的時間間隔自動執行。

3. 操作追蹤管理器 ops_trace_manager.py

文件路徑:

  • ./api/core/ops/ops_trace_manager.py

內容分析:

def send_to_celery(self, tasks: list[TraceTask]):# 將任務發送到 Celery 隊列進行異步處理pass

解釋:

  • 異步處理任務:定義了方法,將追蹤任務發送到 Celery,利用其異步處理能力,提高系統的性能。

Celery 的啟動與運行

1. 啟動腳本 entrypoint.sh

文件路徑:

  • ./api/docker/entrypoint.sh

內容分析:

# 啟動 Celery Worker
exec celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \--loglevel ${LOG_LEVEL:-INFO} --queues ${CELERY_QUEUES:-default}# 啟動 Celery Beat(定時任務調度器)
exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}

解釋:

  • 啟動 Celery Worker:使用 celery -A app.celery worker 命令,指定應用實例為 app.celery,并使用 gevent 并發池,提高異步任務的執行效率。
  • 啟動 Celery Beat:使用 celery -A app.celery beat 命令,啟動定時任務調度器,按計劃執行定時任務。

2. 啟動命令示例 README.md

文件路徑:

  • ./api/README.md

內容分析:

uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion

解釋:

  • 指定任務隊列:使用 -Q 參數,指定 Worker 監聽的任務隊列,如 datasetgenerationmail 等,實現任務的分類處理和資源優化。
  • 設置并發和日志-P gevent 指定并發池,-c 1 設置并發數量,--loglevel INFO 設置日志級別為 INFO,便于監控任務執行情況。

與其他工具的集成

1. Sentry 集成 ext_sentry.py

文件路徑:

  • ./api/extensions/ext_sentry.py

內容分析:

from sentry_sdk.integrations.celery import CeleryIntegrationdef init_sentry():sentry_sdk.init(dsn="your_sentry_dsn",integrations=[CeleryIntegration()],# 其他配置)

解釋:

  • 錯誤監控:通過集成 Sentry,Celery 任務執行中的異常將被捕獲并發送到 Sentry,方便開發者及時發現和解決問題。

2. OpenTelemetry 集成 ext_otel.py

文件路徑:

  • ./api/extensions/ext_otel.py

內容分析:

from opentelemetry.instrumentation.celery import CeleryInstrumentordef init_tracing():# 判斷是否為 Celery Worker 進程if not is_celery_worker():CeleryInstrumentor().instrument()else:# 在 Celery Worker 初始化時進行 Instrumentationworker_init.connect(init_celery_worker)def init_celery_worker(*args, **kwargs):CeleryInstrumentor().instrument()

解釋:

  • 性能監控:通過 OpenTelemetry,對 Celery 任務的執行進行性能監控和分布式追蹤,助力分析系統的瓶頸和優化方向。

數據庫支持與依賴管理

1. 數據庫遷移 64b051264f32_init.py

文件路徑:

  • ./api/migrations/versions/64b051264f32_init.py

內容分析:

op.create_table('celery_taskmeta', ...)
op.create_table('celery_tasksetmeta', ...)# 刪除表
op.drop_table('celery_tasksetmeta')
op.drop_table('celery_taskmeta')

解釋:

  • 任務狀態存儲:通過 Alembic 遷移,創建 Celery 用于存儲任務元數據和結果的數據庫表,實現任務狀態的持久化管理。

2. 依賴管理 uv.lock

文件路徑:

  • ./api/uv.lock

內容分析:

name = "celery"
version = "5.5.2"
...name = "opentelemetry-instrumentation-celery"
version = "0.48b0"
...

解釋:

  • 鎖定依賴uv.lock 文件記錄了項目的依賴庫和版本信息,確保了在不同環境下安裝一致的依賴,防止版本沖突。

開發與調試支持

1. VSCode 調試配置 launch.json.example

文件路徑:

  • ./api/.vscode/launch.json.example

內容分析:

{"name": "Celery Worker","type": "python","request": "launch","module": "celery","args": ["worker","-A","app.celery","--loglevel=INFO"],"console": "integratedTerminal"
}

解釋:

  • 調試支持:提供了 VSCode 的調試配置,方便開發人員在 IDE 中對 Celery Worker 進行調試和測試。

2. 開發腳本 start-worker

文件路徑:

  • ./dev/start-worker

內容分析:

#!/bin/bash
# 啟動開發環境下的 Celery Worker
celery -A app.celery worker --loglevel=INFO

解釋:

  • 快速啟動:提供了腳本,簡化了開發環境下 Celery Worker 的啟動命令,提高了開發效率。

實踐建議

  • 任務隊列劃分:根據任務的性質和資源需求,將任務分配到不同的隊列,合理配置 Worker,提高系統的性能和可靠性。
  • 監控與日志:利用 Sentry 和 OpenTelemetry,對任務的執行狀態和性能進行實時監控,及時發現潛在問題。
  • 資源優化:根據任務類型(I/O 密集型或 CPU 密集型),選擇合適的并發模式(如 geventeventletprefork),優化資源利用率。
  • 依賴管理:通過 pyproject.tomluv.lock 等文件,明確項目的依賴版本,確保環境的一致性。
  • 安全性考慮:在配置文件和環境變量中,注意保護敏感信息,如數據庫連接字符串和 Sentry 的 DSN,避免泄漏。

總結

Celery 在 Dify 項目中扮演了關鍵角色,通過處理異步任務和定時任務,提升了系統的性能和用戶體驗。通過與 Flask 應用的深度集成,以及與 Sentry、OpenTelemetry 等工具的結合,Dify 實現了對任務的高效管理和監控。

通過對 Celery 在項目中的集成方式、任務定義、啟動運行和監控手段的了解,我們可以更好地理解其運作原理,并在實際開發中應用這些經驗,提高系統的穩健性和可維護性。


參考資料:

  • Celery 官方文檔
  • Flask 與 Celery 的集成
  • Sentry 對 Celery 的支持
  • OpenTelemetry 對 Celery 的支持

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/908053.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/908053.shtml
英文地址,請注明出處:http://en.pswp.cn/news/908053.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Pytorch Geometric官方例程pytorch_geometric/examples/link_pred.py環境安裝教程及圖數據集制作

最近需要訓練圖卷積神經網絡(Graph Convolution Neural Network, GCNN),在配置GCNN環境上總結了一些經驗。 我覺得對于初學者而言,圖神經網絡的訓練會有2個難點: ①環境配置 ②數據集制作 一、環境配置 我最初光想…

2025年微信小程序開發:AR/VR與電商的最新案例

引言 微信小程序自2017年推出以來,已成為中國移動互聯網生態的核心組成部分。根據最新數據,截至2025年,微信小程序的日活躍用戶超過4.5億,總數超過430萬,覆蓋電商、社交、線下服務等多個領域(WeChat Mini …

互聯網向左,區塊鏈向右

2008年,中本聰首次提出了比特幣的設想,這打開了去中心化的大門。 比特幣白皮書清晰的描述了去中心化支付的解決方案,并分別從以下幾個方面闡述了他的理念: 一、由轉賬雙方點對點的通訊,而不通過中心化的第三方&#xf…

PV操作的C++代碼示例講解

文章目錄 一、PV操作基本概念(一)信號量(二)P操作(三)V操作 二、PV操作的意義三、C中實現PV操作的方法(一)使用信號量實現PV操作代碼解釋: (二)使…

《對象創建的秘密:Java 內存布局、逃逸分析與 TLAB 優化詳解》

大家好呀!今天我們來聊聊Java世界里那些"看不見摸不著"但又超級重要的東西——對象在內存里是怎么"住"的,以及JVM這個"超級管家"是怎么幫我們優化管理的。放心,我會用最接地氣的方式講解,保證連小學…

簡單實現Ajax基礎應用

Ajax不是一種技術,而是一個編程概念。HTML 和 CSS 可以組合使用來標記和設置信息樣式。JavaScript 可以修改網頁以動態顯示,并允許用戶與新信息進行交互。內置的 XMLHttpRequest 對象用于在網頁上執行 Ajax,允許網站將內容加載到屏幕上而無需…

詳解開漏輸出和推挽輸出

開漏輸出和推挽輸出 以上是 GPIO 配置為輸出時的內部示意圖,我們要關注的其實就是這兩個 MOS 管的開關狀態,可以組合出四種狀態: 兩個 MOS 管都關閉時,輸出處于一個浮空狀態,此時他對其他點的電阻是無窮大的&#xff…

Matlab實現LSTM-SVM回歸預測,作者:機器學習之心

Matlab實現LSTM-SVM回歸預測,作者:機器學習之心 目錄 Matlab實現LSTM-SVM回歸預測,作者:機器學習之心效果一覽基本介紹程序設計參考資料 效果一覽 基本介紹 代碼主要功能 該代碼實現了一個LSTM-SVM回歸預測模型,核心流…

Leetcode - 周賽 452

目錄 一,3566. 等積子集的劃分方案二,3567. 子矩陣的最小絕對差三,3568. 清理教室的最少移動四,3569. 分割數組后不同質數的最大數目 一,3566. 等積子集的劃分方案 題目列表 本題有兩種做法,dfs 選或不選…

【FAQ】HarmonyOS SDK 閉源開放能力 —Account Kit(5)

1.問題描述: 集成華為一鍵登錄的LoginWithHuaweiIDButton, 但是Button默認名字叫 “華為賬號一鍵登錄”,太長無法顯示,能否簡寫成“一鍵登錄”與其他端一致? 解決方案: 問題分兩個場景: 一、…

Asp.Net Core SignalR的分布式部署

文章目錄 前言一、核心二、解決方案架構三、實現方案1.使用 Azure SignalR Service2.Redis Backplane(Redis 背板方案)3.負載均衡配置粘性會話要求無粘性會話方案(僅WebSockets)完整部署示例(Redis Docker)性能優化技…

L2-054 三點共線 - java

L2-054 三點共線 語言時間限制內存限制代碼長度限制棧限制Java (javac)2600 ms512 MB16KB8192 KBPython (python3)2000 ms256 MB16KB8192 KB其他編譯器2000 ms64 MB16KB8192 KB 題目描述: 給定平面上 n n n 個點的坐標 ( x _ i , y _ i ) ( i 1 , ? , n ) (x\_i…

【 java 基礎知識 第一篇 】

目錄 1.概念 1.1.java的特定有哪些? 1.2.java有哪些優勢哪些劣勢? 1.3.java為什么可以跨平臺? 1.4JVM,JDK,JRE它們有什么區別? 1.5.編譯型語言與解釋型語言的區別? 2.數據類型 2.1.long與int類型可以互轉嗎&…

高效背誦英語四級范文

以下是結合認知科學和實戰驗證的 ??高效背誦英語作文五步法??,助你在30分鐘內牢固記憶一篇作文,特別適配考前沖刺場景: 📝 ??一、解構作文(5分鐘)?? ??拆解邏輯框架?? 用熒光筆標出&#xff…

RHEL7安裝教程

RHEL7安裝教程 下載RHEL7鏡像 通過網盤分享的文件:RHEL 7.zip 鏈接: https://pan.baidu.com/s/1ExLhdJigj-tcrHJxIca5XA?pwdjrrj 提取碼: jrrj --來自百度網盤超級會員v6的分享安裝 1.打開VMware,新建虛擬機,選擇自定義然后下一步 2.點擊…

結構型設計模式之Decorator(裝飾器)

結構型設計模式之Decorator(裝飾器) 前言: 本案例通過李四舉例,不改變源代碼的情況下 對“才藝”進行增強。 摘要: 摘要: 裝飾器模式是一種結構型設計模式,允許動態地為對象添加功能而不改變其…

Kotlin委托機制使用方式和原理

目錄 類委托屬性委托簡單的實現屬性委托Kotlin標準庫中提供的幾個委托延遲屬性LazyLazy委托參數可觀察屬性Observable委托vetoable委托屬性儲存在Map中 實踐方式雙擊back退出Fragment/Activity傳參ViewBinding和委托 類委托 類委托有點類似于Java中的代理模式 interface Base…

SpringBoot接入Kimi實踐記錄輕松上手

kimi簡單使用 什么是Kimi API 官網:https://platform.moonshot.cn/ Kimi API 并不是一個我所熟知的廣泛通用的術語。我的推測是,你可能想問的是關于 API 的一些基礎知識。API(Application Programming Interface,應用程序編程接…

書籍在其他數都出現k次的數組中找到只出現一次的數(7)0603

題目 給定一個整型數組arr和一個大于1的整數k。已知arr中只有1個數出現了1次,其他的數都出現了k次,請返回只出現了1次的數。 解答: 對此題進行思路轉換,可以將此題,轉換成k進制數。 k進制的兩個數c和d,…

React 項目初始化與搭建指南

React 項目初始化有多種方式,可以選擇已有的腳手架工具快速創建項目,也可以自定義項目結構并使用構建工具實現項目的構建打包流程。 1. 腳手架方案 1.1. Vite 通過 Vite 創建 React 項目非常簡單,只需一行命令即可完成。Vite 的工程初始化…