Django+celery異步:拿來即用,可移植性高

一、依賴環境

1、python解釋器版本:python3.7.5

2、穩定依賴包

# Celery 核心
celery==5.2.7
kombu==5.2.4
billiard==3.6.4.0
vine==5.0.0# Redis broker backend
redis==4.3.6# eventlet (如果用 -P eventlet)【windows系統可以使用】
eventlet==0.33.3
greenlet==1.1.3# 避免 importlib_metadata API 沖突
importlib_metadata<5# Django相關
Django==3.2.25
django-celery-beat==2.3.0
django-celery-results==2.4.0# 數據庫客戶端
PyMySQL==1.1.1
# 日志包
concurrent-log-handler==0.9.28
# 數據庫連接池包
django-db-connection-pool==1.0.7
# 啟動django項目
uwsgi==2.0.22

二、項目結構

  • simple_system
    • settings.py? # django 配置文件
  • celery_app?
    • config? ?
      • config.py? ?# 主要配置
      • logger_config.py? ?# 日志配置
      • scheduler_config.py? # 定時任務配置
    • runs
      • celery_beat.service? ?# 使用systemctl啟動定時進程
      • celery_worker.service? # 使用systemctl啟動異步進程
    • tasks
      • task_async.py? # 存放異步任務
      • task_scheduler.py # 存放定時任務
    • celery.py? # celery 項目啟動入口

gitee上項目源碼

https://gitee.com/liuhaizhang/simple_systemhttps://gitee.com/liuhaizhang/simple_system

三、django相關配置

settings.py

# 使用pymysql代替mysqlclient
import pymysql
pymysql.install_as_MySQLdb()# 禁用debug模式
DEBUG = False
ALLOWED_HOSTS = ["*"]# mysql配置連接池
DATABASES = {"default": {"ENGINE": "dj_db_conn_pool.backends.mysql","NAME": 'simple_system',"USER": 'root',"PASSWORD": 'gs-root',"HOST": '127.0.0.1',"PORT": 3306,"ATOMIC_REQUESTS": True,"CHARSET": "utf8","COLLATION": "utf8_bin",# django-db-connection-pool 配置參數"POOL_OPTIONS": {"POOL_SIZE": 10,  # 基礎連接量"MAX_OVERFLOW": 15,  # 最大允許溢出"RECYCLE": 1 * 60 * 60,  # 閑置的連接回收時間"TIMEOUT": 30,  # 獲取連接超時時間"PRE_PING": True,  # 啟用連接前檢查(關鍵配置)},}
}# 時間本地化
LANGUAGE_CODE = "zh-hans"  # 改為簡體中文(可選)
TIME_ZONE = "Asia/Shanghai"  # 關鍵修改:設置為上海時區(北京時間)
USE_I18N = True  # 禁用國際化,不然時間就是utc時間
USE_TZ = False  # 設置True的化,存到數據庫的時間都是utc時間,False就是存本地時間# Redis broker,密碼是gs-Admin1,去掉 :gs-Admin1@  就是沒有配置密碼
REDIS_BROKER_URL = 'redis://:gs-Admin1@127.0.0.1:6379/0'
REDIS_RESULT_BACKEND = 'redis://:gs-Admin1@127.0.0.1:6379/1'

四、celery配置

4.1、配置文件

celery_app/config/config.py

# 導入 Django 的環境
import os
import django
import sys# 導入django項目的根目錄,可以隨意導入模塊
django_root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, django_root_path)
# django配置文件
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "simple_system.settings")
# 導入django環境,異步/定時任務中,就可以隨意調用django中的orm,cache等服務了
django.setup()from django.conf import settingsfrom celery_app.config.scheduler_config import beat_schedule# 顯示指明異步/定時任務所在的py文件路徑
tasks_module = ["celery_app.tasks.task_scheduler", "celery_app.tasks.task_async"]# celery的配置
config_dict = {"broker_url": settings.REDIS_BROKER_URL,  # 'redis://:123456@127.0.0.1:6379/1' 有密碼時,123456是密碼"result_backend": settings.REDIS_RESULT_BACKEND,"task_serializer": "json","result_serializer": "json","accept_content": ["json"],"timezone": "Asia/Shanghai","enable_utc": False,"result_expires": 1 * 60 * 60,"beat_schedule": beat_schedule,
}"""
參數解析:
accept_content:允許的內容類型/序列化程序的白名單,如果收到不在此列表中的消息,則該消息將被丟棄并出現錯誤,默認只為json;
task_serializer:標識要使用的默認序列化方法的字符串,默認值為json;
result_serializer:結果序列化格式,默認值為json;
timezone:配置Celery以使用自定義時區;
enable_utc:啟用消息中的日期和時間,將轉換為使用 UTC 時區,與timezone連用,當設置為 false 時,將使用系統本地時區。
result_expires: 異步任務結果存活時長
beat_schedule:設置定時任務
"""

celery_app/config/config_logger.py

import os
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler# 項目根目錄的絕對路徑
BASE_DIR = Path(__file__).resolve().parent.parent.parent
# celery日志的絕對路徑
LOGS_PATH = os.path.join(BASE_DIR, "logs")
if not os.path.exists(LOGS_PATH):os.makedirs(LOGS_PATH)# 日志配置(集成ConcurrentRotatingFileHandler)
LOGGING_CONFIG = {"version": 1,"disable_existing_loggers": False,"formatters": {"verbose": {"format": "[{asctime}][{levelname}][{filename}:{module}:{lineno:d}:{funcName}]:{message}","datefmt": "%Y-%m-%d %H:%M:%S","style": "{",},},"handlers": {# 使用第三方的ConcurrentRotatingFileHandler"concurrent_file": {"level": "INFO",# 第三方處理器的類路徑(固定寫法)"class": "concurrent_log_handler.ConcurrentRotatingFileHandler",# 日志文件路徑"filename": os.path.join(LOGS_PATH, "celery.log"),# 單個文件最大大小(5MB)"maxBytes": 10 * 1024 * 1024,# 保留備份文件數量"backupCount": 15,# 編碼格式"encoding": "utf-8",# 日志格式"formatter": "verbose",# 可選:多進程安全的額外參數(如鎖定超時) 版本≥ 0.9.19才能使用(linux/unix系統才能使用)# "lockTimeout": 5,  # 等待日志鎖的超時時間(秒)},},"loggers": {# Celery相關日志會使用上述處理器"celery.worker": {"handlers": ["concurrent_file"],"level": "INFO","propagate": True,},# 任務模塊的日志也會被捕獲"celery_app.tasks": {"handlers": ["concurrent_file"],"level": "INFO","propagate": False,  # 避免重復日志},},
}

celery_app/config/config_scheduler.py

from celery.schedules import crontab
from datetime import timedelta# 定時任務
beat_schedule = {  # 定時任務配置# 名字隨意命名"add-func-3-seconds": {# 執行add_task下的addy函數"task": "celery_app.tasks.task_scheduler.add_func",  # 任務函數的導入路徑# 每3秒執行一次"schedule": timedelta(minutes=30),# add函數傳遞的參數"args": (10, 21),},# 名字隨意起"add-func-5-minutes": {"task": "celery_app.tasks.task_scheduler.add_func",  # 任務函數的導入路徑# crontab不傳的參數默認就是每的意思,比如這里是每年每月每日每天每小時的5分執行該任務"schedule": crontab(minute="5"),  # 之前時間點執行,每小時的第5分鐘執行任務, 改成小時,分鐘,秒 就是每天的哪個小時哪分鐘哪秒鐘執行"args": (19, 22),  # 定時任務需要的參數},# 查詢orm表"task-ope_model-3-seconds":{# 執行add_task下的addy函數"task": "celery_app.tasks.task_scheduler.ope_model",  # 任務函數的導入路徑# 每3秒執行一次"schedule": timedelta(minutes=30),},"task-rename_uwsgi_backup_log":{# 執行add_task下的addy函數"task": "celery_app.tasks.task_scheduler.rename_uwsgi_backup_log",  # 任務函數的導入路徑# 每天凌晨零點執行一次"schedule": crontab(hour=0,minute=0),}
}

4.2、systemctl文件

celery_app/runs/celery_beat.service

[Unit]
Description=Celery Beat Service[Service]
# 運行用戶(替換為你的用戶名和組)
User=root
Group=root# 項目根目錄ls,看到celery_app包
WorkingDirectory=/home/lhz/simple_system# 啟動命令(使用虛擬環境中 celery 的絕對路徑)
ExecStart=/home/lhz/venv/simple_system/bin/celery -A celery_app.celery beat -l info# 進程退出后自動重啟
Restart=always
RestartSec=5# 環境變量(確保虛擬環境和項目依賴正常加載)
Environment="PATH=/home/lhz/venv/simple_system/bin"
Environment="PYTHONPATH=/home/lhz/simple_system"  # 確保項目能被導入[Install]
WantedBy=multi-user.target

celery_app/runs/celery_worker.service

[Unit]
Description=Celery Worker Service[Service]
# 運行用戶和組
User=root
Group=root# 項目根目錄,里面能看到 celery_app 包
WorkingDirectory=/home/lhz/simple_system# 啟動命令(使用虛擬環境中的 celery)
ExecStart=/home/lhz/venv/simple_system/bin/celery -A celery_app.celery worker --loglevel=INFO# 進程退出后自動重啟
Restart=always
RestartSec=5# 環境變量(確保虛擬環境和項目依賴正常加載)
Environment="PATH=/home/lhz/venv/simple_system/bin"
Environment="PYTHONPATH=/home/lhz/simple_system"[Install]
WantedBy=multi-user.target

4.3、任務模塊

celery/tasks/task_async.py

from celery import shared_task
from celery_app.celery import logger@shared_task(bind=True)
def push_template_message(self):for i in range(10):logger.info(f"定時任務開始執行(任務ID:{self.request.id}),i={i}")

celery/tasks/task_scheduler.py

import os.pathfrom celery import shared_task
from home.models import UserModel
from celery_app.celery import logger
from celery_app.config.logger_config import LOGS_PATH
from datetime import datetime@shared_task(bind=True)
def add_func(self,a,b):for i in range(10):logger.info(f"定時任務開始執行(任務ID:{self.request.id}),a+b+i={a+b+i}")@shared_task(bind=True)
def ope_model(self):objs = UserModel.objects.all()for obj in objs:logger.info(f"異步任務開始執行(任務ID:{self.request.id}),{obj.account}")@shared_task(bind=True)
def rename_uwsgi_backup_log(self):# 將 根目錄/logs中uwsgi.log.172384中的時間戳轉成日期name_list = os.listdir(LOGS_PATH)name_pref = 'uwsgi.log.'task_id = f"任務ID={self.request.id}"for name in name_list:if name.startswith(name_pref):_,time_str = name.rsplit('.',1)try:time_int = int(time_str)date_str = datetime.fromtimestamp(int(time_int)).strftime("%Y-%m-%d_%H-%M-%S")new_name = f'{name_pref}{date_str}'old_path = os.path.join(LOGS_PATH,name)new_path = os.path.join(LOGS_PATH,new_name)if not os.path.exists(new_path):os.rename(old_path,new_path)logger.info(f'{task_id},{name} 重命名成{new_name}')else:for i in range(10):new_name += f'-{i+1}'new_path = os.path.join(LOGS_PATH,new_name)if not os.path.exists(new_path):os.rename(old_path, new_path)logger.info(f'{task_id},{name} 重命名成{new_name}')breakelse:logger.info(f"{task_id},{name}進行10次重命名都失敗了")except Exception as e:logger.error(f"{task_id},{name}重命名失敗")

4.4、celery主模塊

celery_app/celery.py

from datetime import datetime
import logging.config
from celery import Celery
from celery_app.config.config import config_dict,tasks_module
from celery_app.config.logger_config import LOGGING_CONFIG# 實例化celery對象,傳遞一個名字
celery_app = Celery("celery_app")# 顯式指定 任務函數所在的模塊
celery_app.autodiscover_tasks(tasks_module)# 通過dictConfig加載日志配置
logging.config.dictConfig(LOGGING_CONFIG)# 導入配置消息
celery_app.conf.update(**config_dict)# 日志記錄器,給任務使用
logger = logging.getLogger("celery_app.tasks")

五、測試使用

5.1、基本準備

1、安裝好mysql、redis等

2、給user表創建3條記錄

????????get請求:/home/user/??

3、在視圖中調用異步任務

? ? ? ? get請求:/home/test/

5.2、啟動命令

異步任務啟動

1、windows系統 (在django項目根目錄下執行)

celery -A celery_app.celery worker --loglevel=info  -P  eventlet

2、linux系統(在django項目根目錄下執行)

celery -A celery_app.celery worker --loglevel=info 

啟動成功:

定時任務啟動

windows/linux系統(在項目根目錄下執行)

 celery -A celery_app.celery beat -l info

啟動成功:

推送了定時任務:

六、遷移到自己項目

只需要將celery_app整個包都遷移即可

1、修改celery_app/config/config.py

# simple_system 改成 自己的項目名
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "simple_system.settings")

2、修改自己項目的settings.py

# Redis broker,密碼是gs-Admin1,去掉 :gs-Admin1@  就是沒有配置密碼
REDIS_BROKER_URL = 'redis://:gs-Admin1@127.0.0.1:6379/0'
REDIS_RESULT_BACKEND = 'redis://:gs-Admin1@127.0.0.1:6379/1'

3、celery_app/tasks/task_scheduler.py

# 如果沒有UserModel ,就不引入和刪除此方法@shared_task(bind=True)
def ope_model(self):objs = UserModel.objects.all()for obj in objs:logger.info(f"異步任務開始執行(任務ID:{self.request.id}),{obj.account}")

4、celery_app/runs 中兩個配置,根據實際情況修改

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/93621.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/93621.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/93621.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Ubuntu18.04 LTS +RTL 8125 出現安裝完系統后沒有網絡問題

Ubuntu18.04 LTS RTL 8125 出現安裝完系統后沒有網絡問題問題描述最終解決方案1.下載對應的Realtek網卡驅動&#xff0c;使用命令lspci查看網卡信息安裝網卡3.重啟電腦記錄過程1.內核升級方式1&#xff09;下載新的內核驅動2&#xff09;安裝內核驅動3&#xff09;重啟電腦4&am…

集成電路學習:什么是ARM CortexM處理器核心

ARM Cortex-M是ARM公司專為微控制器( Microcontroller)設計的處理器核心系列,它以其高性能、低功耗和易于開發的特點,在嵌入式系統和微控制器領域得到了廣泛應用。以下是關于ARM Cortex-M的詳細介紹: 一、ARM Cortex-M的概述 ARM Cortex-M系列處理器是基于ARM架構的高能效…

Apache Ignite 的分布式原子類型(Atomic Types)

以下的內容是關于 Apache Ignite 的分布式原子類型&#xff08;Atomic Types&#xff09;&#xff0c;主要包括 IgniteAtomicLong 和 IgniteAtomicReference。它們是 跨集群節點的“全局共享變量”&#xff0c;支持線程安全、原子性操作&#xff0c;即使多個節點同時訪問也能保…

Leetcode 08 java

283. 移動零 提示 給定一個數組 nums&#xff0c;編寫一個函數將所有 0 移動到數組的末尾&#xff0c;同時保持非零元素的相對順序。 請注意 &#xff0c;必須在不復制數組的情況下原地對數組進行操作。 示例 1: 輸入: nums [0,1,0,3,12] 輸出: [1,3,12,0,0] 示例 2: 輸…

LeetCode 56 - 合并區間

思路 排序&#xff1a;將所有區間按起始點從小到大排序。貪心合并&#xff1a;初始化一個結果列表&#xff0c;放入第一個區間。然后遍歷剩余區間&#xff0c;將當前區間與結果列表中的最后一個區間比較&#xff1a; 若重疊&#xff08;當前區間起點 ≤ 結果區間終點&#xff0…

DNS 正向查找與反向查找

DNS 區域是 DNS 中基本的組織單元&#xff0c;為域名定義了管理和權威邊界。一個 DNS 區域通常包含一系列 DNS 資源記錄&#xff0c;包括名稱到地址的映射&#xff08;正向查找&#xff09;和地址到名稱的映射&#xff08;反向查找&#xff09;。這些區域對于高效管理和解析網絡…

Oracle ERP FORM開發 — 新增查詢條件

1 根據值來查詢具體流程步驟看第2節&#xff0c;這里提供核心的增加查詢條件的觸發器代碼&#xff1a;1.1 可完全匹配的值比如“是”&#xff0c;“否”&#xff0c;“物料”&#xff0c;“”汽車 等等這些可以直接通過對應的值匹配&#xff0c;特點就是詞語&#xff0c;短小。…

Flutter實現列表功能

在Flutter中,可以通過ListView和ListTile等組件來實現類似Android中RecyclerView和Adapter的功能。以下是一個通用的設計架構,用于設計列表數據: 1. 定義數據模型 首先,定義一個數據模型類,用于存儲列表中每一項的數據。例如: class ItemModel {final String title;fi…

2.1、Redis的單線程本質和多線程的操作

Redis的單線程本質 1. 核心單線程部分 #mermaid-svg-iFErSltthPIEsuiP {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-iFErSltthPIEsuiP .error-icon{fill:#552222;}#mermaid-svg-iFErSltthPIEsuiP .error-text{fil…

文件權限值的表示方法

文章目錄字符表示方法8 進制數值表示方法字符表示方法 Linux表示說明Linux表示說明r--只讀-w-僅可寫--x僅可執行rw-可讀可寫-wx可寫可執行r-x可讀可執行rwx可讀可寫可執行---無權限 8 進制數值表示方法 權限符號8進制2進制r4100w2010x1001rw6110rx5101wx3011rwx7111---0000

【38】WinForm入門到精通 ——WinForm平臺為AnyCPU 無法切換為x64,也無法添加 x64及其他平臺

WinForm 是 Windows Form 的簡稱&#xff0c;是基于 .NET Framework 平臺的客戶端&#xff08;PC軟件&#xff09;開發技術&#xff0c;是 C# 語言中的一個重要應用。.NET 提供了大量 Windows 風格的控件和事件&#xff0c;可以直接拿來使用。本專欄內容是按照標題序號逐漸深入…

門控激活函數:GLU/GTU/Swish/HSwish/Mish/SwiGLU

10 門控激活函數 10.1 GLU&#xff1a;門控線性單元函數Gated Linear Unit10.2 GTU&#xff1a;門控Tanh單元函數Gated Tanh Unit自門控激活函數&#xff08;Self-gated activation function&#xff09;是一種通過自身機制動態調節信息流動的激活函數&#xff0c;其核心在于模…

Linux內核IPv4多播路由深度解析:從數據結構到高效轉發

多播路由是網絡通信的核心技術之一&#xff0c;Linux內核通過精密的多層設計實現了高性能的IPv4多播路由功能。本文將深入剖析其核心實現機制&#xff0c;揭示其高效運轉的秘密。一、核心數據結構&#xff1a;路由系統的基石1. 多播路由表&#xff08;struct mr_table&#xff…

ffmpeg-7.1.1 下載安裝 windows 版,MP4 轉 m3u8 切片,遇到報錯 Unrecognized option ‘vbsf‘的解決辦法

工作中偶爾會需要造指定大小的文檔文件&#xff0c;不要求內容&#xff0c;可以隨意填充任意無毒內容&#xff0c;所以打算用ts文件填充&#xff0c;現記錄下過程。一、下載 ffmpeg廢話不多說&#xff0c;上鏈接&#xff0c;https://ffmpeg.org/會跳轉新頁面&#xff0c;向下拉…

Linux網絡編程:網絡基礎概念(下)

目錄 前言&#xff1a; 一、網絡傳輸基本流程 1.1、認識MAC地址 1.2、認識IP地址 二、socket編程預備 2.1、端口號 2.2、傳輸層的代表 2.3、網絡字節序 2.4、sockaddr 結構 總結&#xff1a; 前言&#xff1a; 大家好&#xff0c;上一篇文章&#xff0c;我們說到了…

亞馬遜廣告進階指南:如何優化流量實現新品快速起量

“新品上架如何快速獲取精準流量&#xff1f;”“如何降低ACOS同時提升轉化率&#xff1f;”“競品流量攔截有哪些高效方法&#xff1f;”“關鍵詞廣告和ASIN廣告如何協同投放&#xff1f;”“人工投放效果不穩定&#xff0c;AI工具真的能解決問題嗎&#xff1f;”如果你也在思…

路徑平滑優化算法--Clothoid 路徑平滑

路徑平滑優化算法–Clothoid 路徑平滑 文章目錄路徑平滑優化算法--Clothoid 路徑平滑0 為什么選 Clothoid&#xff1f;1 數學基礎&#xff1a;嚴謹推導&#xff08;Mathematical Foundation&#xff09;可視化解釋1.1 曲率線性假設1.2 切向角&#xff08;Heading Angle&#…

PCB學習筆記(一)

文章目錄一、PCB的制作過程了解1.1 覆銅板一、核心作用&#xff1a;制作印制電路板&#xff08;PCB&#xff09;二、不同類型覆銅板的針對性用途三、延伸應用1.2 覆銅板和信號線的關系一、覆銅板不是“全是銅”&#xff0c;原始結構是“絕緣基材銅箔”二、信號線就是銅&#xf…

【19】C# 窗體應用WinForm ——【列表框ListBox、復選列表框CheckedListBox】屬性、方法、實例應用

文章目錄9 復選列表框CheckedListBox10. 列表框ListBox10.1 實例&#xff1a;買菜10.2 實例&#xff1a;購菜 應用二WinForm 是 Windows Form 的簡稱&#xff0c;是基于 .NET Framework 平臺的客戶端&#xff08;PC軟件&#xff09;開發技術&#xff0c;是 C# 語言中的一個重要…

新注冊企業信息查詢“數據大集網”:驅動企業增長的源頭活水

商貿繁榮的齊魯大地上&#xff0c;“趕大集”曾是千年傳承的民間智慧。而今天&#xff0c;一場以新注冊企業信息為核心的“數據大集”正悄然重塑商業生態——數據大集網&#xff0c;以“聚天下好數&#xff0c;促萬企互聯”為使命&#xff0c;將分散的企業信息轉化為精準商機&a…