python中使用高并發分布式隊列庫celery的那些坑
- 🌟 簡單理解
- 🛠? 核心功能
- 🚀 工作機制
- 📦 示例代碼(使用 Redis 作為 broker)
- 🔗 常見搭配
- 📦 我的環境
- 📦第一個問題
- 📦第二個問題
- 原因分析
Celery 是一個用于 分布式任務隊列 的 Python 庫,常用于處理異步任務(即任務不需要立即執行,后臺慢慢做),尤其適合執行定時任務或耗時操作。
🌟 簡單理解
Celery 就是讓你把“任務”扔到后臺執行,而不是阻塞當前程序。
🛠? 核心功能
功能 | 說明 |
---|---|
異步任務執行 | 比如發郵件、處理圖片、生成報告等不需要立即完成的操作。 |
分布式任務調度 | 可以運行在多臺服務器上,實現任務負載均衡。 |
定時任務(周期任務) | 類似 crontab ,可設置任務定時執行(如每天 8 點發日報)。 |
任務重試機制 | 失敗任務可以自動重試,適用于網絡波動等場景。 |
與Django/Flask集成 | 非常適合與這些 Web 框架配合使用,將長耗時任務下放到 Celery。 |
🚀 工作機制
Celery 一般由以下幾部分組成:
- Producer(生產者):你寫的代碼,會將任務“發”出去。
- Broker(中間人):任務先存放在消息隊列(如 Redis、RabbitMQ)中。
- Worker(工人):后臺運行的進程,專門“接收”和“執行”這些任務。
- Result Backend(結果后端):可選,記錄任務結果,如執行成功或失敗。
📦 示例代碼(使用 Redis 作為 broker)
# tasks.py
from celery import Celeryapp = Celery('mytasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
運行方式:
celery -A tasks worker --loglevel=info
調用方式(異步執行):
add.delay(3, 5) # 返回一個異步結果對象
🔗 常見搭配
- 消息中間件:Redis、RabbitMQ(推薦 Redis 簡單易用)
- Web框架集成:Django、Flask
- 配合 Flower、Prometheus、Grafana 等工具可實現任務監控
如果你正在開發一個 需要做“異步處理”或“后臺任務”的系統,Celery 是 Python 中的主流選擇之一。但是該庫看似簡單,卻隱藏著無數坑,本文就帶大家了解一下我在使用過程中遇到的那些坑。
📦 我的環境
- windows 10
- python 3.12
- celery 5.5.2
📦第一個問題
執行命令:
celery -A main_async:celery_app worker --loglevel=info
報錯:
[2025-05-29 19:40:22,107: INFO/MainProcess] Task main_async.background_content_similarity[4c84e1c8-6a13-4241-8e62-04e17b3884cb] received
[2025-05-29 19:40:22,142: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
billiard.einfo.RemoteTraceback:
"""
Traceback (most recent call last):File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloopresult = (True, prepare_result(fun(*args, **kwargs)))^^^^^^^^^^^^^^^^^^^^File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_tasktasks, accept, hostname = _loc^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)
"""The above exception was the direct cause of the following exception:Traceback (most recent call last):File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloopresult = (True, prepare_result(fun(*args, **kwargs)))^^^^^^^^^^^^^^^^^^^^File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_tasktasks, accept, hostname = _loc^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)
該問題是由于celery的默認并發網絡編程線程庫引起的,換成eventlet可以解決問題,只需修改啟動命令即可,如下:
celery -A main_async:celery_app worker --loglevel=info -P eventlet
📦第二個問題
第二個問題是日志問題,報錯類似如下所示:
'LoggingProxy' object has no attribute 'encoding'"
原因分析
Celery 在啟動 worker 時,默認會將標準輸出和標準錯誤重定向到其日志系統中。這意味著 sys.stdout 和 sys.stderr 被替換為 LoggingProxy 對象。然而,某些庫或代碼可能期望這些對象具有標準文件對象的屬性,如 encoding 或 fileno,從而導致 AttributeError。
此時只需要將worker_redirect_stdouts參數設置為False即可解決問題,代碼如下:
# Celery 配置
celery_app.conf.update(task_serializer="json",accept_content=["json"],result_serializer="json",timezone="Asia/Shanghai",enable_utc=True,include=["main_async"], # 顯式指定任務模塊task_track_started=True, # 跟蹤任務開始狀態task_ignore_result=False, # 保存任務結果task_store_errors_even_if_ignored=True, # 存儲錯誤worker_redirect_stdouts = False # 禁止將stdout和stderr重定向到當前記錄器。
)