APscheduler簡介
APscheduler全稱Advanced Python Scheduler,作用為在指定的時間規則執行指定的作業,其是基于Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基于日期、固定時間間隔以及crontab類型的任務,并且可以持久化任務。基于這些功能,我們可以很方便的實現一個python定時任務系統。
組成部分說明
# 觸發器(trigger):
包含調度邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運行。
除了他們自己初始配置意外,觸發器完全是無狀態的。# 作業存儲(job store):
存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。
一個作業的數據講在保存在持久化作業存儲時被序列化,并在加載時被反序列化。
調度器不能分享同一個作業存儲。# 執行器(executor):
處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。
當作業完成時,執行器將會通知調度器。# 調度器(scheduler):
通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。
配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。
框架對比:Celery VS APScheduler:
celery:
celery是一個專注于實時處理和任務調度的任務隊列,任務就是消息(消息隊列使用rabbitmq或者redie),消息中的有效載荷中包含要執行任務的全部數據。我們通常將celery作為一個任務隊列來使用,但是celery也有定時任務的功能,但是celery無法在flask這樣的系統中動態的添加定時任務,而且單獨為定時任務功能而搭建celery顯得過于重量級。
apscheduler:
apscheduler是基于Quartz的一個Python定時任務框架,提供了基于日期、固定時間間隔以及crontab類型的任務,并且可以持久化作業。APScheduler算是在實際項目中最好用的一個工具庫,不僅可以在程序中動態的添加和刪除定時任務,還支持持久化。
函數封裝:
# 自定義調用函數
def scheduler_function(request):try:scheduler_id= "示例:123"scheduler_type = "示例:1"task_time = "示例:2024-01-01 00:00:00"task_hour= "示例:1"task_week_days= "示例:mon"task_week_times= "示例:22:00"data= "函數傳參"# 示例:創建定時任務,可自主定義傳參add_scheduler_task(scheduler_type, scheduler_id, task_time, task_hour, task_week_days, task_week_times, data)except Exception as e:return False, f"創建定時任務失敗,原因是:{str(e)}"
# 添加定時任務
def add_scheduler_task(scheduler_type, scheduler_id, task_time, task_hour, task_week_days, task_week_times, data):try:scheduler = BackgroundScheduler()scheduler.add_jobstore(DjangoJobStore(), 'default')if scheduler_type == 1:# 某個時刻執行scheduler.add_job(run_task,'date', id=str(scheduler_id),run_date=task_time, args=(data,))scheduler.start()elif scheduler_type == 2:# 間隔時間性執行scheduler.add_job(run_task,'interval', id=str(scheduler_id), minutes=task_hour, args=(data,))scheduler.start()elif scheduler_type == 3:# 每周幾點執行start_time = task_week_times.split(':')h = int(start_time[0])m = int(start_time[1])scheduler.add_job(run_task, 'cron', id=str(scheduler_id),day_of_week=task_week_days, hour=h, minute=m,args=(data,))scheduler.start()return True, "創建定時任務成功"except Exception as e:return False, f"創建定時任務失敗,原因是:{str(e)}"# 刪除定時任務
def remove_scheduler(scheduler_id):try:scheduler = BackgroundScheduler()scheduler.add_jobstore(DjangoJobStore(), 'default')DjangoJob.objects.filter(id=str(scheduler_id)).delete()return True, "刪除定時任務成功"except Exception as e:return False, f"刪除定時任務失敗,原因是:{str(e)}"# 獲取定時任務,判斷是否存在,若有,則返回下一次執行的時間
def get_scheduler(scheduler_id):try:job = DjangoJob.objects.filter(id=str(scheduler_id)).values().first()if job:return True, job["next_run_time"]else:return False, ""except Exception as e:return False, f"查詢定時任務列表失敗,原因:{str(e)}"
總結:
以上封裝的函數可以解決絕大部分所需要的定時任務類型,分別是單次執行,間隔執行,周期性執行,滿足絕大多數的自動化測試使用,注:個人愚見,僅供參考!!!