Celery 是一個基于 Python 的分布式任務隊列框架,它允許你在不同的進程甚至不同的服務器上異步執行任務。
特點
- 簡單:易于使用和配置,提供了簡潔的 API。
- 高可用:支持任務的可靠交付,即使在出現故障時也能保證任務不丟失。
- 分布式:可以在多個節點上分布式執行任務,提高系統的處理能力和可擴展性。
- 多語言支持:雖然主要用 Python 編寫,但也支持其他語言與它進行交互。
架構
- 任務隊列:存儲等待處理的任務,通常使用消息中間件(如
RabbitMQ
、Redis
等)來實現。 - 工作者(Worker):負責從任務隊列中獲取任務并執行。
- 應用(App):定義任務和配置 Celery 的相關設置。
1.安裝配置
安裝 Celery:pip install celery
配置消息中間件:如果使用 Redis 作為消息中間件,則需要在Celery 配置中指定 Redis 作為后端和消息代理。
定義任務:在 Python 代碼中定義需要異步執行的任務函數,使用@app.task
裝飾器將函數標記為 Celery 任務。
新建 celery_tasks.py
文件, 定義了兩個任務
import time
from celery import Celery# 創建Celery應用
app = Celery('celery_app',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0',broker_transport_options={'global_keyprefix': 'celery_tasks:'},result_backend_transport_options={'global_keyprefix': 'celery_tasks:'})# 定義發送消息任務
@app.task
def send_msg(message):print(f"開始發送消息: {message}")time.sleep(3)print(f"發送消息完成: {message}")return message# 定義發送郵箱任務
@app.task
def send_email(email):print(f"開始發送郵箱: {email}")time.sleep(3)print(f"發送郵箱完成: {email}")return email
'celery_app'
:這是應用的名稱broker
:指的是消息代理,其作用是在任務生產者(調用任務的代碼)和任務消費者(Celery 工作進程)之間傳遞任務消息。backend
:是結果后端,用來存儲任務的執行結果。當任務執行完成后,結果會被存儲到結果后端中,之后可以通過任務的 ID 來獲取這些結果
2.啟動工作者
啟動工作者:在命令行中啟動 Celery 工作者,監聽任務隊列并執行任務。
celery -A celery_tasks worker --loglevel=info -P eventlet
-A celery_tasks
: -A 參數指定了包含 Celery 應用實例的模塊名。
Celery 會去加載celery_tasks.py
文件中的app
實例,并啟動一個工作進程來監聽消息隊列。當有新的send_email
任務或者send_email
任務被發送到消息隊列時,工作進程就會執行該任務。worker
: 表示你想要啟動一個 worker 進程來執行任務隊列中的任務。--loglevel=info
: 設置日志級別為 info-P eventlet
: 指定使用 eventlet 作為并發池的實現。默認情況下,Celery 使用的是 prefork(多進程)模型,但在某些環境下,比如 Windows 上,或者當你需要更高效的 I/O 操作時,可以使用 eventlet 或 gevent 這樣的協程庫來代替。Eventlet 提供了基于 green thread 的并發模型,對于 I/O 密集型的任務來說可能更加高效,并且解決了在 Windows 上由于缺少 fork 支持而可能出現的問題。安裝:pip install eventlet
啟動之后redis數據庫會生成這幾個
3.調用任務
調用任務:在其他地方的代碼中,可以通過調用任務函數的delay
或apply_async
方法來將任務發送到消息隊列中。
from celery_tasks import send_msg, send_emailresult_msg = send_msg.delay('Hello, World!')
result_email = send_email.delay('123456@qq.com')print(f"Message task ID: {result_msg.id}")
print(f"Email task ID: {result_email.id}")
Message task ID: ab6ecfcc-ee26-4265-8097-5087786e4659
Email task ID: 1cf3f8f8-4e1a-4d8c-88a5-f2a32356f763
運行完成,獲取到了ID
可以看到Celery worker 是能夠并行處理這兩個任務的
于此同時,redis數據庫多了兩個,我們就可以根據ID取獲取數據