學習目標:
通過自定義的CronScheduler調度器在兼容標準的調度器的情況下,查詢自定義任務表去生成調度任務并分配給celery worker進行執行
不了解Celery框架的小伙伴可以先看一下我的上一篇文章:Celery框架組件分析及使用
學習內容:
- 創建自定義的Scheduler,設置自定義的Scheduler實現對原有配置的定時任務的兼容
- 如何啟動自定義Scheduler對任務進行調度
- 創建自定義Scheduler遇到的一些問題
如何創建自定義的Scheduler:
在創建自定義Scheduler之前,我們先了解一下Scheduler類,這個類是Celery Beat的核心類,維護了任務的創建邏輯,調度邏輯。入下圖是整個celery-beat啟動執行流程圖
+---------------------+| Celery Beat 啟動 |+----------+----------+|v+----------+----------+| Service.start() | --> 初始化 Scheduler(此時調用 setup_schedule)+----------+----------+|v+----------+----------+| scheduler.tick() | 每隔 interval 觸發一次,把當前任務按最近執行時間放入 _heap(調度堆)+----------+----------+|v+--------------------------+| schedule.get_schedule() || 返回所有任務 ScheduleEntry | 此處會獲取所有的可執行的調度任務,需要重寫+--------------------------+|v+-----------------------------+| entry.is_due() || 判斷任務是否需要執行 |+-----------------------------+|如果是 True 否則等待下次 tick|v+-----------------------------+| apply_async(entry) || 發出任務執行 |+-----------------------------+
要定義一個DjangoCronScheduler繼承Scheduler,需要封裝自己的數據庫Cron表達式數據加載邏輯
import logging
import refrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from django.db import close_old_connectionsfrom record.models import KeywordVideoSchedule, AccountVideoSchedulelogger = logging.getLogger(__name__)class DjangoCronScheduler(Scheduler):"""從Django模型動態加載cron任務的調度器"""def __init__(self, *args, **kwargs):self._schedule = {}self._cron_schedule = {}self._last_refresh = Noneself._refresh_interval = kwargs.pop('refresh_interval', 3600) # 默認3600秒刷新一次super().__init__(*args, **kwargs)# 第一步,初始化所有的動態cron表達式的任務def setup_schedule(self):"""初始化調度"""super().setup_schedule()# 加載靜態配置中的任務(即 app.conf.beat_schedule)for name, entry in self.app.conf.beat_schedule.items():if isinstance(entry, dict):self._schedule[name] = self.Entry(name=name,task=entry['task'],schedule=entry['schedule'],args=entry.get('args', []),kwargs=entry.get('kwargs', {}),options=entry.get('options', {}),app=self.app)else:# 已經是 ScheduleEntry 的實例self._schedule[name] = entryself._refresh_cron_tasks(force=True)def tick(self):"""重寫tick方法,處理動態刷新"""if self.should_refresh():self._refresh_cron_tasks()return super().tick()def should_refresh(self):"""檢查是否需要刷新任務"""if self._last_refresh is None:return Truenow = self.app.now()return (now - self._last_refresh).total_seconds() >= self._refresh_intervaldef _refresh_cron_tasks(self, force=False):"""從Django模型刷新cron任務"""logger.info(f'begin refresh_cron_tasks: force={force}')try:close_old_connections() # 確保數據庫連接有效# 獲取所有啟用的任務(關鍵詞視頻任務)db_tasks_keyword = {f'keyword_video_{task.id}': taskfor task in KeywordVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 獲取所有啟用的任務(賬號視頻任務)db_tasks_account = {f'account_video_{task.id}': taskfor task in AccountVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 合并兩個字典db_tasks = {**db_tasks_keyword, **db_tasks_account}current_names = set(self._cron_schedule.keys())new_names = set(db_tasks.keys())# 刪除不再存在的任務for name in current_names - new_names:del self._cron_schedule[name]logging.info(f"Removed cron task: {name}")# 添加或更新任務for name, db_task in db_tasks.items():existing = self._cron_schedule.get(name)# 判斷任務類型,并賦值任務名if name.startswith('keyword_video_'):task_name = 'record.tasks.spider_keyword_video'elif name.startswith('account_video_'):task_name = 'record.tasks.spider_account_video'else:logger.error(f"Unknown cron task: {name}")continue# 如果是新任務或cron表達式有變化if not existing or getattr(existing, 'cron_expr', None) != db_task.collect_cron:try:crontab_schedule = crontab(*self._parse_cron_expr(db_task.collect_cron))entry = ScheduleEntry(name=name,task=task_name,schedule=crontab_schedule,args=[db_task.id],kwargs={},options={},total_run_count=0,last_run_at=existing.last_run_at if existing else None)self._cron_schedule[name] = entrylogging.info(f"{'Updated' if existing else 'Added'} cron task: {name} ({db_task.collect_cron})")except ValueError as e:logging.error(f"Invalid cron expression {db_task.collect_cron} for task {name}: {str(e)}")elif force:# 強制刷新時更新其他參數self._cron_schedule[name].update({'args': [db_task.id],'kwargs': {}})self._last_refresh = self.app.now()except Exception as e:logging.error(f"Failed to refresh cron tasks: {str(e)}")finally:close_old_connections() # 清理數據庫連接def _parse_cron_expr(self, cron_expr):"""解析cron表達式為celery crontab參數"""# 標準的cron表達式是包含秒的,但是python的crontab對象最小的單位為分,如果是6位含s的數據,需要舍棄前面的秒的處理cron_parts = cron_expr.strip().split()if len(cron_parts) == 6:# 忽略秒段(第 0 段)cron_parts = cron_parts[1:]if len(cron_parts) != 5:logger.error(f"無效的 Cron 表達式: {cron_expr} (需要5或6個字段)")raise ValueError("Cron expression must have 5 parts or 6 parts")logger.info(f'cron_parts : {cron_parts}')# 通用轉換規則converted = []for part in cron_parts:# 規則1: 將 0/x 轉換為 */x。celery不支持0/5這種格式if re.match(r'^0/\d+$', part):part = part.replace('0/', '*/')# 規則2: 將 1-5/x 轉換為 1-5/x (保持不變)# 規則3: 保持 *、數字、, 等標準語法不變converted.append(part)return (converted[0], # minuteconverted[1], # hourconverted[2], # day_of_monthconverted[3], # month_of_yearconverted[4] # day_of_week)@propertydef schedule(self):"""確保 Celery Beat 能識別自定義調度任務"""return self.schedule()def get_schedule(self):"""獲取當前調度任務"""return {**self._schedule, **self._cron_schedule}
- 定義一個
_schedule
屬性接收原有的通過代碼編寫的固定cron表達式的執行task,例如現有項目在celery_app.py
當中配置的
app.conf.beat_schedule = {'resume_autoclip_task_5min': {'task': 'record.comsumers.resume_autoclip_task','schedule': crontab(minute='*/1')},'flash_all_template_view_and_like_1day': {'task': 'record.comsumers.flash_all_template_view_and_like','schedule': crontab(hour='1', minute='0')}
}
-
定義一個
_cron_schedule
屬性用來接收數據庫當中配置的動態cron表達式的數據,根據這個表達式生成ScheduleEntry對象,也就是每個動態需要執行task的對象. -
定義一個
_refresh_interval
屬性進行刷新,因為我們的任務數據是配置在數據,數據可能會刪除,也可能會修改,暫時還沒考慮實時去更新Scheduler的條件下,我們就需要考慮動態刷新的額情況 -
對設置的屬性進行初始化賦值,我們需要關注的函數為setup_schedule和tick setup_schedule函數是sever啟動的初始化入口,我們可以在項目啟動時把我們需要初始化的數據都初始化進去,我們需要額外注意的是
crontab
對象接收的cron表達式和我們傳統不一樣,一般我們傳統的cron表達式都是6位,crontab只接收5位參數,不攜帶秒的那一位,而且crontab不接收0/5這種格式,會報錯。所以我在此處特殊處理了cron表達式的,將其通過_parse_cron_expr
轉換成crontab對象能夠接收的參數。 -
def schedule(self)函數的作用,tick() 方法將按照執行時間獲取可執行的任務,會觸發調用schedule(self)方法,該方法會將系統代碼配置任務和cron表達式的任務合并返回給調度器。理論上在
setup_schedule
階段也可以將所有的數據都加載到同一個_schedule屬性下,但是博主測試之后發現不成功,只能使用該方式進行處理了。
至此我們通過自定義的Scheduler就算編寫完成了,下面我們來配置及啟動測試。
如何啟動自定義Scheduler對任務進行調度
- 首先我們使用了celery beat組件,需要我們在項目當中引入celery beat,博主項目結合使用的是Django框架,所以在setting.py文件的屬性
INSTALLED_APPS
當中添加組件django_celery_beat
INSTALLED_APPS = ['django_celery_beat', # 需要引入celery_beat進行任務生成調度
]
- 啟動celery beat使用自定義的Scheduler,有兩種方式,一個是通過代碼配置,另外一種是通過命令行參數啟動
- 通過代碼配置,需要在celery_app.py文件中指定我們的beat_scheduler
# 使用自定義的調度器,app為你的app名稱,modules為定義的自定義Scheduler的文件名稱 app.conf.beat_scheduler = 'app.modules.DjangoCronScheduler'
- 配置完成后我們通過標準的啟動方式就能啟動了
celery -A you_projiect_name beat -l info
- 啟動完成后即可看到,啟動的提示信息
- 通過代碼配置,需要在celery_app.py文件中指定我們的beat_scheduler
- 啟動參數指定啟動的Scheduler,此時我們就不需要配置celery_app.py當中的beat_scheduler,直接在啟動命令行上去配置
celery -A autoclip beat -S app.modules.DjangoCronScheduler -l info
創建自定義Scheduler遇到的一些問題
- 在編寫自定義Scheduler之前,博主也不是很了解Scheduler的執行原理,只是看到網上很多使用了Scheduler自帶的cron配置頁去配置的cron表達式任務,它是有自己兩張固定的表結構的,如果想使用自己的cron表達式和表數據,還必須將自己的數據同步到
django_celery_beat_periodictask
和django_celery_beat_crontabschedule
表當中。博主就想能不能使用自己的表去執行任務,統計結果,因為每次網上述的兩個表同步數據感覺處理下來也不是很方便。 - Scheduler數據的加載的時機和機制,如果不了解這個的話,確實是無從下手,就算初始化了數據也不清楚是怎么同步到任務堆里的,需要好好看一下上述的執行流程圖。
- crontab對象的參數和標準的cron表達式不一致導致的報錯,包括長度的區別,對于0/4這種格式的處理區別。
- 理論上還確少,數據動態變化時實時的變更
schedule
的方法,博主這邊偷懶使用了定時刷新的機制,這個使用場景對于配置實時性感知要求不高的情況下是滿足要求的。