一、依賴環境
1、python解釋器版本:python3.7.5
2、穩定依賴包
# Celery 核心
celery==5.2.7
kombu==5.2.4
billiard==3.6.4.0
vine==5.0.0# Redis broker backend
redis==4.3.6# eventlet (如果用 -P eventlet)【windows系統可以使用】
eventlet==0.33.3
greenlet==1.1.3# 避免 importlib_metadata API 沖突
importlib_metadata<5# Django相關
Django==3.2.25
django-celery-beat==2.3.0
django-celery-results==2.4.0# 數據庫客戶端
PyMySQL==1.1.1
# 日志包
concurrent-log-handler==0.9.28
# 數據庫連接池包
django-db-connection-pool==1.0.7
# 啟動django項目
uwsgi==2.0.22
二、項目結構
- simple_system
- settings.py? # django 配置文件
- celery_app?
- config? ?
- config.py? ?# 主要配置
- logger_config.py? ?# 日志配置
- scheduler_config.py? # 定時任務配置
- runs
- celery_beat.service? ?# 使用systemctl啟動定時進程
- celery_worker.service? # 使用systemctl啟動異步進程
- tasks
- task_async.py? # 存放異步任務
- task_scheduler.py # 存放定時任務
- celery.py? # celery 項目啟動入口
- config? ?
gitee上項目源碼
https://gitee.com/liuhaizhang/simple_systemhttps://gitee.com/liuhaizhang/simple_system
三、django相關配置
settings.py
# 使用pymysql代替mysqlclient
import pymysql
pymysql.install_as_MySQLdb()# 禁用debug模式
DEBUG = False
ALLOWED_HOSTS = ["*"]# mysql配置連接池
DATABASES = {"default": {"ENGINE": "dj_db_conn_pool.backends.mysql","NAME": 'simple_system',"USER": 'root',"PASSWORD": 'gs-root',"HOST": '127.0.0.1',"PORT": 3306,"ATOMIC_REQUESTS": True,"CHARSET": "utf8","COLLATION": "utf8_bin",# django-db-connection-pool 配置參數"POOL_OPTIONS": {"POOL_SIZE": 10, # 基礎連接量"MAX_OVERFLOW": 15, # 最大允許溢出"RECYCLE": 1 * 60 * 60, # 閑置的連接回收時間"TIMEOUT": 30, # 獲取連接超時時間"PRE_PING": True, # 啟用連接前檢查(關鍵配置)},}
}# 時間本地化
LANGUAGE_CODE = "zh-hans" # 改為簡體中文(可選)
TIME_ZONE = "Asia/Shanghai" # 關鍵修改:設置為上海時區(北京時間)
USE_I18N = True # 禁用國際化,不然時間就是utc時間
USE_TZ = False # 設置True的化,存到數據庫的時間都是utc時間,False就是存本地時間# Redis broker,密碼是gs-Admin1,去掉 :gs-Admin1@ 就是沒有配置密碼
REDIS_BROKER_URL = 'redis://:gs-Admin1@127.0.0.1:6379/0'
REDIS_RESULT_BACKEND = 'redis://:gs-Admin1@127.0.0.1:6379/1'
四、celery配置
4.1、配置文件
celery_app/config/config.py
# 導入 Django 的環境
import os
import django
import sys# 導入django項目的根目錄,可以隨意導入模塊
django_root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, django_root_path)
# django配置文件
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "simple_system.settings")
# 導入django環境,異步/定時任務中,就可以隨意調用django中的orm,cache等服務了
django.setup()from django.conf import settingsfrom celery_app.config.scheduler_config import beat_schedule# 顯示指明異步/定時任務所在的py文件路徑
tasks_module = ["celery_app.tasks.task_scheduler", "celery_app.tasks.task_async"]# celery的配置
config_dict = {"broker_url": settings.REDIS_BROKER_URL, # 'redis://:123456@127.0.0.1:6379/1' 有密碼時,123456是密碼"result_backend": settings.REDIS_RESULT_BACKEND,"task_serializer": "json","result_serializer": "json","accept_content": ["json"],"timezone": "Asia/Shanghai","enable_utc": False,"result_expires": 1 * 60 * 60,"beat_schedule": beat_schedule,
}"""
參數解析:
accept_content:允許的內容類型/序列化程序的白名單,如果收到不在此列表中的消息,則該消息將被丟棄并出現錯誤,默認只為json;
task_serializer:標識要使用的默認序列化方法的字符串,默認值為json;
result_serializer:結果序列化格式,默認值為json;
timezone:配置Celery以使用自定義時區;
enable_utc:啟用消息中的日期和時間,將轉換為使用 UTC 時區,與timezone連用,當設置為 false 時,將使用系統本地時區。
result_expires: 異步任務結果存活時長
beat_schedule:設置定時任務
"""
celery_app/config/config_logger.py
import os
from pathlib import Path
import logging
from logging.handlers import RotatingFileHandler# 項目根目錄的絕對路徑
BASE_DIR = Path(__file__).resolve().parent.parent.parent
# celery日志的絕對路徑
LOGS_PATH = os.path.join(BASE_DIR, "logs")
if not os.path.exists(LOGS_PATH):os.makedirs(LOGS_PATH)# 日志配置(集成ConcurrentRotatingFileHandler)
LOGGING_CONFIG = {"version": 1,"disable_existing_loggers": False,"formatters": {"verbose": {"format": "[{asctime}][{levelname}][{filename}:{module}:{lineno:d}:{funcName}]:{message}","datefmt": "%Y-%m-%d %H:%M:%S","style": "{",},},"handlers": {# 使用第三方的ConcurrentRotatingFileHandler"concurrent_file": {"level": "INFO",# 第三方處理器的類路徑(固定寫法)"class": "concurrent_log_handler.ConcurrentRotatingFileHandler",# 日志文件路徑"filename": os.path.join(LOGS_PATH, "celery.log"),# 單個文件最大大小(5MB)"maxBytes": 10 * 1024 * 1024,# 保留備份文件數量"backupCount": 15,# 編碼格式"encoding": "utf-8",# 日志格式"formatter": "verbose",# 可選:多進程安全的額外參數(如鎖定超時) 版本≥ 0.9.19才能使用(linux/unix系統才能使用)# "lockTimeout": 5, # 等待日志鎖的超時時間(秒)},},"loggers": {# Celery相關日志會使用上述處理器"celery.worker": {"handlers": ["concurrent_file"],"level": "INFO","propagate": True,},# 任務模塊的日志也會被捕獲"celery_app.tasks": {"handlers": ["concurrent_file"],"level": "INFO","propagate": False, # 避免重復日志},},
}
celery_app/config/config_scheduler.py
from celery.schedules import crontab
from datetime import timedelta# 定時任務
beat_schedule = { # 定時任務配置# 名字隨意命名"add-func-3-seconds": {# 執行add_task下的addy函數"task": "celery_app.tasks.task_scheduler.add_func", # 任務函數的導入路徑# 每3秒執行一次"schedule": timedelta(minutes=30),# add函數傳遞的參數"args": (10, 21),},# 名字隨意起"add-func-5-minutes": {"task": "celery_app.tasks.task_scheduler.add_func", # 任務函數的導入路徑# crontab不傳的參數默認就是每的意思,比如這里是每年每月每日每天每小時的5分執行該任務"schedule": crontab(minute="5"), # 之前時間點執行,每小時的第5分鐘執行任務, 改成小時,分鐘,秒 就是每天的哪個小時哪分鐘哪秒鐘執行"args": (19, 22), # 定時任務需要的參數},# 查詢orm表"task-ope_model-3-seconds":{# 執行add_task下的addy函數"task": "celery_app.tasks.task_scheduler.ope_model", # 任務函數的導入路徑# 每3秒執行一次"schedule": timedelta(minutes=30),},"task-rename_uwsgi_backup_log":{# 執行add_task下的addy函數"task": "celery_app.tasks.task_scheduler.rename_uwsgi_backup_log", # 任務函數的導入路徑# 每天凌晨零點執行一次"schedule": crontab(hour=0,minute=0),}
}
4.2、systemctl文件
celery_app/runs/celery_beat.service
[Unit]
Description=Celery Beat Service[Service]
# 運行用戶(替換為你的用戶名和組)
User=root
Group=root# 項目根目錄ls,看到celery_app包
WorkingDirectory=/home/lhz/simple_system# 啟動命令(使用虛擬環境中 celery 的絕對路徑)
ExecStart=/home/lhz/venv/simple_system/bin/celery -A celery_app.celery beat -l info# 進程退出后自動重啟
Restart=always
RestartSec=5# 環境變量(確保虛擬環境和項目依賴正常加載)
Environment="PATH=/home/lhz/venv/simple_system/bin"
Environment="PYTHONPATH=/home/lhz/simple_system" # 確保項目能被導入[Install]
WantedBy=multi-user.target
celery_app/runs/celery_worker.service
[Unit]
Description=Celery Worker Service[Service]
# 運行用戶和組
User=root
Group=root# 項目根目錄,里面能看到 celery_app 包
WorkingDirectory=/home/lhz/simple_system# 啟動命令(使用虛擬環境中的 celery)
ExecStart=/home/lhz/venv/simple_system/bin/celery -A celery_app.celery worker --loglevel=INFO# 進程退出后自動重啟
Restart=always
RestartSec=5# 環境變量(確保虛擬環境和項目依賴正常加載)
Environment="PATH=/home/lhz/venv/simple_system/bin"
Environment="PYTHONPATH=/home/lhz/simple_system"[Install]
WantedBy=multi-user.target
4.3、任務模塊
celery/tasks/task_async.py
from celery import shared_task
from celery_app.celery import logger@shared_task(bind=True)
def push_template_message(self):for i in range(10):logger.info(f"定時任務開始執行(任務ID:{self.request.id}),i={i}")
celery/tasks/task_scheduler.py
import os.pathfrom celery import shared_task
from home.models import UserModel
from celery_app.celery import logger
from celery_app.config.logger_config import LOGS_PATH
from datetime import datetime@shared_task(bind=True)
def add_func(self,a,b):for i in range(10):logger.info(f"定時任務開始執行(任務ID:{self.request.id}),a+b+i={a+b+i}")@shared_task(bind=True)
def ope_model(self):objs = UserModel.objects.all()for obj in objs:logger.info(f"異步任務開始執行(任務ID:{self.request.id}),{obj.account}")@shared_task(bind=True)
def rename_uwsgi_backup_log(self):# 將 根目錄/logs中uwsgi.log.172384中的時間戳轉成日期name_list = os.listdir(LOGS_PATH)name_pref = 'uwsgi.log.'task_id = f"任務ID={self.request.id}"for name in name_list:if name.startswith(name_pref):_,time_str = name.rsplit('.',1)try:time_int = int(time_str)date_str = datetime.fromtimestamp(int(time_int)).strftime("%Y-%m-%d_%H-%M-%S")new_name = f'{name_pref}{date_str}'old_path = os.path.join(LOGS_PATH,name)new_path = os.path.join(LOGS_PATH,new_name)if not os.path.exists(new_path):os.rename(old_path,new_path)logger.info(f'{task_id},{name} 重命名成{new_name}')else:for i in range(10):new_name += f'-{i+1}'new_path = os.path.join(LOGS_PATH,new_name)if not os.path.exists(new_path):os.rename(old_path, new_path)logger.info(f'{task_id},{name} 重命名成{new_name}')breakelse:logger.info(f"{task_id},{name}進行10次重命名都失敗了")except Exception as e:logger.error(f"{task_id},{name}重命名失敗")
4.4、celery主模塊
celery_app/celery.py
from datetime import datetime
import logging.config
from celery import Celery
from celery_app.config.config import config_dict,tasks_module
from celery_app.config.logger_config import LOGGING_CONFIG# 實例化celery對象,傳遞一個名字
celery_app = Celery("celery_app")# 顯式指定 任務函數所在的模塊
celery_app.autodiscover_tasks(tasks_module)# 通過dictConfig加載日志配置
logging.config.dictConfig(LOGGING_CONFIG)# 導入配置消息
celery_app.conf.update(**config_dict)# 日志記錄器,給任務使用
logger = logging.getLogger("celery_app.tasks")
五、測試使用
5.1、基本準備
1、安裝好mysql、redis等
2、給user表創建3條記錄
????????get請求:/home/user/??
3、在視圖中調用異步任務
? ? ? ? get請求:/home/test/
5.2、啟動命令
異步任務啟動
1、windows系統 (在django項目根目錄下執行)
celery -A celery_app.celery worker --loglevel=info -P eventlet
2、linux系統(在django項目根目錄下執行)
celery -A celery_app.celery worker --loglevel=info
啟動成功:
定時任務啟動
windows/linux系統(在項目根目錄下執行)
celery -A celery_app.celery beat -l info
啟動成功:
推送了定時任務:
六、遷移到自己項目
只需要將celery_app整個包都遷移即可
1、修改celery_app/config/config.py
# simple_system 改成 自己的項目名
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "simple_system.settings")
2、修改自己項目的settings.py
# Redis broker,密碼是gs-Admin1,去掉 :gs-Admin1@ 就是沒有配置密碼
REDIS_BROKER_URL = 'redis://:gs-Admin1@127.0.0.1:6379/0'
REDIS_RESULT_BACKEND = 'redis://:gs-Admin1@127.0.0.1:6379/1'
3、celery_app/tasks/task_scheduler.py
# 如果沒有UserModel ,就不引入和刪除此方法@shared_task(bind=True)
def ope_model(self):objs = UserModel.objects.all()for obj in objs:logger.info(f"異步任務開始執行(任務ID:{self.request.id}),{obj.account}")
4、celery_app/runs 中兩個配置,根據實際情況修改