Celery在Django中的應用
- 一、項目配置
- 二、異步任務
- 2.1 普通用法
- 2.1.1 通過delay
- 2.1.2 通過apply_async
- 2.2 高級用法
- 2.2.1 任務回調(Callback)
- 2.2.2 任務鏈(Chaining)
- 2.2.3 任務組(Group)
- 2.2.4 任務和弦(Chord)
- 三、定時任務
- 四、啟動celery
- 4.1 命令行方式
- 4.2 腳本方式(容易出問題,建議用命令行方式,很多默認配置內置好)
- 4.2.1 啟動worker(腳本方式博主暫時沒找到好方法能捕獲任務結果)
- 4.2.2 啟動beat
- 4.2.3 合并啟動worker和beat
- 五、監控管理
- 5.1 celery inspect
- 5.2 celery control
- 5.3 celery event
- 5.4 celery multi
- 5.5 celery purge
- 5.6 celery flower
一、項目配置
1.1 確認celery及django版本相對應,本文使用django3.2、celery5.5
1.2 創建一個名為CeleryStudy的django項目,以及一個名為test1_app,目錄結構如下:
1.3 配置celery的setting參數(大部分不需要全局配置,可以針對tasks單獨配置)
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'django-db' # django-db(使用 Django 數據庫存儲結果)
CELERY_ACCEPT_CONTENT = ['json'] # 指定 Celery 接受的任務序列化格式(避免反序列化安全問題)。
CELERY_TASK_SERIALIZER = 'json' # 指定任務的序列化方式
CELERY_RESULT_SERIALIZER = 'json' # 指定結果的序列化方式
CELERY_TIMEZONE = TIME_ZONE # 設置 Celery 的時區(影響定時任務的調度時間)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 指定定時任務調度器的后端
# 默認內存調度:celery.beat:PersistentScheduler(需配合 beat_schedule_filename)
# Redis 調度:celery.beat:RedisScheduler(需安裝 celery-redis-scheduler)
CELERYD_CONCURRENCY = 4 # Worker 并發數(默認 CPU 核心數)
CELERY_BEAT_SCHEDULE = { # 一般在celery.py文件配置'every-10-seconds': {'task': 'myapp.tasks.debug','schedule': 10.0,},
}
CELERY_BEAT_MAX_LOOP_INTERVAL = 300 # 秒, Beat 調度器的最大循環間隔(默認 5 分鐘)
CELERY_TASK_TIME_LIMIT = 300 # 硬超時 5 分鐘(任務被強制終止)
CELERY_TASK_SOFT_TIME_LIMIT = 240 # 軟超時 4 分鐘(觸發 `SoftTimeLimitExceeded`)
CELERY_TASK_DEFAULT_RETRY_DELAY = 60 # 任務重試間隔 1 分鐘
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s' # 自定義 Worker 日志格式
...... # 等等等等等等, 還有一大堆配置
1.4 celery全局配置
# celery.pyimport os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryStudy.settings') # 其作用是為 Django 提供配置文件的定位信息,確保框架能正確加載項目的各項設置app = Celery('CeleryStudy') # celery實例,一般命名為項目名稱
app.config_from_object('django.conf:settings', namespace='CELERY') # celery實例從setting中CELERY開頭的配置獲取app.autodiscover_tasks() # 自動發現并注冊項目中定義的tasks,會發現 @shared_task 和 @app.task
1.5 修改項目init文件,通過給外部導入
# __init__.pyfrom .celery import app as celery_app__all__ = ("celery_app",)
1.6 在app中新建tasks寫入任務邏輯
# tasks.py
"""
@app.task 是“專屬任務”,綁定到具體應用,適合簡單場景。
@shared_task 是“共享任務”,解耦于應用,適合復雜架構。
"""寫法一:
from celery import Celery
app = Celery('proj')@app.task # 綁定到當前 `app` 實例
def add(x, y):return x + y寫法二:
import time
from celery import shared_task@shared_task
def test_add(x, y):time.sleep(2)return x + y@shared_task
def pre_task_test(x):# 定時任務return x
二、異步任務
2.1 普通用法
2.1.1 通過delay
# views.pyfrom django.http import HttpResponse
from .tasks import test_add# Create your views here.def test_celery(request):result = test_add.delay(1, 5)return HttpResponse(result.task_id + ' : ' + result.status)
2.1.2 通過apply_async
# countdown: 延遲執行(秒)。
# eta: 指定具體執行時間(datetime)。
# queue: 指定任務隊列。
# expires: 任務過期時間。
# retry: 是否啟用重試from datetime import datetime, timedelta# 延遲 10 秒執行
test_add.apply_async(args=(1, 5), countdown=10)
# 指定具體執行時間
test_add.apply_async(args=(1, 5), eta=datetime.now() + timedelta(minutes=1))
# 指定隊列和過期時間
test_add.apply_async(args=(1, 5), queue='priority', expires=3600)
2.2 高級用法
通過 signature 對象調用,預生成任務簽名(task.s()),用于創建一個可序列化的任務調用對象。它允許你預定義任務及其參數,而無需立即執行,從而支持更靈活的任務組合(如鏈式調用、組調用等),簽名對象是可序列化的,可以存儲到數據庫或通過網絡傳遞。
sig = test_add.s(1, 5) # 創建簽名對象
sig.apply_async() # 異步執行
sig.delay() # 等價于 apply_async
2.2.1 任務回調(Callback)
在任務成功后觸發另一個任務(通過 link 參數)
test_add.apply_async(args=(1, 5), link=send_notification.s("Task completed!"))
2.2.2 任務鏈(Chaining)
通過 | 符號或 chain() 將多個任務串聯,前一個任務的結果作為后一個任務的輸入
from celery import chain# 方法1:使用 | 符號
result = (task1.s(1, 2) | task2.s() | task3.s())()
# 方法2:使用 chain()
result = chain(task1.s(1, 2), task2.s(), task3.s())()
2.2.3 任務組(Group)
并行執行多個任務,等待所有任務完成
from celery import groupresult = group(task1.s(i) for i in range(10))() # 并發執行 10 個 task1
2.2.4 任務和弦(Chord)
先并行執行一組任務(group),全部完成后執行一個匯總任務
from celery import chordresult = chord((task1.s(i) for i in range(10)), task2.s())() # 10 個 task1 完成后執行 task2
三、定時任務
在celery文件中添加定時任務路由表
# celery.pyapp.conf.beat_schedule = {'task-name': { # 任務名稱(自定義)'task': 'myapp.tasks.my_task', # 任務函數路徑(需可導入)'schedule': 30, # 執行時間規則(固定間隔)# 或 'schedule': crontab(minute='*/5'), # Cron 表達式'args': (16, 16), # 傳遞給任務的參數(可選)'options': {'queue': 'priority'}, # 其他選項(如指定隊列)},# 可定義多個任務
}
通過安裝pip install django-celery-beat
可以實現在admin后臺動態修改定時任務配置
INSTALLED_APPS = [...,'django_celery_beat',
]# 替換 Celery 的調度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
配置完記得遷移數據庫 python manage.py migrate
四、啟動celery
worker = 干活的(執行任務)。
beat = 發任務的(定時生成任務)。
協作關系:beat 是“計劃部門”,worker是“執行部門”,兩者通過 Broker(消息隊列)解耦。
生產建議:分開啟動,Worker 可橫向擴展,Beat 保持單例
4.1 命令行方式
# 啟動 Worker(處理任務)
celery -A CeleryStudy worker -l infocelery -A CeleryStudy worker -l info -P eventlet # windows環境下命令
"""
prefork 是 Celery 在 Linux 上的默認并發模型,它使用多進程(Multiprocessing)處理任務,
適合 CPU 密集型場景。但 Windows 系統不支持 fork() 系統調用,因此無法使用 prefork 池。
在 Windows 上嘗試使用 prefork 會直接報錯,導致 Worker 無法啟動。eventlet 是一個基于協程(Coroutine)的并發庫,通過綠色線程(Green Thread)實現高并發,
適合 I/O 密集型任務(如 Celery 的異步任務場景)。
在 Windows 上,eventlet 是少數可用的高性能并發池之一。
它通過非阻塞 I/O 和協程調度,避免了線程切換的開銷,同時繞過了 GIL 的限制,
能顯著提升任務處理效率
"""# 啟動 Beat(調度任務)
celery -A CeleryStudy beat -l info# 合并啟動
celery -A CeleryStudy worker --beat -l info
4.2 腳本方式(容易出問題,建議用命令行方式,很多默認配置內置好)
注:涉及django自定義管理命令,自己創建一個commands_app
4.2.1 啟動worker(腳本方式博主暫時沒找到好方法能捕獲任務結果)
# CeleryStudy/commands_app/management/commands/run_celery_worker.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):worker = celery_app.Worker(hostname='worker1@%h', # Worker 名稱pool='eventlet', # 進程池類型(prefork/solo/gevent)concurrency=4, # 并發數loglevel='INFO', # 日志級別logfile='/var/log/celery/worker.log', # 日志文件(可選))worker.start()
python manage.py run_celery_worker
4.2.2 啟動beat
# CeleryStudy/commands_app/management/commands/run_celery_beat.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):beat = celery_app.Beat(loglevel='INFO',logfile='/var/log/celery/beat.log', # 日志文件(可選)scheduler='django_celery_beat.schedulers:DatabaseScheduler', # 使用數據庫調度)beat.run()
python manage.py run_celery_beat
4.2.3 合并啟動worker和beat
# CeleryStudy/commands_app/management/commands/run_celery.py
from django.core.management.base import BaseCommand
from threading import Thread
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):# 配置定時任務(可選)celery_app.conf.beat_schedule = {'add-every-10-seconds': {'task': 'proj.celery.debug_task','schedule': 10.0,'args': (16, 16),},}# 啟動 Workerworker = celery_app.Worker(hostname='worker1@%h',pool='prefork',concurrency=4,loglevel='INFO',)# 啟動 Beatbeat = celery_app.Beat(loglevel='INFO',scheduler='django_celery_beat.schedulers:DatabaseScheduler',)"""在后臺線程中運行 Beat如果直接調用 beat.run(),它會阻塞主線程,導致 Worker 無法啟動因此,需要通過線程(Thread)將 Beat 放在后臺運行,避免阻塞主線程"""beat_thread = Thread(target=beat.run)beat_thread.daemon = Truebeat_thread.start()# 啟動 Workerworker.start()
python manage.py run_celery
五、監控管理
5.1 celery inspect
作用:檢查 Worker 狀態、任務信息等(無需停止服務)。
celery -A proj inspect active # 查看正在執行的任務
celery -A proj inspect registered # 查看已注冊的任務列表
celery -A proj inspect scheduled # 查看待執行的定時任務(需 Beat 運行)
celery -A proj inspect reserved # 查看 Worker 已獲取但未執行的任務
celery -A proj inspect stats # 查看 Worker 統計信息(如任務處理數)
5.2 celery control
作用:動態控制 Worker 行為(如關閉、重啟、調整并發數)。
celery -A proj control shutdown # 優雅關閉所有 Worker
celery -A proj control add_consumer Q1 # 動態添加監聽隊列 Q1
celery -A proj control cancel_consumer Q1 # 動態移除監聽隊列 Q1
celery -A proj control pool_grow 10 # 增加 Worker 并發數到 10
celery -A proj control pool_shrink 5 # 減少 Worker 并發數到 5
5.3 celery event
作用:監控 Celery 事件(如任務開始、成功、失敗),可用于自定義儀表盤。
celery -A proj events # 啟動事件監控(輸出到終端)
celery -A proj events -d dump # 以 JSON 格式輸出事件
celery -A proj events -f events.log # 將事件記錄到文件
5.4 celery multi
作用:同時啟動多個 Worker 或 Beat 實例(適用于分布式部署)。
celery multi start w1 w2 -A proj -l info -Q high,low # 啟動兩個 Worker,分別監聽不同隊列
celery multi stop w1 w2 # 停止指定 Worker
5.5 celery purge
作用:清空消息隊列中的所有任務
celery -A proj purge -Q celery # 清空默認隊列
celery -A proj purge -Q high,low # 清空多個隊列
5.6 celery flower
作用:啟動基于 Web 的監控儀表盤(需單獨安裝 flower 包)。
pip install flower
celery -A CeleryStudy flower --port=5555 # 訪問 http://localhost:5555