celery獨立部署接入數據庫配置

目錄結構:

config下配置:

__init__:

import os
import sys
sys.path.append(os.getcwd())
from celery import Celeryapp=Celery('celeryTester') # 創建一個Celery實例,名字自定義
app.config_from_object('config.celery_config') # 從celery_config獲取配置

celery_config:

from .celery_signals import task_failure_handler,task_postrun_handler # 全局信號
broker_url = 'redis://xxx' # 作為任務隊列,beat及手動調用往里添加任務,worker從這里取任務
result_backend = 'redis://xxx'    # 任務結果
broker_connection_retry_on_startup=True # 重連可以不配
# 序列化相關,默認都是json
task_serializer='json'
accept_content=['json']
result_serializer='json'
#時區設置
timezone='Asia/Shanghai'
enable_utc=True
# 從哪些模塊獲取任務,一般是從根目錄啟動worker及beat進程,目錄以根目錄起
include=['tasks.sample_tasks']
# 使用自定義調度器
beat_schedule= {} # 動態獲取這里配置為空
beat_scheduler = 'my_scheduler.dynamic_scheduler.DynamicScheduler' # 使用自定義scheduler類
beat_scheduler_reload_interval = 300  # beat_scheduler多久重新加載,可覆蓋默認值

celery_signals:

import datetimefrom celery.signals import task_postrun, task_failure
from utils.coon import DatabaseConnection@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):# 只記錄有返回值的成功任務,暫時只更新periodic_task表里的狀態if state == 'SUCCESS':duration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)DatabaseConnection().update('periodic_task',{'last_run_at': last_run_at, 'status': True},# set值'task=%s',(task.name,))# where后condition值@task_failure.connect
def task_failure_handler(task_id, exception, traceback, task, args, kwargs, einfo, **other):# 任務失敗走此信號機制duration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)DatabaseConnection().update('periodic_task',{'last_run_at': last_run_at, 'status': False},'task=%s',(task.name,))

dynamic_scheduler:

import json
import time
from typing import Dictfrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from utils.coon import DatabaseConnectionclass DynamicScheduler(Scheduler):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._schedule = {}self._last_reload = time.time()self.reload_interval = kwargs.get('reload_interval', 60*5)  # 默認300秒重載def setup_schedule(self):"""初始化加載調度配置,起beat進程時會調用"""self._schedule = self.load_schedule()def load_schedule(self) -> Dict[str, ScheduleEntry]:"""從配置源加載調度配置"""schedule = {}tasks =DatabaseConnection().fetch_all('select * from periodic_task where enabled=1')for item in tasks:name = item['name']if item['interval']:sche = item['interval']else:cron = item['crontab']minute, hour, day, month, week_day = cron.strip().split(' ')sche = crontab(minute=minute, hour=hour, day_of_week=week_day, day_of_month=day,month_of_year=month)item['args']=item['args'] and json.loads(item['args'])item['kwargs']=item['kwargs'] and json.loads(item['kwargs'])schedule[name] = ScheduleEntry(name=name,task=item['task'],schedule=sche,args=item.get('args' or ()),kwargs=item.get('kwargs', {}))return scheduledef tick(self, event_t=..., min=..., max=...):"""重載tick方法實現定期檢查"""now = time.time()if now - self._last_reload > self.reload_interval:self._schedule = self.load_schedule()self._last_reload = nowself.logger.debug('Reloaded schedule')return super().tick()@propertydef schedule(self) -> Dict[str, ScheduleEntry]:"""返回當前調度配置"""return self._schedule

sample_tasks:

from config import app@app.task
def add(x, y):return x + y@app.task
def multiply(x, y):return x * y@app.task
def hello_world():return "Hello World!"

coon下面就是一個數據庫連接類,自己隨便找個就行這里就不貼出來了

啟動命令:

windows機器下
worker進程:
celery -A app worker --loglevel=info --pool=solo
beat進程
celery -A app beat --loglevel=info

最后解釋下原理,一般最簡單的是在celery_config配置beat_schedule,我們是通過自定義Scheduler類,從數據庫里面取值,類似于動態拿到這個beat_schedule值。好處就是可以直接通過配置修改或者添加定時任務,不用再去代碼修改添加了,并在最開始和結束添加落表等操作

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/906446.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/906446.shtml
英文地址,請注明出處:http://en.pswp.cn/news/906446.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

攻防世界-題目名稱-文件包含

進入環境 看到 include(),想到文件包含,用php偽協議 /?filenamephp://filter/readconvert.base64-encode/resourceflag.php do not hack!猜測可能是黑名單檢測的敏感字符 輸入單個字符串/?filenamebase64 還是顯示do not hack! 構造payl…

MySQL高頻面試八連問(附場景化解析)

文章目錄 "為什么訂單查詢突然變慢了?"——從這個問題開始說起一、索引的生死時速(必考題!)二、事務的"套娃"藝術三、鎖機制的相愛相殺四、存儲引擎的抉擇五、慢查詢的破案技巧六、分頁的深度優化七、高可用架…

Android 中 自定義生成的 APK/AAR 文件名稱

在 Kotlin DSL 中,可以通過配置 build.gradle.kts 文件來自定義生成的 APK 或 AAR 文件名稱。 1、自定義 APK 名稱 在模塊的 build.gradle.kts 中通過修改 applicationVariants.all 配置來實現。 android {......applicationVariants.all {outputs.all {val df …

《從零開始:Spring Cloud Eureka 配置與服務注冊全流程》?

關于Eureka的學習,主要學習如何搭建Eureka,將order-service和product-service都注冊到Eureka。 1.為什么使用Eureka? 我在實現一個查詢訂單功能時,希望可以根據訂單中productId去獲取對應商品的詳細信息,但是產品服務和訂單服…

鴻蒙開發進階:深入解析ArkTS語言特性與高性能編程實踐

一、前言 在鴻蒙生態蓬勃發展的當下,開發者對于高效、優質的應用開發語言需求愈發迫切。ArkTS 作為鴻蒙應用開發的核心語言,在繼承 TypeScript 優勢的基礎上,進行了諸多優化與擴展,為開發者帶來了全新的編程體驗。本文將深入剖析…

ARM-Linux 完全入門

1.準備部分 1.1 虛擬機安裝 準備VMware軟件、ubuntu系統鏡像安裝過程 VMware安裝 破解(自己百度破解碼,多試幾個網址,會有能用的)Ubuntu安裝 配置聯網 橋接 虛擬機Ubuntu系統必須能連接到外網,不然不能更新軟件安裝…

深度學習驅動下的目標檢測技術:原理、算法與應用創新(三)

五、基于深度學習的目標檢測代碼實現 5.1 開發環境搭建 開發基于深度學習的目標檢測項目,首先需要搭建合適的開發環境,確保所需的工具和庫能夠正常運行。以下將詳細介紹 Python、PyTorch 等關鍵開發工具和庫的安裝與配置過程。 Python 是一種廣泛應用于…

致敬經典 << KR C >> 之打印輸入單詞水平直方圖和以每行一個單詞打印輸入 (練習1-12和練習1-13)

1. 前言 不知道有多少同學正在自學C/C, 無論你是一個在校學生, 還是已經是上班族. 如果你想從事或即將從事軟件開發這個行業, C/C都是一個幾乎必須要接觸的系統級程序開發語言. 雖然現在有Rust更安全的系統級編程語言作為C/C的替代, 但作為入門, C應該還是要好好學的. C最早由B…

【Leetcode 每日一題】3355. 零數組變換 I

問題背景 給定一個長度為 n n n 的整數數組 n u m s nums nums 和一個二維數組 q u e r i e s queries queries,其中 q u e r i e s [ i ] [ l i , r i ] queries[i] [l_i, r_i] queries[i][li?,ri?]。 對于每個查詢 q u e r i e s [ i ] queries[i] quer…

[java八股文][Java虛擬機面試篇]垃圾回收

什么是Java里的垃圾回收?如何觸發垃圾回收? 垃圾回收(Garbage Collection, GC)是自動管理內存的一種機制,它負責自動釋放不再被程序引用的對象所占用的內存,這種機制減少了內存泄漏和內存管理錯誤的可能性…

ubuntu服務器版啟動卡在start job is running for wait for...to be Configured

目錄 前言 一、原因分析 二、解決方法 總結 前言 當 Ubuntu 服務器啟動時,系統會顯示類似 “start job is running for wait for Network to be Configured” 或 “start job is running for wait for Plymouth Boot Screen Service” 等提示信息,并且…

Android 手寫簽名功能詳解:從原理到實踐

Android 手寫簽名功能詳解 1. 引言2. 手寫簽名核心實現:SignatureView 類3. 交互層實現:MainActivity 類4. 布局與配置5. 性能優化與擴展方向 1. 引言 在電子政務、金融服務等移動應用場景中,手寫簽名功能已成為提升用戶體驗與業務合規性的關…

【nRF9160 常用prj.conf配置與AT指令介紹】

參考資料: 技術討論:Q群:542294007 nRF91 NCS SDK安裝工具與SDK安裝包等常用軟件下載地址 云盤下載:pan.olib.cn 一、nRF9160 常用prj.conf配置介紹 nRF9160通過prj.conf配置網絡模式為:CAT-M模式 CONFIG_LTE_NETWOR…

小型化邊緣計算設備

以下是關于小型化邊緣計算設備的核心技術與應用特點的綜合分析: 一、核心硬件平臺與算力表現? NVIDIA Jetson Orin系列? Jetson Orin Nano?:配備1024個CUDA核心和32個Tensor核心,支持高達100 TOPS的AI算力,適用于機器人、無…

css使用clip-path屬性切割顯示可見內容

1. 需求 想要實現一個漸變的箭頭Dom&#xff0c;不想使用svg、canvas去畫&#xff0c;可以考慮使用css的clip-path屬性切割顯示內容。 2. 實現 <div class"arrow">箭頭 </div>.arrow{width: 200px;height: 60px;background-image: linear-gradient(45…

Kotlin與物聯網(IoT):Android Things開發探索

在物聯網&#xff08;IoT&#xff09;領域&#xff0c;Kotlin 憑借其簡潔性、安全性和與 Java 生態的無縫兼容性&#xff0c;逐漸成為 Android Things 開發的有力工具。盡管 Google 已于 2022 年宣布停止對 Android Things 的官方支持&#xff0c;但其技術思想仍值得探索&#…

2025年AI搜索引擎發展洞察:技術革新與市場變革

引言&#xff1a;AI搜索的崛起與市場格局重塑 2024-2025年&#xff0c;AI搜索市場迎來了前所未有的變革期。隨著DeepSeek-R1等先進大語言模型的推出&#xff0c;傳統搜索引擎、AI原生搜索平臺以及各類內容平臺紛紛加速智能化轉型&#xff0c;推動搜索技術從基礎信息檢索向深度…

基于 ESP32 與 AWS 全托管服務的 IoT 架構:MQTT + WebSocket 實現設備-云-APP 高效互聯

目錄 一、總體架構圖 二、設備端(ESP32)低功耗設計(適配 AWS IoT) 1.MQTT 設置(ESP32 連接 AWS IoT Core) 2.低功耗策略總結(ESP32) 三、云端架構(基于 AWS Serverless + IoT Core) 1.AWS IoT Core 接入 2.云端 → APP:WebSocket 推送方案 流程: 3.數據存…

【LeetCode 熱題 100】有效的括號 / 最小棧 / 字符串解碼 / 柱狀圖中最大的矩形

??個人主頁&#xff1a;小羊 ??所屬專欄&#xff1a;LeetCode 熱題 100 很榮幸您能閱讀我的文章&#xff0c;誠請評論指點&#xff0c;歡迎歡迎 ~ 目錄 棧有效的括號最小棧字符串解碼每日溫度柱狀圖中最大的矩形 堆數組中的第K個最大元素 棧 有效的括號 有效的括號 cl…

Petalinux

Petalinux 命令 參考《UG 1157 PetaLinux Command Line Reference Guide》 //創建petalinux工程 petalinux-create -t project --template zynq -n <name> //配置工程 cd 上一步的工程 petalinux-config --get-hw-description ../xsa_folder///配置Linux內核 petalinux-…