Directory structure:
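Reconstructed from the module paths used below; the top-level app.py is an assumption to match the -A app startup commands (starting with -A config would work just as well):

celeryTester/
├── app.py                   # assumed: re-exports app from config so "celery -A app ..." works
├── config/
│   ├── __init__.py          # creates the Celery app
│   ├── celery_config.py
│   └── celery_signals.py
├── my_scheduler/
│   └── dynamic_scheduler.py
├── tasks/
│   └── sample_tasks.py
└── utils/
    └── coon.py              # database connection helper

(my_scheduler, tasks and utils each also need an empty __init__.py so they can be imported as packages.)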
Configuration under config:
__init__:
import os
import sys
sys.path.append(os.getcwd())
from celery import Celery

app = Celery('celeryTester')  # create a Celery instance; the name is up to you
app.config_from_object('config.celery_config')  # load settings from celery_config
celery_config:
from .celery_signals import task_failure_handler, task_postrun_handler  # register global signal handlers

broker_url = 'redis://xxx'  # the task queue: beat and manual calls push tasks here, workers pull from it
result_backend = 'redis://xxx'  # where task results are stored
broker_connection_retry_on_startup = True  # retry the broker connection on startup; optional

# serialization settings (json is the default anyway)
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'

# timezone settings
timezone = 'Asia/Shanghai'
enable_utc = True

# modules to load tasks from; worker and beat are normally started from the project root,
# so the paths are given relative to the root
include = ['tasks.sample_tasks']

# use a custom scheduler
beat_schedule = {}  # left empty because the schedule is loaded dynamically
beat_scheduler = 'my_scheduler.dynamic_scheduler.DynamicScheduler'  # the custom Scheduler class
beat_scheduler_reload_interval = 300  # how often (seconds) the scheduler reloads; overrides the default
celery_signals:
import datetime

from celery.signals import task_postrun, task_failure

from utils.coon import DatabaseConnection


@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):
    # only record successful runs; for now this just updates the periodic_task table
    if state == 'SUCCESS':
        duration = other.get('runtime', 0)
        last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)
        DatabaseConnection().update(
            'periodic_task',
            {'last_run_at': last_run_at, 'status': True},  # SET values
            'task=%s',
            (task.name,),  # WHERE condition values
        )


@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, args=None,
                         kwargs=None, traceback=None, einfo=None, **other):
    # failed tasks come through this signal; task_failure passes the task as sender
    duration = other.get('runtime', 0)
    last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)
    DatabaseConnection().update(
        'periodic_task',
        {'last_run_at': last_run_at, 'status': False},
        'task=%s',
        (sender.name,),
    )
dynamic_scheduler:
import json
import time
from typing import Dict

from celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab

from utils.coon import DatabaseConnection


class DynamicScheduler(Scheduler):
    def __init__(self, *args, **kwargs):
        self._schedule = {}
        self._last_reload = time.time()
        super().__init__(*args, **kwargs)
        # reload interval in seconds; beat_scheduler_reload_interval from the config overrides the 300 s default
        self.reload_interval = self.app.conf.get('beat_scheduler_reload_interval', 60 * 5)

    def setup_schedule(self):
        """Initial load of the schedule; called when the beat process starts."""
        self._schedule = self.load_schedule()

    def load_schedule(self) -> Dict[str, ScheduleEntry]:
        """Load the schedule from the configuration source (the periodic_task table)."""
        schedule = {}
        tasks = DatabaseConnection().fetch_all('select * from periodic_task where enabled=1')
        for item in tasks:
            name = item['name']
            if item['interval']:
                sche = item['interval']  # plain interval in seconds
            else:
                # crontab stored as "minute hour day_of_month month day_of_week"
                minute, hour, day, month, week_day = item['crontab'].strip().split(' ')
                sche = crontab(minute=minute, hour=hour, day_of_week=week_day,
                               day_of_month=day, month_of_year=month)
            item['args'] = item['args'] and json.loads(item['args'])
            item['kwargs'] = item['kwargs'] and json.loads(item['kwargs'])
            schedule[name] = ScheduleEntry(
                name=name,
                task=item['task'],
                schedule=sche,
                args=item.get('args') or (),
                kwargs=item.get('kwargs') or {},
            )
        return schedule

    def tick(self, *args, **kwargs):
        """Override tick to periodically re-check the database."""
        now = time.time()
        if now - self._last_reload > self.reload_interval:
            self._schedule = self.load_schedule()
            self._last_reload = now
            self.logger.debug('Reloaded schedule')
        return super().tick(*args, **kwargs)

    @property
    def schedule(self) -> Dict[str, ScheduleEntry]:
        """Return the current schedule."""
        return self._schedule
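The scheduler and the signal handlers assume a periodic_task table whose columns match the fields used above; the exact DDL is up to you. A row as returned by fetch_all would look roughly like this (values are illustrative):

# one row from "select * from periodic_task where enabled=1" (illustrative values)
row = {
    'name': 'add-every-minute',        # unique schedule name
    'task': 'tasks.sample_tasks.add',  # registered Celery task name
    'interval': None,                  # run every N seconds, or NULL to use crontab instead
    'crontab': '* * * * *',            # "minute hour day_of_month month day_of_week"
    'args': '[2, 3]',                  # JSON-encoded positional arguments
    'kwargs': None,                    # JSON-encoded keyword arguments
    'enabled': 1,                      # only enabled=1 rows are loaded
    'last_run_at': None,               # written back by the signal handlers
    'status': None,                    # success flag written back by the signal handlers
}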
sample_tasks:
from config import app


@app.task
def add(x, y):
    return x + y


@app.task
def multiply(x, y):
    return x * y


@app.task
def hello_world():
    return "Hello World!"
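Celery names a task after its module path by default, so the task column in periodic_task (and the task=%s condition used by the signal handlers) should hold values such as:

add.name          # 'tasks.sample_tasks.add'
multiply.name     # 'tasks.sample_tasks.multiply'
hello_world.name  # 'tasks.sample_tasks.hello_world'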
utils/coon.py just holds a database connection class; any implementation will do, so it is not shown here.
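For completeness, a minimal sketch of such a class, assuming MySQL accessed through pymysql; it only covers the fetch_all and update calls used above, and the connection parameters are placeholders:

# utils/coon.py -- minimal sketch, not the original implementation
import pymysql


class DatabaseConnection:
    def __init__(self):
        # placeholder connection parameters; replace with your own
        self.conn = pymysql.connect(host='127.0.0.1', user='root', password='xxx',
                                    database='celery_demo', charset='utf8mb4',
                                    cursorclass=pymysql.cursors.DictCursor, autocommit=True)

    def fetch_all(self, sql, params=None):
        # run a SELECT and return all rows as dicts
        with self.conn.cursor() as cur:
            cur.execute(sql, params)
            return cur.fetchall()

    def update(self, table, values, condition, condition_params):
        # values: dict of column -> new value; condition: e.g. 'task=%s'
        set_clause = ', '.join(f'{col}=%s' for col in values)
        sql = f'UPDATE {table} SET {set_clause} WHERE {condition}'
        with self.conn.cursor() as cur:
            cur.execute(sql, tuple(values.values()) + tuple(condition_params))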
Startup commands:
On a Windows machine (hence --pool=solo, since the default prefork pool does not work reliably on Windows):
Worker process:
celery -A app worker --loglevel=info --pool=solo
Beat process:
celery -A app beat --loglevel=info
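With the worker running, tasks can also still be triggered manually, independent of beat:

from tasks.sample_tasks import add

result = add.delay(2, 3)       # push the task onto the broker queue
print(result.get(timeout=10))  # read the result from the result backend -> 5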
Finally, a word on how this works. The simplest setup is to define beat_schedule statically in celery_config. Here we instead use a custom Scheduler class that reads from the database, which amounts to building the beat_schedule value dynamically. The benefit is that periodic tasks can be added or modified purely through configuration (rows in the periodic_task table) without code changes, and the signal handlers let us hook in operations such as writing status back to the table when each task runs and finishes.
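For comparison, the static equivalent of one periodic_task row would look like this in celery_config (standard Celery syntax; every change then requires editing code and restarting beat):

from celery.schedules import crontab

beat_schedule = {
    'add-every-minute': {
        'task': 'tasks.sample_tasks.add',
        'schedule': crontab(minute='*'),  # or just a number of seconds, e.g. 60
        'args': (2, 3),
    },
}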