Django中使用Celery(通用方案、官方方案)
目錄
- Django中使用Celery(通用方案、官方方案)
- 通用方案
- 場景
- 前置準備
- 完整代碼
- Celery官方方案
- 【1】注冊celery配置
- 【2】創建celery文件
- 【3】init注冊
- 【4】添加任務
- 【5】啟動worker異步任務
- 【6】在view中調用
通用方案
場景
現在定義了一張圖片表,要求每隔一段時間就將表內容更新進緩存庫中
前置準備
- 安裝模塊
pip install Django4.2.13
pip install celery
pip install cache
pip install eventlet
pip install redis
- 定義圖片表
from django.db import modelsclass Display(models.Model):title = models.CharField(max_length=16, unique=True, verbose_name='名稱')image = models.ImageField(upload_to='banner', verbose_name='圖片')link = models.CharField(max_length=64, verbose_name='跳轉鏈接')info = models.TextField(verbose_name='詳情')
- 路徑結構
完整代碼
# tasks.py
from celerys import app
from datetime import timedeltaapp.conf.beat_schedule = {'update_cache_every_30_seconds': {'task': 'celerys.update_cache','schedule': timedelta(seconds=30),},
}
# celerys.py
import os
import sys# 執行文件必須與Django配置的環境變量同級(manage.py)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Luffy_API.settings.dev')
import django
django.setup()from app.models import Display
from app.serializer import DisplaySerializer
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
from django.core.cache import cachebroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('celery',broker=broker,backend=backend)@app.task
def update_cache():queryset = Display.objects.all()serializer = DisplaySerializer(instance=queryset, many=True)cache_data = queryset.values()cache.set('display', cache_data)return '緩存已更新'
- 執行命令
celery -A celerys worker -l info -P eventlet
celery -A tasks beat -l info
Celery官方方案
【1】注冊celery配置
在settings
中添加Celery
的配置文件
# 用redis創建隊列庫1
CELERY_BROKER_URL='redis://127.0.0.1:6379/1'
# 用redis創建結果存儲庫2
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TASK_SERIALIZER='json'
# 結果序列化方案
CELERY_RESULT_SERIALIZER='json'
# 任務結果過期時間,秒
CELERY_TASK_RESULT_EXPIRES=60 * 60 * 24
# 時區配置
CELERY_TIMEZONE='Asia/Shanghai'
【2】創建celery文件
新建tasks
文件和celerys
文件
- 目錄結構
【3】init注冊
在根目錄下的__init__.py
文件中注冊celerys
文件,如果不加則會自動識別根目錄中的celery
文件而不是我們創建的celerys
from .celerys import app as celery_app__all__=('celery_app',)
【4】添加任務
tasks.py
文件中存儲要被執行的異步文件
from celerys import app@add.task()
def add(a, b):return a + b
celerys.py
存放celery的配置文件
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Django_API.settings.dev')
import django
from celery import Celery, shared_task
from django.conf import settings# 啟動worker的命名
app = Celery('Django_API')# 自動檢索前綴為CELERY的配置
app.config_from_object('django.conf:settings', namespace='CELERY')# 自動搜索每個app中的tasks.py文件
app.autodiscover_tasks()
【5】啟動worker異步任務
啟動worker
celery -A Django_API worker -l info -P eventlet
【6】在view中調用
from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import addclass TestView(APIView):def get(self, request):res = add.delay(500, 2)return Response(f'res:{res}')