在Django項目中,如何集成使用Celery框架來完成一些異步任務以及定時任務呢?
1. 安裝
pip install celery # celery框架
pip install django-celery-beat # celery定時任務使用
pip install django-celery-results # celery存儲結果使用
2. Django集成celery
在 settings.py
配置文件中增加如下配置項:
INSTALLED_APPS = [...'celery','django_celery_beat','django_celery_results'
]"""以下是celery的相關配置"""
# 時區配置
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False# broker backend 配置:使用rabbitmq作為中間件
CELERY_BROKER_URL = "amqp://devops:devops123@127.0.0.1:5672/alarm"
CELERY_RESULT_BACKEND = "amqp://devops:devops123@127.0.0.1:5672/alarm"# 使用django_celery_beat動態配置任務
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'# celery序列化和反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']# 下面配置項在有些情況下可以防止死鎖 非常重要!
CELERYD_FORCE_EXECV = True
# 任務結果存儲的過期時間,默認1天過期。如果beat開啟,Celery每天會自動清除,設為0,存儲結果永不過期。
CELERY_RESULT_EXPIRES = 60 * 60 * 24
# 每個worker執行1000次任務后死掉,會自動重啟worker,防止任務占用太多內存導致內存泄漏
CELERY_MAX_TASKS_PER_CHILD = 1000
# 禁用所有速度限制,如果網絡資源有限,不建議開足馬力。
CELERY_DISABLE_RATE_LIMITS = True
# 單個任務的運行時間限制,否則會被殺死
CELERYD_TASK_TIME_LIMIT = 60 * 60
CELERY_TASK_RETRY = 2# celery 隊列配置
from kombu import Exchange, Queue
# consumer_arguments設置隊列的優先級
CELERY_TASK_QUEUES = (Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_email', consumer_arguments={'x-priority': 5}),Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_phone', consumer_arguments={'x-priority': 8}),Queue('calcu_queue', Exchange('calcu_exchange'), routing_key='calcu_feature', consumer_arguments={'x-priority': 10})
)CELERY_TASK_ROUTES = {'alarm.tasks.call_phone': {'queue': 'alarm_queue', 'routing_key': 'alarm_phone'},'alarm.tasks.send_email': {'queue': 'alarm_queue', 'routing_key': 'alarm_email'},'calcu.tasks.execute_calcu': {'queue': 'calcu_queue', 'routing_key': 'calcu_feature'},
}
在 settings.py
同級目錄下,新增一個 celery.py
文件:
from __future__ import absolute_import, unicode_literals # absolute_import: 使用python的庫而不是項目目錄下的文件
import os
from celery import Celery
from django.conf import settingsos.environ.setdefault("DJANGO_SETTINGS_MODULE", "celerymq.settings")
celery_app = Celery("celerymq")celery_app.config_from_object("django.conf:settings", namespace="CELERY")
celery_app.autodiscover_tasks(settings.INSTALLED_APPS)
在App中增加一個 tasks.py
文件,用于實現異步任務:
import time
from celery import shared_task@shared_task(ignore_result=True)
def execute_calcu(dataframe):print(f'execute celery task: calcu feature')time.sleep(10) # 這里寫比較耗時的邏輯print(f'execute calcu feature task run over')
在其他文件邏輯中進行異步調用:
execute_calcu.delay(dataframe)
項目啟動后,如果有異步任務進來,可以在 RabbitMQ
監控平臺看到隊列信息: http://127.0.0.1:15672/
。
3. 啟動命令
啟動 worker
去消費數據。
# 啟動worker
celery worker -A celerymq -l INFO -n alarm_queue -Q alarm_queue -P eventlet# 啟動beat
celery beat -A celerymq -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
在linux中啟動worker的話,可以去掉 -P eventlet
參數。
另外,定時任務推薦使用 django-admin
來下發。