Django 自定義celery-beat調度器,查詢自定義表的Cron表達式進行任務調度

學習目標:

通過自定義的CronScheduler調度器在兼容標準的調度器的情況下,查詢自定義任務表去生成調度任務并分配給celery worker進行執行

不了解Celery框架的小伙伴可以先看一下我的上一篇文章:Celery框架組件分析及使用


學習內容:

  1. 創建自定義的Scheduler,設置自定義的Scheduler實現對原有配置的定時任務的兼容
  2. 如何啟動自定義Scheduler對任務進行調度
  3. 創建自定義Scheduler遇到的一些問題

如何創建自定義的Scheduler:

在創建自定義Scheduler之前,我們先了解一下Scheduler類,這個類是Celery Beat的核心類,維護了任務的創建邏輯,調度邏輯。入下圖是整個celery-beat啟動執行流程圖

     +---------------------+| Celery Beat 啟動    |+----------+----------+|v+----------+----------+| Service.start()     |  --> 初始化 Scheduler(此時調用 setup_schedule)+----------+----------+|v+----------+----------+| scheduler.tick()    | 每隔 interval 觸發一次,把當前任務按最近執行時間放入 _heap(調度堆)+----------+----------+|v+--------------------------+| schedule.get_schedule()  || 返回所有任務 ScheduleEntry | 此處會獲取所有的可執行的調度任務,需要重寫+--------------------------+|v+-----------------------------+| entry.is_due()              || 判斷任務是否需要執行		   |+-----------------------------+|如果是 True           否則等待下次 tick|v+-----------------------------+| apply_async(entry)          || 發出任務執行                  |+-----------------------------+

要定義一個DjangoCronScheduler繼承Scheduler,需要封裝自己的數據庫Cron表達式數據加載邏輯

import logging
import refrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from django.db import close_old_connectionsfrom record.models import KeywordVideoSchedule, AccountVideoSchedulelogger = logging.getLogger(__name__)class DjangoCronScheduler(Scheduler):"""從Django模型動態加載cron任務的調度器"""def __init__(self, *args, **kwargs):self._schedule = {}self._cron_schedule = {}self._last_refresh = Noneself._refresh_interval = kwargs.pop('refresh_interval', 3600)  # 默認3600秒刷新一次super().__init__(*args, **kwargs)# 第一步,初始化所有的動態cron表達式的任務def setup_schedule(self):"""初始化調度"""super().setup_schedule()# 加載靜態配置中的任務(即 app.conf.beat_schedule)for name, entry in self.app.conf.beat_schedule.items():if isinstance(entry, dict):self._schedule[name] = self.Entry(name=name,task=entry['task'],schedule=entry['schedule'],args=entry.get('args', []),kwargs=entry.get('kwargs', {}),options=entry.get('options', {}),app=self.app)else:# 已經是 ScheduleEntry 的實例self._schedule[name] = entryself._refresh_cron_tasks(force=True)def tick(self):"""重寫tick方法,處理動態刷新"""if self.should_refresh():self._refresh_cron_tasks()return super().tick()def should_refresh(self):"""檢查是否需要刷新任務"""if self._last_refresh is None:return Truenow = self.app.now()return (now - self._last_refresh).total_seconds() >= self._refresh_intervaldef _refresh_cron_tasks(self, force=False):"""從Django模型刷新cron任務"""logger.info(f'begin refresh_cron_tasks: force={force}')try:close_old_connections()  # 確保數據庫連接有效# 獲取所有啟用的任務(關鍵詞視頻任務)db_tasks_keyword = {f'keyword_video_{task.id}': taskfor task in KeywordVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 獲取所有啟用的任務(賬號視頻任務)db_tasks_account = {f'account_video_{task.id}': taskfor task in AccountVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)}# 合并兩個字典db_tasks = {**db_tasks_keyword, **db_tasks_account}current_names = set(self._cron_schedule.keys())new_names = set(db_tasks.keys())# 刪除不再存在的任務for name in current_names - new_names:del self._cron_schedule[name]logging.info(f"Removed cron task: {name}")# 添加或更新任務for name, db_task in db_tasks.items():existing = self._cron_schedule.get(name)# 判斷任務類型,并賦值任務名if name.startswith('keyword_video_'):task_name = 'record.tasks.spider_keyword_video'elif name.startswith('account_video_'):task_name = 'record.tasks.spider_account_video'else:logger.error(f"Unknown cron task: {name}")continue# 如果是新任務或cron表達式有變化if not existing or getattr(existing, 'cron_expr', None) != db_task.collect_cron:try:crontab_schedule = crontab(*self._parse_cron_expr(db_task.collect_cron))entry = ScheduleEntry(name=name,task=task_name,schedule=crontab_schedule,args=[db_task.id],kwargs={},options={},total_run_count=0,last_run_at=existing.last_run_at if existing else None)self._cron_schedule[name] = entrylogging.info(f"{'Updated' if existing else 'Added'} cron task: {name} ({db_task.collect_cron})")except ValueError as e:logging.error(f"Invalid cron expression {db_task.collect_cron} for task {name}: {str(e)}")elif force:# 強制刷新時更新其他參數self._cron_schedule[name].update({'args': [db_task.id],'kwargs': {}})self._last_refresh = self.app.now()except Exception as e:logging.error(f"Failed to refresh cron tasks: {str(e)}")finally:close_old_connections()  # 清理數據庫連接def _parse_cron_expr(self, cron_expr):"""解析cron表達式為celery crontab參數"""# 標準的cron表達式是包含秒的,但是python的crontab對象最小的單位為分,如果是6位含s的數據,需要舍棄前面的秒的處理cron_parts = cron_expr.strip().split()if len(cron_parts) == 6:# 忽略秒段(第 0 段)cron_parts = cron_parts[1:]if len(cron_parts) != 5:logger.error(f"無效的 Cron 表達式: {cron_expr} (需要5或6個字段)")raise ValueError("Cron expression must have 5 parts or 6 parts")logger.info(f'cron_parts : {cron_parts}')# 通用轉換規則converted = []for part in cron_parts:# 規則1: 將 0/x 轉換為 */x。celery不支持0/5這種格式if re.match(r'^0/\d+$', part):part = part.replace('0/', '*/')# 規則2: 將 1-5/x 轉換為 1-5/x (保持不變)# 規則3: 保持 *、數字、, 等標準語法不變converted.append(part)return (converted[0],  # minuteconverted[1],  # hourconverted[2],  # day_of_monthconverted[3],  # month_of_yearconverted[4]  # day_of_week)@propertydef schedule(self):"""確保 Celery Beat 能識別自定義調度任務"""return self.schedule()def get_schedule(self):"""獲取當前調度任務"""return {**self._schedule, **self._cron_schedule}
  1. 定義一個_schedule屬性接收原有的通過代碼編寫的固定cron表達式的執行task,例如現有項目在celery_app.py當中配置的
app.conf.beat_schedule = {'resume_autoclip_task_5min': {'task': 'record.comsumers.resume_autoclip_task','schedule': crontab(minute='*/1')},'flash_all_template_view_and_like_1day': {'task': 'record.comsumers.flash_all_template_view_and_like','schedule': crontab(hour='1', minute='0')}
}
  1. 定義一個 _cron_schedule 屬性用來接收數據庫當中配置的動態cron表達式的數據,根據這個表達式生成ScheduleEntry對象,也就是每個動態需要執行task的對象.

  2. 定義一個_refresh_interval屬性進行刷新,因為我們的任務數據是配置在數據,數據可能會刪除,也可能會修改,暫時還沒考慮實時去更新Scheduler的條件下,我們就需要考慮動態刷新的額情況

  3. 對設置的屬性進行初始化賦值,我們需要關注的函數為setup_schedule和tick setup_schedule函數是sever啟動的初始化入口,我們可以在項目啟動時把我們需要初始化的數據都初始化進去,我們需要額外注意的是crontab對象接收的cron表達式和我們傳統不一樣,一般我們傳統的cron表達式都是6位,crontab只接收5位參數,不攜帶秒的那一位,而且crontab不接收0/5這種格式,會報錯。所以我在此處特殊處理了cron表達式的,將其通過_parse_cron_expr轉換成crontab對象能夠接收的參數。

  4. def schedule(self)函數的作用,tick() 方法將按照執行時間獲取可執行的任務,會觸發調用schedule(self)方法,該方法會將系統代碼配置任務和cron表達式的任務合并返回給調度器。理論上在setup_schedule階段也可以將所有的數據都加載到同一個_schedule屬性下,但是博主測試之后發現不成功,只能使用該方式進行處理了。

至此我們通過自定義的Scheduler就算編寫完成了,下面我們來配置及啟動測試。


如何啟動自定義Scheduler對任務進行調度

  • 首先我們使用了celery beat組件,需要我們在項目當中引入celery beat,博主項目結合使用的是Django框架,所以在setting.py文件的屬性INSTALLED_APPS 當中添加組件django_celery_beat
INSTALLED_APPS = ['django_celery_beat',  # 需要引入celery_beat進行任務生成調度
]
  • 啟動celery beat使用自定義的Scheduler,有兩種方式,一個是通過代碼配置,另外一種是通過命令行參數啟動
    • 通過代碼配置,需要在celery_app.py文件中指定我們的beat_scheduler
      # 使用自定義的調度器,app為你的app名稱,modules為定義的自定義Scheduler的文件名稱
      app.conf.beat_scheduler = 'app.modules.DjangoCronScheduler'
      
    • 配置完成后我們通過標準的啟動方式就能啟動了
       celery -A you_projiect_name beat  -l info
      
    • 啟動完成后即可看到,啟動的提示信息
      在這里插入圖片描述
  • 啟動參數指定啟動的Scheduler,此時我們就不需要配置celery_app.py當中的beat_scheduler,直接在啟動命令行上去配置
    celery -A autoclip beat -S app.modules.DjangoCronScheduler -l info
    

創建自定義Scheduler遇到的一些問題

  • 在編寫自定義Scheduler之前,博主也不是很了解Scheduler的執行原理,只是看到網上很多使用了Scheduler自帶的cron配置頁去配置的cron表達式任務,它是有自己兩張固定的表結構的,如果想使用自己的cron表達式和表數據,還必須將自己的數據同步到django_celery_beat_periodictaskdjango_celery_beat_crontabschedule表當中。博主就想能不能使用自己的表去執行任務,統計結果,因為每次網上述的兩個表同步數據感覺處理下來也不是很方便。
  • Scheduler數據的加載的時機和機制,如果不了解這個的話,確實是無從下手,就算初始化了數據也不清楚是怎么同步到任務堆里的,需要好好看一下上述的執行流程圖。
  • crontab對象的參數和標準的cron表達式不一致導致的報錯,包括長度的區別,對于0/4這種格式的處理區別。
  • 理論上還確少,數據動態變化時實時的變更schedule的方法,博主這邊偷懶使用了定時刷新的機制,這個使用場景對于配置實時性感知要求不高的情況下是滿足要求的。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/81081.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/81081.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/81081.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

藍橋杯 1. 確定字符串是否包含唯一字符

確定字符串是否包含唯一字符 原題目鏈接 題目描述 實現一個算法來識別一個字符串的字符是否是唯一的(忽略字母大小寫)。 若唯一,則輸出 YES,否則輸出 NO。 輸入描述 輸入一行字符串,長度不超過 100。 輸出描述 輸…

a-upload組件實現文件的上傳——.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt

實現下面的上傳/下載/刪除功能&#xff1a;要求支持&#xff1a;【.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt】 分析上面的效果圖&#xff0c;分為【上傳】按鈕和【文件列表】功能&#xff1a; 解決步驟1&#xff1a;上傳按鈕 直接上代碼&#xff1a; <a-uploadmultip…

.NET Core 數據庫ORM框架用法簡述

.NET Core ORM框架用法簡述 一、主流.NET Core ORM框架概述 在.NET Core生態系統中&#xff0c;主流的ORM(Object-Relational Mapping)框架包括&#xff1a; ??Entity Framework Core (EF Core)?? - 微軟官方推出的ORM框架??Dapper?? - 輕量級微ORM??Npgsql.Entit…

halcon打開圖形窗口

1、dev_open_window 參數如下&#xff1a; 1&#xff09;Row(輸入參數) y方向上&#xff0c;圖形窗口距離左上角頂端的像素個數 2&#xff09;Column(輸入參數) x方向上&#xff0c;距離左上角左邊的像素個數 3&#xff09;Width(輸入參數) 圖形窗口寬度 4&#xff09;He…

2025東三省D題深圳杯D題數學建模挑戰賽數模思路代碼文章教學

完整內容請看文章最下面的推廣群 一、問題一&#xff1a;混合STR圖譜中貢獻者人數判定 問題解析 給定混合STR圖譜&#xff0c;識別其中的真實貢獻者人數是后續基因型分離與個體識別的前提。圖譜中每個位點最多應出現2n個峰&#xff08;n為人數&#xff09;&#xff0c;但由…

iView Table 組件跨頁選擇功能實現文檔

iView Table 組件跨頁選擇功能實現文檔 功能概述 實現基于 iView Table 組件的多選功能&#xff0c;支持以下特性&#xff1a; ? 跨頁數據持久化選擇? 當前頁全選/取消全選? 自動同步選中狀態顯示? 分頁切換狀態保持? 高性能大數據量支持 實現方案 技術棧 iView UI 4…

家庭服務器IPV6搭建無限郵箱系統指南

qq郵箱操作 // 郵箱配置信息 // 注意&#xff1a;使用QQ郵箱需要先開啟IMAP服務并獲取授權碼 // 設置方法&#xff1a;登錄QQ郵箱 -> 設置 -> 賬戶 -> 開啟IMAP/SMTP服務 -> 生成授權碼 服務器操作 fetchmail 同步QQ郵箱 nginx搭建web顯示本地同步過來的郵箱 ssh…

Tauri v1 與 v2 配置對比

本文檔對比 Tauri v1 和 v2 版本的配置結構和內容差異&#xff0c;幫助開發者了解版本變更并進行遷移。 配置結構變化 v1 配置結構 {"package": { ... },"tauri": { "allowlist": { ... },"bundle": { ... },"security":…

對js的Date二次封裝,繼承了原Date的所有方法,增加了自己擴展的方法,可以實現任意時間往前往后推算多少小時、多少天、多少周、多少月;

封裝js時間工具 概述 該方法繼承了 js 中 Date的所有方法&#xff1b;同時擴展了一部分自用方法&#xff1a; 1、任意時間 往前推多少小時&#xff0c;天&#xff0c;月&#xff0c;周&#xff1b;參數1、2必填&#xff0c;參數3可選beforeDate(num,formatter,dateVal); befo…

TimeDistill:通過跨架構蒸餾的MLP高效長期時間序列預測

原文地址&#xff1a;https://arxiv.org/abs/2502.15016 發表會議&#xff1a;暫定&#xff08;但是Star很高&#xff09; 代碼地址&#xff1a;無 作者&#xff1a;Juntong Ni &#xff08;倪浚桐&#xff09;, Zewen Liu &#xff08;劉澤文&#xff09;, Shiyu Wang&…

DeepSeek最新大模型發布-DeepSeek-Prover-V2-671B

2025 年 4 月 30 日&#xff0c;DeepSeek 開源了新模型 DeepSeek-Prover-V2-671B&#xff0c;該模型聚焦數學定理證明任務&#xff0c;基于混合專家架構&#xff0c;使用 Lean 4 框架進行形式化推理訓練&#xff0c;參數規模達 6710 億&#xff0c;結合強化學習與大規模合成數據…

如何用AI生成假期旅行照?

以下是2025年最新AI生成假期旅行照片的實用工具推薦及使用指南&#xff0c;結合工具特點、研發背景和適用場景進行綜合解析&#xff1a; 一、主流AI旅行照片生成工具推薦與對比 1. 搜狐簡單AI&#xff08;國內工具&#xff09; ? 特點&#xff1a; ? 一鍵優化與背景替換&…

ElaticSearch

ElaticSearch: 全文搜索 超級強&#xff0c;比如模糊查詢、關鍵詞高亮等 海量數據 高效查詢&#xff0c;比傳統關系數據庫快得多&#xff08;尤其是搜索&#xff09; 靈活的數據結構&#xff08;Schema靈活&#xff0c;可以動態字段&#xff09; 分布式高可用&#xff0c;天…

Android開發,實現一個簡約又好看的登錄頁

文章目錄 1. 編寫布局文件2.設計要點說明3. 效果圖4. 關于作者其它項目視頻教程介紹 1. 編寫布局文件 編寫activity.login.xml 布局文件 <?xml version"1.0" encoding"utf-8"?> <androidx.appcompat.widget.LinearLayoutCompat xmlns:android…

機器學習:【拋擲硬幣的貝葉斯后驗概率】

首先,拋硬幣的問題通常涉及先驗概率、似然函數和后驗概率。假設用戶可能想通過觀察一系列的正面(H)和反面(T)來更新硬幣的偏差概率。例如,先驗可能假設硬幣是均勻的,但隨著觀察到更多數據,用貝葉斯定理計算后驗分布。 通常,硬幣的偏差可以用Beta分布作為先驗,因為它…

Echarts 問題:自定義的 legend 點擊后消失,格式化 legend 的隱藏文本樣式

文章目錄 問題分析實現步驟代碼解釋問題 如下圖所示,在自定義的 legend 點擊后會消失 分析 我把隱藏的圖例字體顏色設為灰色,可以借助 legend.formatter 和 legend.textStyle 結合 option.series 的 show 屬性來達成。以下是具體的實現步驟和示例代碼: <!DOCTYPE ht…

光譜相機如何提升目標檢測與識別精度

光譜相機&#xff08;多光譜/高光譜&#xff09;通過捕捉目標在多個波段的光譜特征&#xff0c;能夠揭示傳統RGB相機無法感知的材質、化學成分及物理特性差異。以下是提升其目標檢測與識別精度的核心方法&#xff1a; ?1. 硬件優化&#xff1a;提升數據質量? ?(1) 光譜分辨…

springboot項目配置nacos,指定使用環境

遇到這樣一個問題&#xff0c;在開發、測試、生成環境之間切換的問題。 大多數的操作是通過修改spring.profiles.active來確定指向使用的環境配置文件&#xff0c;對應項目中需要增加對應的配置文件。 但是現在幾乎所有公司都會有代碼管理不管是SVN、git&#xff0c;這樣就會涉…

AI代碼審查的落地實施方案 - Java架構師面試實戰

AI代碼審查的落地實施方案 - Java架構師面試實戰 本文通過模擬一位擁有十年Java研發經驗的資深架構師馬架構與面試官之間的對話&#xff0c;深入探討了AI代碼審查的落地實施方案。 第一輪提問 面試官&#xff1a; 馬架構&#xff0c;請介紹一下您對AI代碼審查的理解。 馬架…

TDengine 訂閱不到數據問題排查

簡介 TDengine 在實際生產應用中&#xff0c;經常會遇到訂閱程序訂閱不到數據的問題&#xff0c;總結大部分都為使用不當或狀態不正確等問題&#xff0c;需手工解決。 查看服務端狀態 通過 sql 命令查看有問題的 topic 和consumer_group 組訂閱是否正常。 select * from inf…