一、系統環境依賴
系統:windows10
python: python==3.9.0
djnago==3.2.0
APScheduler==3.10.1
二、django項目配置
1、創建utils包,在包里面創建schedulers包
utils/schedulers/task.py
#1、設置 Django 環境,就可以導入項目的模型類這些了
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "項目根目錄名.settings")
import django
django.setup()
#2、一些需要的模塊
from datetime import datetime,timedelta,date#3、django項目中模型類NOW_DATETIME = datetime.strftime(datetime.now(),'%Y-%m-%d %H:%M:%S')
NOW_DATE = date.today().strftime('%Y-%m-%d')def example_interval():'''每隔一段固定時間就執行一次:return:'''print('interval',NOW_DATETIME)def example_cron():'''在每天的固定時間執行:return:'''print('cron,凌晨開始執行的定時任務')def example_date():'''在指定日期執行一次,就執行一次:return:'''print(f'date,指定日期執行一次:{NOW_DATETIME}')
utils/schedulers/scheduler.py
# 2、導入所需的調度器類和觸發器類
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from pytz import timezone
from threading import RLock
from django.conf import settings
from datetime import datetime,timedelta
lock = RLock()#3、導入定時任務
from .task import example_interval #例子,時間間隔,每隔一段時間執行
from .task import example_cron #指定時間執行,在指定時間點執行
from .task import example_date #指定日期執行,執行一次class __SchedulerManage(BackgroundScheduler):_instance = Nonedef __new__(cls, *args, **kwargs):if cls._instance:return cls._instancewith lock:if cls._instance:return cls._instancecls._instance = super().__new__(cls)return cls._instancedef __init__(self):super().__init__()# 1、設置時區self.timezone = timezone(settings.TIME_ZONE)# 2、使用內存存儲定時任務信息jobstore_redis = RedisJobStore(host='localhost', port=6379, db=0, password='redis密碼')jobstore_memory = MemoryJobStore()self.add_jobstore(jobstore_memory)# 3、添加任務self.add_task()def add_task(self):'''自定義的功能: 用來添加定時任務的:return:''''1、三種觸發器的例子'#每隔一段固定時間段執行一次,1小時執行一次,設置開始時間是啟動時間后的3分鐘self.add_job(example_interval, trigger=IntervalTrigger(hours=1,start_date=datetime.now()+timedelta(minutes=3)), id='example_interval', replace_existing=True)#設置每天的11:03:10 執行一次self.add_job(example_cron,trigger=CronTrigger(hour=11,minute=3,second=10),id='example_cron',replace_existing=True)#設置在2023-08-10 11:03:01執行一次,只執行一次self.add_job(example_date,trigger=DateTrigger(run_date=datetime(2023,8,10,11,3,1)),id='example_date',replace_existing=True)#也可以在實例化時設置時區:__SchedulerManage(timezone=timezone('Asia/Shanghai'))
scheduler_ = __SchedulerManage()
if __name__ == '__main__':#啟動 scheduler_.start() 或者 scheduler_() 兩種方式都okscheduler_()
utils/schedulers/__init__.py
from .scheduler import scheduler_
2、項目配置文件settings.py
####配置定時任務
#啟動定時任務
from utils.schedulers import scheduler_
scheduler_.start()