項目架構:
? ? FastAPI(folder)
? ? ? ? ? ? ? ? ?>app(folder)
? ? ? ? ? ? ? ? ?>core(folder)
? ? ? ? ? ? ? ? ?>models(folder)
? ? ? ? ? ? ? ? ?>routers(folder)
? ? ? ? ? ? ? ? ?>utils(folder)
? ? ? ? ? ? ? ? ? ?main.py(file)
1 utils文件夾下新建schedulers.py? ?
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
#特殊說明:此處顯示指定DB,會在DB里創建數據庫apscheduler_db
jobstores={
? ? ? ? 'default':MongoDBJobStore(
? ? ? ? ? ? ? database='apscheduler_db',
? ? ? ? ? ? ? collection='custom_jobs', ?
? ? ? ? ? ? ? host='localhost',
? ? ? ? ? ? ? port=27017
? ? ? ? )
}
#特殊說明:replace_existing=True會覆蓋同名的JOB,但不影響數據庫中的,僅處理job_id相同的沖突
scheduler=BackgroundScheduler(jobstores=jobstores,replace_existing=True)
2? main.py中在lifespan上下文初始化和關閉scheduler
import uvicorn
from contextlib import asynccontextmanager
from app.utils.schedulers import jobstores
scheduler=None
#特殊說明:yield中可以監控到正常結束比如ctrl+c,異常結束不能執行yield后代碼
@asynccontextmanager
async def lifespan(app:FastAPI):
? ? jobstores['default'].remove_all_jobs()
? ? from app.utils.schedulers import scheduler
? ? yield
? ? scheduler.remove_all_jobs()
? ? scheduler.shutdown(wait=False)
app=FastAPI(lifespan=lifespan)
?3 models文件夾新建scheduler.py文件配置基礎參數類和默認值
from pydantic import BaseModel
class job_config(BaseModel):
? ? ? ? job_id:str="default"
? ? ? ? job_name:str="default"
? ? ? ? trigger_type:str="interval"
? ? ? ? trigger_kwargs:dict={}
? ? ? ? seconds:int=30
? ? ? ? pass
4 api文件夾下添加sechedulers.py配置添加創建job方法
from app.utils.schedulers import scheduler
from app.models.schedulers import job_config
from fastapi import APIRouter
from typing import Coroutine,Callable
from datetime import datetime
@router.get("create_job")
def create_job(jobconfig,func):
? ? try:
? ? ? ? if jobconfig is None:
? ? ? ? ? ?jobconfig=job_config()
? ? ? ? scheduler.add_job(
? ? ? ? ? ? func,
? ? ? ? ? ? trigger=jobconfig.trigger_type,
? ? ? ? ? ? kwargs=jobconfig.trigger_kwargs,#特殊說明:這里可以添加自定義參數
? ? ? ? ? ? id=jobconfig.job_id,
? ? ? ? ? ? name=jobconfig.job_name,
? ? ? ? ? ? seconds=jobconfig.seconds
? ? ? ? )
? ? ? ?
? ? ? ? scheduler.start()
? ? except Exception as e:
? ? ? ? raise e
? ? except (KeyboardInterrupt, SystemExit):
? ? ? ? scheduler.shutdown()
?5 測試,調用test方法
@router.get("/function")
def function1():
? ? try:
? ? ? ? with open("D:\\demo.txt", "a") as file:
? ? ? ? ? ? ?print("寫入文件"+ str(datetime.now()), file=file)
? ? except:
? ? ? ? pass
@router.get("/test")
def test():
? ? try:
? ? ? ? create_job(None,function1)
? ? except:
? ? ? ? pass