Celery在Django中的應用

在這里插入圖片描述

Celery在Django中的應用

  • 一、項目配置
  • 二、異步任務
    • 2.1 普通用法
      • 2.1.1 通過delay
      • 2.1.2 通過apply_async
    • 2.2 高級用法
      • 2.2.1 任務回調(Callback)
      • 2.2.2 任務鏈(Chaining)
      • 2.2.3 任務組(Group)
      • 2.2.4 任務和弦(Chord)
  • 三、定時任務
  • 四、啟動celery
    • 4.1 命令行方式
    • 4.2 腳本方式(容易出問題,建議用命令行方式,很多默認配置內置好)
      • 4.2.1 啟動worker(腳本方式博主暫時沒找到好方法能捕獲任務結果)
      • 4.2.2 啟動beat
      • 4.2.3 合并啟動worker和beat
  • 五、監控管理
    • 5.1 celery inspect
    • 5.2 celery control
    • 5.3 celery event
    • 5.4 celery multi
    • 5.5 celery purge
    • 5.6 celery flower

一、項目配置

1.1 確認celery及django版本相對應,本文使用django3.2celery5.5
1.2 創建一個名為CeleryStudydjango項目,以及一個名為test1_app,目錄結構如下:
在這里插入圖片描述
1.3 配置celery的setting參數(大部分不需要全局配置,可以針對tasks單獨配置)

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # django-db(使用 Django 數據庫存儲結果)
CELERY_ACCEPT_CONTENT = ['json'] # 指定 Celery 接受的任務序列化格式(避免反序列化安全問題)。
CELERY_TASK_SERIALIZER = 'json' # 指定任務的序列化方式
CELERY_RESULT_SERIALIZER = 'json' # 指定結果的序列化方式
CELERY_TIMEZONE = TIME_ZONE # 設置 Celery 的時區(影響定時任務的調度時間)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 指定定時任務調度器的后端
# 默認內存調度:celery.beat:PersistentScheduler(需配合 beat_schedule_filename)
# Redis 調度:celery.beat:RedisScheduler(需安裝 celery-redis-scheduler)
CELERYD_CONCURRENCY = 4 # Worker 并發數(默認 CPU 核心數)
CELERY_BEAT_SCHEDULE = {  # 一般在celery.py文件配置'every-10-seconds': {'task': 'myapp.tasks.debug','schedule': 10.0,},
}
CELERY_BEAT_MAX_LOOP_INTERVAL = 300  # 秒, Beat 調度器的最大循環間隔(默認 5 分鐘)
CELERY_TASK_TIME_LIMIT = 300  # 硬超時 5 分鐘(任務被強制終止)
CELERY_TASK_SOFT_TIME_LIMIT = 240  # 軟超時 4 分鐘(觸發 `SoftTimeLimitExceeded`)
CELERY_TASK_DEFAULT_RETRY_DELAY = 60  # 任務重試間隔 1 分鐘
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s' # 自定義 Worker 日志格式
...... # 等等等等等等, 還有一大堆配置

1.4 celery全局配置

# celery.pyimport os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryStudy.settings')  # 其作用是為 Django 提供配置文件的定位信息,確保框架能正確加載項目的各項設置app = Celery('CeleryStudy')		# celery實例,一般命名為項目名稱
app.config_from_object('django.conf:settings', namespace='CELERY')	# celery實例從setting中CELERY開頭的配置獲取app.autodiscover_tasks()  # 自動發現并注冊項目中定義的tasks,會發現 @shared_task 和 @app.task

1.5 修改項目init文件,通過給外部導入

# __init__.pyfrom .celery import app as celery_app__all__ = ("celery_app",)

1.6 在app中新建tasks寫入任務邏輯

# tasks.py
"""
@app.task 是“專屬任務”,綁定到具體應用,適合簡單場景。
@shared_task 是“共享任務”,解耦于應用,適合復雜架構。
"""寫法一:
from celery import Celery
app = Celery('proj')@app.task  # 綁定到當前 `app` 實例
def add(x, y):return x + y寫法二: 
import time
from celery import shared_task@shared_task
def test_add(x, y):time.sleep(2)return x + y@shared_task
def pre_task_test(x):# 定時任務return x

二、異步任務

2.1 普通用法

2.1.1 通過delay

# views.pyfrom django.http import HttpResponse
from .tasks import test_add# Create your views here.def test_celery(request):result = test_add.delay(1, 5)return HttpResponse(result.task_id + ' : ' + result.status)

2.1.2 通過apply_async

# countdown: 延遲執行(秒)。
# eta: 指定具體執行時間(datetime)。
# queue: 指定任務隊列。
# expires: 任務過期時間。
# retry: 是否啟用重試from datetime import datetime, timedelta# 延遲 10 秒執行
test_add.apply_async(args=(1, 5), countdown=10)
# 指定具體執行時間
test_add.apply_async(args=(1, 5), eta=datetime.now() + timedelta(minutes=1))
# 指定隊列和過期時間
test_add.apply_async(args=(1, 5), queue='priority', expires=3600)

2.2 高級用法

通過 signature 對象調用,預生成任務簽名(task.s()),用于創建一個可序列化的任務調用對象。它允許你預定義任務及其參數,而無需立即執行,從而支持更靈活的任務組合(如鏈式調用、組調用等),簽名對象是可序列化的,可以存儲到數據庫通過網絡傳遞

sig = test_add.s(1, 5)  # 創建簽名對象
sig.apply_async()       # 異步執行
sig.delay()             # 等價于 apply_async

2.2.1 任務回調(Callback)

在任務成功后觸發另一個任務(通過 link 參數)

test_add.apply_async(args=(1, 5), link=send_notification.s("Task completed!"))

2.2.2 任務鏈(Chaining)

通過 | 符號或 chain() 將多個任務串聯,前一個任務的結果作為后一個任務的輸入

from celery import chain# 方法1:使用 | 符號
result = (task1.s(1, 2) | task2.s() | task3.s())()
# 方法2:使用 chain()
result = chain(task1.s(1, 2), task2.s(), task3.s())()

2.2.3 任務組(Group)

并行執行多個任務,等待所有任務完成

from celery import groupresult = group(task1.s(i) for i in range(10))()  # 并發執行 10 個 task1

2.2.4 任務和弦(Chord)

先并行執行一組任務(group),全部完成后執行一個匯總任務

from celery import chordresult = chord((task1.s(i) for i in range(10)), task2.s())()  # 10 個 task1 完成后執行 task2

三、定時任務

在celery文件中添加定時任務路由表

# celery.pyapp.conf.beat_schedule = {'task-name': {  # 任務名稱(自定義)'task': 'myapp.tasks.my_task',  # 任務函數路徑(需可導入)'schedule': 30,  # 執行時間規則(固定間隔)# 或 'schedule': crontab(minute='*/5'),  # Cron 表達式'args': (16, 16),  # 傳遞給任務的參數(可選)'options': {'queue': 'priority'},  # 其他選項(如指定隊列)},# 可定義多個任務
}

通過安裝pip install django-celery-beat可以實現在admin后臺動態修改定時任務配置

INSTALLED_APPS = [...,'django_celery_beat',
]# 替換 Celery 的調度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

配置完記得遷移數據庫 python manage.py migrate

在這里插入圖片描述


四、啟動celery

worker = 干活的(執行任務)。
beat = 發任務的(定時生成任務)。
協作關系:beat 是“計劃部門”,worker是“執行部門”,兩者通過 Broker(消息隊列)解耦。
生產建議:分開啟動,Worker 可橫向擴展,Beat 保持單例
在這里插入圖片描述
在這里插入圖片描述

4.1 命令行方式

# 啟動 Worker(處理任務)
celery -A CeleryStudy worker -l infocelery -A CeleryStudy worker -l info -P eventlet # windows環境下命令
"""
prefork 是 Celery 在 Linux 上的默認并發模型,它使用多進程(Multiprocessing)處理任務,
適合 CPU 密集型場景。但 Windows 系統不支持 fork() 系統調用,因此無法使用 prefork 池。
在 Windows 上嘗試使用 prefork 會直接報錯,導致 Worker 無法啟動。eventlet 是一個基于協程(Coroutine)的并發庫,通過綠色線程(Green Thread)實現高并發,
適合 I/O 密集型任務(如 Celery 的異步任務場景)。
在 Windows 上,eventlet 是少數可用的高性能并發池之一。
它通過非阻塞 I/O 和協程調度,避免了線程切換的開銷,同時繞過了 GIL 的限制,
能顯著提升任務處理效率
"""# 啟動 Beat(調度任務)
celery -A CeleryStudy beat -l info# 合并啟動
celery -A CeleryStudy worker --beat -l info

4.2 腳本方式(容易出問題,建議用命令行方式,很多默認配置內置好)

注:涉及django自定義管理命令,自己創建一個commands_app

4.2.1 啟動worker(腳本方式博主暫時沒找到好方法能捕獲任務結果)

# CeleryStudy/commands_app/management/commands/run_celery_worker.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):worker = celery_app.Worker(hostname='worker1@%h',  # Worker 名稱pool='eventlet',        # 進程池類型(prefork/solo/gevent)concurrency=4,         # 并發數loglevel='INFO',       # 日志級別logfile='/var/log/celery/worker.log',  # 日志文件(可選))worker.start()
python manage.py run_celery_worker

4.2.2 啟動beat

# CeleryStudy/commands_app/management/commands/run_celery_beat.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):beat = celery_app.Beat(loglevel='INFO',logfile='/var/log/celery/beat.log',  # 日志文件(可選)scheduler='django_celery_beat.schedulers:DatabaseScheduler',  # 使用數據庫調度)beat.run()
python manage.py run_celery_beat

4.2.3 合并啟動worker和beat

# CeleryStudy/commands_app/management/commands/run_celery.py
from django.core.management.base import BaseCommand
from threading import Thread
from CeleryStudy.celery import app as celery_appclass Command(BaseCommand):def handle(self, *args, **options):# 配置定時任務(可選)celery_app.conf.beat_schedule = {'add-every-10-seconds': {'task': 'proj.celery.debug_task','schedule': 10.0,'args': (16, 16),},}# 啟動 Workerworker = celery_app.Worker(hostname='worker1@%h',pool='prefork',concurrency=4,loglevel='INFO',)# 啟動 Beatbeat = celery_app.Beat(loglevel='INFO',scheduler='django_celery_beat.schedulers:DatabaseScheduler',)"""在后臺線程中運行 Beat如果直接調用 beat.run(),它會阻塞主線程,導致 Worker 無法啟動因此,需要通過線程(Thread)將 Beat 放在后臺運行,避免阻塞主線程"""beat_thread = Thread(target=beat.run)beat_thread.daemon = Truebeat_thread.start()# 啟動 Workerworker.start()
python manage.py run_celery

五、監控管理

5.1 celery inspect

作用:檢查 Worker 狀態、任務信息等(無需停止服務)。

celery -A proj inspect active          # 查看正在執行的任務
celery -A proj inspect registered      # 查看已注冊的任務列表
celery -A proj inspect scheduled       # 查看待執行的定時任務(需 Beat 運行)
celery -A proj inspect reserved        # 查看 Worker 已獲取但未執行的任務
celery -A proj inspect stats           # 查看 Worker 統計信息(如任務處理數)

5.2 celery control

作用:動態控制 Worker 行為(如關閉、重啟、調整并發數)。

celery -A proj control shutdown        # 優雅關閉所有 Worker
celery -A proj control add_consumer Q1 # 動態添加監聽隊列 Q1
celery -A proj control cancel_consumer Q1 # 動態移除監聽隊列 Q1
celery -A proj control pool_grow 10    # 增加 Worker 并發數到 10
celery -A proj control pool_shrink 5   # 減少 Worker 并發數到 5

5.3 celery event

作用:監控 Celery 事件(如任務開始、成功、失敗),可用于自定義儀表盤。

celery -A proj events                  # 啟動事件監控(輸出到終端)
celery -A proj events -d dump          # 以 JSON 格式輸出事件
celery -A proj events -f events.log    # 將事件記錄到文件

5.4 celery multi

作用:同時啟動多個 Worker 或 Beat 實例(適用于分布式部署)。

celery multi start w1 w2 -A proj -l info -Q high,low  # 啟動兩個 Worker,分別監聽不同隊列
celery multi stop w1 w2                               # 停止指定 Worker

5.5 celery purge

作用:清空消息隊列中的所有任務

celery -A proj purge -Q celery       # 清空默認隊列
celery -A proj purge -Q high,low    # 清空多個隊列

5.6 celery flower

作用:啟動基于 Web 的監控儀表盤(需單獨安裝 flower 包)。

pip install flower
celery -A CeleryStudy flower --port=5555     # 訪問 http://localhost:5555

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/93254.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/93254.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/93254.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

DeepSeek生成的高精度大數計算器

# 高精度計算器(精確顯示版)1. **精確顯示優化**:- 新增print_mpfr()函數專門處理MPFR數值的打印- 自動移除多余的尾隨零和小數點- 確保所有浮點結果都以完整十進制形式顯示,不使用科學計數法2. **浮點精度修復**:- 所…

08--深入解析C++ list:高效操作與實現原理

1. list介紹1.1. list概述template < class T, class Alloc allocator<T> > class list;Lists are sequence containers that allow constant time insert and erase operations anywhere within the sequence, and iteration in both directions.概述&#xff1…

GraphQL從入門到精通完整指南

目錄 什么是GraphQLGraphQL核心概念GraphQL Schema定義語言查詢(Queries)變更(Mutations)訂閱(Subscriptions)Schema設計最佳實踐服務端實現客戶端使用高級特性性能優化實戰項目 什么是GraphQL GraphQL是由Facebook開發的一種API查詢語言和運行時。它為API提供了完整且易于理…

使用 Dockerfile 與 Docker Compose 結合+Docker-compose.yml 文件詳解

使用 Dockerfile 與 Docker Compose 結合的完整流程 Dockerfile 用于定義單個容器的構建過程&#xff0c;而 Docker Compose 則用于編排多個容器。以下是結合使用兩者的完整方法&#xff1a; 1. 創建 Dockerfile 在項目目錄中創建 Dockerfile 定義應用鏡像的構建過程&#xff1…

15 ABP Framework 開發工具

ABP Framework 開發工具 概述 該頁面詳細介紹了 ABP Framework 提供的開發工具和命令行界面&#xff08;CLI&#xff09;&#xff0c;用于創建、管理和定制 ABP 項目。ABP CLI 是主要開發工具&#xff0c;支持項目腳手架、模塊添加、數據庫遷移管理及常見開發任務自動化。 ABP …

力扣top100(day02-01)--鏈表01

160. 相交鏈表 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode(int x) {* val x;* next null;* }* }*/ public class Solution {/*** 查找兩個鏈表的相交節點* param headA 第一個…

LLM 中 語音編碼與文本embeding的本質區別

直接使用語音編碼,是什么形式,和文本的區別 直接使用語音編碼的形式 語音編碼是將模擬語音信號轉換為數字信號的技術,其核心是對語音的聲學特征進行數字化表征,直接承載語音的物理聲學信息。其形式可分為以下幾類: 1. 基于波形的編碼(保留原始波形特征) 脈沖編碼調制…

模型選擇與調優

一、模型選擇與調優在機器學習中&#xff0c;模型的選擇和調優是一個重要的步驟&#xff0c;它直接影響到最終模型的性能1、交叉驗證在任何有監督機器學習項目的模型構建階段&#xff0c;我們訓練模型的目的是從標記的示例中學習所有權重和偏差的最佳值如果我們使用相同的標記示…

vue+Django農產品推薦與價格預測系統、雙推薦+機器學習預測+知識圖譜

vueflask農產品推薦與價格預測系統、雙推薦機器學習價格預測知識圖譜文章結尾部分有CSDN官方提供的學長 聯系方式名片 文章結尾部分有CSDN官方提供的學長 聯系方式名片 關注B站&#xff0c;有好處&#xff01;編號: D010 技術架構: vueflaskmysqlneo4j 核心技術&#xff1a; 基…

數據分析小白訓練營:基于python編程語言的Numpy庫介紹(第三方庫)(下篇)

銜接上篇文章&#xff1a;數據分析小白訓練營&#xff1a;基于python編程語言的Numpy庫介紹&#xff08;第三方庫&#xff09;&#xff08;上篇&#xff09;&#xff08;十一&#xff09;數組的組合核心功能&#xff1a;一、生成基數組np.arange().reshape() 基礎運算功能&…

負載因子(Load Factor) :哈希表(Hash Table)中的一個關鍵性能指標

負載因子&#xff08;Load Factor&#xff09; 是哈希表&#xff08;Hash Table&#xff09;中的一個關鍵性能指標&#xff0c;用于衡量哈希表的空間利用率和發生哈希沖突的可能性。一&#xff1a;定義負載因子&#xff08;通常用希臘字母 λ 表示&#xff09;的計算公式為&…

監控插件SkyWalking(一)原理

一、介紹 1、簡介 SkyWalking 是一個 開源的 APM&#xff08;Application Performance Monitoring&#xff0c;應用性能監控&#xff09;和分布式追蹤系統&#xff0c;主要用于監控、追蹤、分析分布式系統中的調用鏈路、性能指標和日志。 它由 Apache 基金會托管&#xff0c;…

【接口自動化測試】---自動化框架pytest

目錄 1、用例運行規則 2、pytest命令參數 3、pytest配置文件 4、前后置 5、斷言 6、參數化---對函數的參數&#xff08;重要&#xff09; 7、fixture 7.1、基本用法 7.2、fixture嵌套&#xff1a; 7.3、請求多個fixture&#xff1a; 7.4、yield fixture 7.5、帶參數…

Flink Stream API 源碼走讀 - socketTextStream

概述 本文深入分析了 Flink 中 socketTextStream() 方法的源碼實現&#xff0c;從用戶API調用到最終返回 DataStream 的完整流程。 核心知識點 1. socketTextStream 方法重載鏈 // 用戶調用入口 env.socketTextStream("hostname", 9999)↓ 補充分隔符參數 env.socket…

待辦事項小程序開發

1. 項目規劃功能需求&#xff1a;添加待辦事項標記完成/未完成刪除待辦事項分類或標簽管理&#xff08;可選&#xff09;數據持久化&#xff08;本地存儲&#xff09;2. 實現功能添加待辦事項&#xff1a;監聽輸入框和按鈕事件&#xff0c;將輸入內容添加到列表。 標記完成/未完…

【C#】Region、Exclude的用法

在 C# 中&#xff0c;Region 和 Exclude 是與圖形編程相關的概念&#xff0c;通常在使用 System.Drawing 命名空間進行 GDI 繪圖時出現。它們主要用于定義和操作二維空間中的區域&#xff08;幾何區域&#xff09;&#xff0c;常用于窗體裁剪、控件重繪、圖形繪制優化等場景。 …

機器學習 - Kaggle項目實踐(3)Digit Recognizer 手寫數字識別

Digit Recognizer | Kaggle 題面 Digit Recognizer-CNN | Kaggle 下面代碼的kaggle版本 使用CNN進行手寫數字識別 學習到了網絡搭建手法學習率退火數據增廣 提高訓練效果。 使用混淆矩陣 以及對分類出錯概率最大的例子單獨拎出來分析。 最終以99.546%正確率 排在 86/1035 …

新手如何高效運營亞馬遜跨境電商:從傳統SP廣告到DeepBI智能策略

"為什么我的廣告點擊量很高但訂單轉化率卻很低&#xff1f;""如何避免新品期廣告預算被大詞消耗殆盡&#xff1f;""為什么手動調整關鍵詞和出價總是慢市場半拍&#xff1f;""競品ASIN投放到底該怎么做才有效&#xff1f;""有沒有…

【論文閱讀 | CVPR 2024 | UniRGB-IR:通過適配器調優實現可見光-紅外語義任務的統一框架】

論文閱讀 | CVPR 2024 | UniRGB-IR&#xff1a;通過適配器調優實現可見光-紅外語義任務的統一框架?1&&2. 摘要&&引言3.方法3.1 整體架構3.2 多模態特征池3.3 補充特征注入器3.4 適配器調優范式4 實驗4.1 RGB-IR 目標檢測4.2 RGB-IR 語義分割4.3 RGB-IR 顯著目…

Hyperf 百度翻譯接口實現方案

保留 HTML/XML 標簽結構&#xff0c;僅翻譯文本內容&#xff0c;避免破壞富文本格式。采用「HTML 解析 → 文本提取 → 批量翻譯 → 回填」的流程。百度翻譯集成方案&#xff1a;富文本內容翻譯系統 HTML 解析 百度翻譯 API 集成 文件結構 app/ ├── Controller/ │ └──…