Celery 是一個強大而靈活的分布式任務隊列系統,旨在幫助應用程序在后臺異步運行耗時的任務,提高系統的響應速度和性能。在 Dify 項目中,Celery 被廣泛用于處理異步任務和定時任務,并與其他工具(如 Sentry、OpenTelemetry)集成,實現了任務的監控和追蹤。
本文將詳細介紹 Celery 在 Dify 項目中的應用,包括其集成方式、使用方法,以及涉及的關鍵文件和代碼片段。
概述
Dify 項目采用 Flask 作為 Web 框架,為了提升系統性能和用戶體驗,引入了 Celery 來處理耗時的后臺任務。通過將任務分配到不同的隊列,并使用 Celery 的 Worker 進行異步執行,Dify 實現了任務的解耦和并發處理。同時,項目還集成了 Sentry 和 OpenTelemetry,對任務執行進行實時監控和性能追蹤。
Celery 的集成與配置
1. 應用工廠 app_factory.py
文件路徑:
./api/app_factory.py
內容分析:
# 將 Celery 擴展導入應用工廠
ext_celery,
在 Flask 應用工廠中,Celery 被作為擴展(ext_celery
)引入。這意味著在創建 Flask 應用實例時,Celery 也會被初始化并與應用集成。這種方式常用于 Flask 框架,便于統一管理應用的各個部分。
2. Celery 擴展 ext_celery.py
文件路徑:
./api/extensions/ext_celery.py
內容分析:
from celery import Celery, Task
from celery.schedules import crontabdef init_celery_app(app):celery_app = Celery(app.import_name)# 更新 Celery 配置celery_app.conf.update(broker_url=app.config['CELERY_BROKER_URL'],result_backend=app.config['CELERY_RESULT_BACKEND'],task_serializer='json',accept_content=['json'],timezone='UTC',enable_utc=True,)# 注冊定時任務和導入任務模塊celery_app.conf.update(beat_schedule=beat_schedule,imports=imports,)# 將 Celery 實例注冊到 Flask 應用app.extensions["celery"] = celery_appreturn celery_app
解釋:
- 初始化 Celery 應用實例:使用
Celery(app.import_name)
創建 Celery 實例。 - 配置更新:通過
celery_app.conf.update()
設置消息代理、結果后端、任務序列化等配置。 - 注冊到應用擴展:將 Celery 實例添加到 Flask 應用的擴展中,便于全局訪問和使用。
3. 配置文件 .env
和 pyproject.toml
文件路徑:
./docker/.env
./api/pyproject.toml
內容分析:
# .env 配置
# 使用 Redis 作為 Celery 的消息代理,數據庫索引為 1
CELERY_BROKER_URL=redis://localhost:6379/1
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# pyproject.toml 中的依賴
"celery~=5.5.2",
"opentelemetry-instrumentation-celery==0.48b0",
解釋:
- 消息代理與結果后端:在
.env
文件中,指定了 Celery 使用 Redis 作為消息代理和結果后端。 - 依賴管理:在
pyproject.toml
中,明確了項目對 Celery 及其相關監控工具的依賴,確保了環境的一致性。
任務的定義與處理
1. 任務定義文件 tasks/
文件路徑:
./api/tasks/
內容分析:
from celery import shared_task@shared_task
def some_task(args):# 任務的具體實現pass
解釋:
- 使用
shared_task
裝飾器:在任務模塊中,使用@shared_task
裝飾器定義任務。這種方式無需直接引用 Celery 應用實例,避免了循環導入的問題,提高了模塊的獨立性和可重用性。 - 任務類型:任務包括數據處理、郵件發送、操作追蹤等,滿足項目的多種業務需求。
2. 定時任務 schedule/
文件路徑:
./api/schedule/
內容分析:
@app.celery.task(queue="dataset")
def scheduled_task():# 定時任務的邏輯實現pass
解釋:
- 使用
@app.celery.task
裝飾器:在定時任務中,直接引用了 Celery 應用實例,便于指定任務的隊列和其他配置。 - 任務調度:定時任務通過 Celery Beat 進行調度,按照預定義的時間間隔自動執行。
3. 操作追蹤管理器 ops_trace_manager.py
文件路徑:
./api/core/ops/ops_trace_manager.py
內容分析:
def send_to_celery(self, tasks: list[TraceTask]):# 將任務發送到 Celery 隊列進行異步處理pass
解釋:
- 異步處理任務:定義了方法,將追蹤任務發送到 Celery,利用其異步處理能力,提高系統的性能。
Celery 的啟動與運行
1. 啟動腳本 entrypoint.sh
文件路徑:
./api/docker/entrypoint.sh
內容分析:
# 啟動 Celery Worker
exec celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \--loglevel ${LOG_LEVEL:-INFO} --queues ${CELERY_QUEUES:-default}# 啟動 Celery Beat(定時任務調度器)
exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}
解釋:
- 啟動 Celery Worker:使用
celery -A app.celery worker
命令,指定應用實例為app.celery
,并使用gevent
并發池,提高異步任務的執行效率。 - 啟動 Celery Beat:使用
celery -A app.celery beat
命令,啟動定時任務調度器,按計劃執行定時任務。
2. 啟動命令示例 README.md
文件路徑:
./api/README.md
內容分析:
uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion
解釋:
- 指定任務隊列:使用
-Q
參數,指定 Worker 監聽的任務隊列,如dataset
、generation
、mail
等,實現任務的分類處理和資源優化。 - 設置并發和日志:
-P gevent
指定并發池,-c 1
設置并發數量,--loglevel INFO
設置日志級別為 INFO,便于監控任務執行情況。
與其他工具的集成
1. Sentry 集成 ext_sentry.py
文件路徑:
./api/extensions/ext_sentry.py
內容分析:
from sentry_sdk.integrations.celery import CeleryIntegrationdef init_sentry():sentry_sdk.init(dsn="your_sentry_dsn",integrations=[CeleryIntegration()],# 其他配置)
解釋:
- 錯誤監控:通過集成 Sentry,Celery 任務執行中的異常將被捕獲并發送到 Sentry,方便開發者及時發現和解決問題。
2. OpenTelemetry 集成 ext_otel.py
文件路徑:
./api/extensions/ext_otel.py
內容分析:
from opentelemetry.instrumentation.celery import CeleryInstrumentordef init_tracing():# 判斷是否為 Celery Worker 進程if not is_celery_worker():CeleryInstrumentor().instrument()else:# 在 Celery Worker 初始化時進行 Instrumentationworker_init.connect(init_celery_worker)def init_celery_worker(*args, **kwargs):CeleryInstrumentor().instrument()
解釋:
- 性能監控:通過 OpenTelemetry,對 Celery 任務的執行進行性能監控和分布式追蹤,助力分析系統的瓶頸和優化方向。
數據庫支持與依賴管理
1. 數據庫遷移 64b051264f32_init.py
文件路徑:
./api/migrations/versions/64b051264f32_init.py
內容分析:
op.create_table('celery_taskmeta', ...)
op.create_table('celery_tasksetmeta', ...)# 刪除表
op.drop_table('celery_tasksetmeta')
op.drop_table('celery_taskmeta')
解釋:
- 任務狀態存儲:通過 Alembic 遷移,創建 Celery 用于存儲任務元數據和結果的數據庫表,實現任務狀態的持久化管理。
2. 依賴管理 uv.lock
文件路徑:
./api/uv.lock
內容分析:
name = "celery"
version = "5.5.2"
...name = "opentelemetry-instrumentation-celery"
version = "0.48b0"
...
解釋:
- 鎖定依賴:
uv.lock
文件記錄了項目的依賴庫和版本信息,確保了在不同環境下安裝一致的依賴,防止版本沖突。
開發與調試支持
1. VSCode 調試配置 launch.json.example
文件路徑:
./api/.vscode/launch.json.example
內容分析:
{"name": "Celery Worker","type": "python","request": "launch","module": "celery","args": ["worker","-A","app.celery","--loglevel=INFO"],"console": "integratedTerminal"
}
解釋:
- 調試支持:提供了 VSCode 的調試配置,方便開發人員在 IDE 中對 Celery Worker 進行調試和測試。
2. 開發腳本 start-worker
文件路徑:
./dev/start-worker
內容分析:
#!/bin/bash
# 啟動開發環境下的 Celery Worker
celery -A app.celery worker --loglevel=INFO
解釋:
- 快速啟動:提供了腳本,簡化了開發環境下 Celery Worker 的啟動命令,提高了開發效率。
實踐建議
- 任務隊列劃分:根據任務的性質和資源需求,將任務分配到不同的隊列,合理配置 Worker,提高系統的性能和可靠性。
- 監控與日志:利用 Sentry 和 OpenTelemetry,對任務的執行狀態和性能進行實時監控,及時發現潛在問題。
- 資源優化:根據任務類型(I/O 密集型或 CPU 密集型),選擇合適的并發模式(如
gevent
、eventlet
或prefork
),優化資源利用率。 - 依賴管理:通過
pyproject.toml
和uv.lock
等文件,明確項目的依賴版本,確保環境的一致性。 - 安全性考慮:在配置文件和環境變量中,注意保護敏感信息,如數據庫連接字符串和 Sentry 的 DSN,避免泄漏。
總結
Celery 在 Dify 項目中扮演了關鍵角色,通過處理異步任務和定時任務,提升了系統的性能和用戶體驗。通過與 Flask 應用的深度集成,以及與 Sentry、OpenTelemetry 等工具的結合,Dify 實現了對任務的高效管理和監控。
通過對 Celery 在項目中的集成方式、任務定義、啟動運行和監控手段的了解,我們可以更好地理解其運作原理,并在實際開發中應用這些經驗,提高系統的穩健性和可維護性。
參考資料:
- Celery 官方文檔
- Flask 與 Celery 的集成
- Sentry 對 Celery 的支持
- OpenTelemetry 對 Celery 的支持