FastAPI集成APsecheduler的BackgroundScheduler+mongodb(精簡)

項目架構:

? ? FastAPI(folder)

? ? ? ? ? ? ? ? ?>app(folder)

? ? ? ? ? ? ? ? ?>core(folder)

? ? ? ? ? ? ? ? ?>models(folder)

? ? ? ? ? ? ? ? ?>routers(folder)

? ? ? ? ? ? ? ? ?>utils(folder)

? ? ? ? ? ? ? ? ? ?main.py(file)

1 utils文件夾下新建schedulers.py? ?

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

#特殊說明:此處顯示指定DB,會在DB里創建數據庫apscheduler_db

jobstores={

? ? ? ? 'default':MongoDBJobStore(

? ? ? ? ? ? ? database='apscheduler_db',

? ? ? ? ? ? ? collection='custom_jobs', ?

? ? ? ? ? ? ? host='localhost',

? ? ? ? ? ? ? port=27017

? ? ? ? )

}

#特殊說明:replace_existing=True會覆蓋同名的JOB,但不影響數據庫中的,僅處理job_id相同的沖突

scheduler=BackgroundScheduler(jobstores=jobstores,replace_existing=True)

2? main.py中在lifespan上下文初始化和關閉scheduler

import uvicorn

from contextlib import asynccontextmanager

from app.utils.schedulers import jobstores

scheduler=None

#特殊說明:yield中可以監控到正常結束比如ctrl+c,異常結束不能執行yield后代碼

@asynccontextmanager

async def lifespan(app:FastAPI):

? ? jobstores['default'].remove_all_jobs()

? ? from app.utils.schedulers import scheduler

? ? yield

? ? scheduler.remove_all_jobs()

? ? scheduler.shutdown(wait=False)

app=FastAPI(lifespan=lifespan)

?3 models文件夾新建scheduler.py文件配置基礎參數類和默認值

from pydantic import BaseModel

class job_config(BaseModel):

? ? ? ? job_id:str="default"

? ? ? ? job_name:str="default"

? ? ? ? trigger_type:str="interval"

? ? ? ? trigger_kwargs:dict={}

? ? ? ? seconds:int=30

? ? ? ? pass

4 api文件夾下添加sechedulers.py配置添加創建job方法

from app.utils.schedulers import scheduler

from app.models.schedulers import job_config

from fastapi import APIRouter

from typing import Coroutine,Callable

from datetime import datetime

@router.get("create_job")

def create_job(jobconfig,func):

? ? try:

? ? ? ? if jobconfig is None:

? ? ? ? ? ?jobconfig=job_config()

? ? ? ? scheduler.add_job(

? ? ? ? ? ? func,

? ? ? ? ? ? trigger=jobconfig.trigger_type,

? ? ? ? ? ? kwargs=jobconfig.trigger_kwargs,#特殊說明:這里可以添加自定義參數

? ? ? ? ? ? id=jobconfig.job_id,

? ? ? ? ? ? name=jobconfig.job_name,

? ? ? ? ? ? seconds=jobconfig.seconds

? ? ? ? )

? ? ? ?

? ? ? ? scheduler.start()

? ? except Exception as e:

? ? ? ? raise e

? ? except (KeyboardInterrupt, SystemExit):

? ? ? ? scheduler.shutdown()

?5 測試,調用test方法

@router.get("/function")

def function1():

? ? try:

? ? ? ? with open("D:\\demo.txt", "a") as file:

? ? ? ? ? ? ?print("寫入文件"+ str(datetime.now()), file=file)

? ? except:

? ? ? ? pass

@router.get("/test")

def test():

? ? try:

? ? ? ? create_job(None,function1)

? ? except:

? ? ? ? pass

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/82895.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/82895.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/82895.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

聊一聊接口測試中耗時請求如何合理安排?

目錄 一、異步處理與輪詢機制 輪詢檢查機制 二、 并行化測試執行 三、模擬與樁技術(Mock/Stub) 四、動態超時與重試策略 五、測試架構設計優化 分層測試策略 并行化執行 網絡優化 六、測試用例分層管理 金字塔策略 七、 緩存與數據復用 響應…

深入詳解DICOMweb:WADO與STOW-RS的技術解析與實現

🧑 博主簡介:CSDN博客專家、CSDN平臺優質創作者,高級開發工程師,數學專業,10年以上C/C, C#, Java等多種編程語言開發經驗,擁有高級工程師證書;擅長C/C、C#等開發語言,熟悉Java常用開…

Splunk Validated Architecture (SVA):構建企業級可觀測性與安全的基石

Splunk Validated Architecture (SVA) 是 Splunk 官方提供的一套經過嚴格測試、性能驗證和最佳實踐指導的參考架構藍圖。它并非單一固定方案,而是根據企業數據規模、性能需求、高可用性目標和合規要求,提供一系列可落地的部署模型。SVA 的核心價值在于為…

Armv7l或樹莓派32位RPI 4B編譯faiss

pip3 install faiss-cpu當然找不到預編譯的包 手動下載 git clone https://github.com/facebookresearch/faiss.git cd faiss #能需要切換到特定版本標簽,例如 v1.7.1,這個版本Cmake 3.18可以過,因為apt install安裝的cmake只更新到這里&am…

C++之string的模擬實現

string 手寫C字符串類類的基本結構與成員變量一、構造函數與析構函數二、賦值運算符重載三、迭代器支持四、內存管理與擴容機制五、字符串操作函數六、運算符重載總結 手寫C字符串類 從零實現一個簡易版std::string 類的基本結構與成員變量 namespace zzh { class string { …

修改Docker鏡像源

配置文件位置: sudo vim /etc/docker/daemon.json Docker 或 containerd 的鏡像加速器配置,旨在提高從 Docker Hub 拉取鏡像的速度。 { "features": { "buildkit": true, "containerd-snapshotter": true }, …

服務器帶寬線路的區別(GIA、CN2、BGP、CMI等)

服務器帶寬線路的區別(GIA、CN2、BGP、CMI等) 一、BGP線路 1. 定義與技術特點 BGP(Border Gateway Protocol,邊界網關協議)是一種用于不同自治系統(AS)之間交換路由信息的協議,屬…

從0到1搭建AI繪畫模型:Stable Diffusion微調全流程避坑指南

從0到1搭建AI繪畫模型:Stable Diffusion微調全流程避坑指南 系統化學習人工智能網站(收藏):https://www.captainbed.cn/flu 文章目錄 從0到1搭建AI繪畫模型:Stable Diffusion微調全流程避坑指南摘要引言一、數據集構…

VSCode + GD32F407 構建燒錄

前言 最近調試一塊 GD32F407VET6(168Mhz,8Mhz晶振) 板子時,踩了一些“啟動失敗”的坑。本以為是時鐘配置有誤,最后發現是鏈接腳本(.ld 文件)沒有配置好,導致程序根本沒能正常執行 ma…

AI繪畫提示詞:從零開始掌握Prompt Engineering的藝術

文章目錄 什么是AI繪畫提示詞?提示詞的基本結構主體描述場景/背景風格指定技術參數負面提示人物肖像模板風景模板 高級技巧權重調整混合風格顏色控制情緒氛圍 常見問題與解決方法手部變形問題構圖不理想風格不夠突出 提示詞示例庫科幻場景奇幻人物靜物畫 結語 在當今…

在 Linux 上安裝 Minikube:輕松搭建本地 Kubernetes 單節點集群

🔥「炎碼工坊」技術彈藥已裝填! 點擊關注 → 解鎖工業級干貨【工具實測|項目避坑|源碼燃燒指南】 一、Minikube 是什么? Minikube 是 Kubernetes 官方推出的輕量級工具,專為開發者設計,用于在本地快速搭建單節點 Kube…

day41 python圖像識別任務

目錄 一、數據預處理:為模型打下堅實基礎 二、模型構建:多層感知機的實現 三、訓練過程:迭代優化與性能評估 四、測試結果:模型性能的最終檢驗 五、總結與展望 在深度學習的旅程中,多層感知機(MLP&…

JS數組 concat() 與擴展運算符的深度解析與最佳實踐

文章目錄 前言一、語法對比1. Array.prototype.concat()2. 擴展運算符(解構賦值) 二、性能差異(大規模數組)關鍵差異原因 三、適用場景建議總結 前言 最近工作中遇到了一個大規模數組合并相關的問題,在數據合并時有些…

一套qt c++的串口通信

實現了創建線程使用串口的功能 具備功能: 1.線程使用串口 2.定時發送隊列內容,防止粘包 3.沒處理接收粘包,根據你的需求來,handleReadyRead函數中,可以通過m_receiveBuffer來緩存接收,然后拆分數據來處理 源碼 seri…

設計模式-發布訂閱

文章目錄 發布訂閱概念發布訂閱 vs 監聽者例子代碼 發布訂閱概念 發布/訂閱者模式最大的特點就是實現了松耦合,也就是說你可以讓發布者發布消息、訂閱者接受消息,而不是尋找一種方式把兩個分離 的系統連接在一起。當然這種松耦合也是發布/訂閱者模式最大…

windows-cmd 如何查詢cpu、內存、磁盤的使用情況

在 Windows 中,您可以使用命令提示符(CMD)通過一些命令來查詢 CPU、內存和磁盤的使用情況。以下是常用的命令和方法: 1. 查詢 CPU 使用情況 使用 wmic 命令 wmic cpu get loadpercentage 這個命令會顯示當前 CPU 的使用百分比…

allWebPlugin中間件VLC專用版之截圖功能介紹

背景 VLC控件原有接口具有視頻截圖方法,即video對象的takeSnapshot方法,但是該方法返回的是一個IPicture對象,不適合在谷歌等現代瀏覽器上使用。因此,本人增加一個新的視頻截圖方法takeSnapshot2B64方法,直接將視頻截圖…

第Y5周:yolo.py文件解讀

🍨 本文為🔗365天深度學習訓練營 中的學習記錄博客🍖 原作者:K同學啊 本次任務:將YOLOv5s網絡模型中的C3模塊按照下圖方式修改形成C2模塊,并將C2模塊插入第2層與第3層之間,且跑通YOLOv5s。 任務…

寶塔安裝ssh證書報錯:/usr/bin/curl: symbol lookup error: curl_easy_header

原因: 你當前的 curl 命令版本是 7.70.0(不是系統默認版本,應該是你手動安裝的)。它鏈接的是 /usr/local/lib/libcurl.so.4,而不是 CentOS 系統默認的 /usr/lib64/libcurl.so.4。/usr/local/lib/libcurl.so.4 很可能是…

Apache SeaTunnel 引擎深度解析:原理、技術與高效實踐

Apache SeaTunnel 作為新一代高性能分布式數據集成平臺,其核心引擎設計融合了現代大數據處理架構的精髓。 Apache SeaTunnel引擎通過分布式架構革新、精細化資源控制及企業級可靠性設計,顯著提升了數據集成管道的執行效率與運維體驗。其模塊化設計允許用…