目錄
一、理論
1.Celery
二、實驗
1.Windows11安裝Redis
2.Python3.8環境中配置Celery
三、問題
1.Celery命令報錯
2.執行Celery命令報錯
3.Win11啟動Celery報ValueErro錯誤
?
?
?
一、理論
1.Celery
(1) 概念
?Celery是一個基于python開發的分布式系統,它是簡單、靈活且可靠的,處理大量消息,專注于實時處理的異步任務隊列,同時也支持任務調度。
?
(2) 架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
1)消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等2)任務執行單元
Worker是Celery提供的任務執行的單元,worker并發的運行在分布式的系統節點中。3)任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等
?
?(3)? 特點
1)簡單
Celery易于使用和維護,并且它不需要配置文件并且配置和使用是比較簡單的2)高可用
當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務3)快速
單個 Celery 進程每分鐘可處理數以百萬計的任務,而保持往返延遲在亞毫秒級4)靈活
Celery幾乎所有部分都可以擴展或單獨使用,各個部分可以自定義。
?
(4)場景
Celery是一個強大的 分布式任務隊列的異步處理框架,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。通常使用它來實現異步任務(async task)和定時任務(crontab)。
1)異步任務
將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等2)定時任務
定時執行某件事情,比如每天數據統計
?
二、實驗
1.Windows11安裝Redis
(1)下載最新版Redis
Redis-x64-xxx.zip壓縮包到D盤,解壓后,將文件夾重新命名為 Redis
(2)查看目錄
D:\Redis>dir
(3)打開一個 cmd 窗口 使用 cd 命令切換目錄到 D:\Redis 運行
redis-server.exe redis.windows.conf
?
(4)把 redis 的路徑加到系統的環境變量
?
(5)另外開啟一個 cmd 窗口,原來的不要關閉,因為先前打開的是redis服務端
?
#切換到 redis 目錄下運行
redis-cli.exe -h 127.0.0.1 -p 6379
(6)檢測連接是否成功
#設置鍵值對
set firstKey 123#取出鍵值對
get firstKey#退出
exit
?
?
(7)ctrl+c 退出先前打開的服務端
(8)注冊Redis服務
#通過 cmd 命令行工具進入 Redis 安裝目錄,將 Redis 服務注冊到 Windows 服務中,執行以下命令
redis-server.exe --service-install redis.windows.conf --loglevel verbose
(9)啟動Redis服務
#執行以下命令啟動 Redis 服務
redis-server --service-start
(10)Redis 已經被添加到 Windows 服務中
(11)打開Redis服務,將啟動類型設置為自動,即可實現開機自啟動
?
2.Python3.8環境中配置Celery
(1) PyCharm安裝celery+redis
#celery是典型的生產者+消費者的模式,生產者生產任務并加入隊列中,消費者取出任務消費。多用于處理異步任務或者定時任務。#第一種方式
pip install celery
pip install redis#第二種方式
pip install -i https://pypi.douban.com/simple celery
pip install -i https://pypi.douban.com/simple redis
?
(2)新建異步任務執行文件celery_task.py.相當于注冊了celery app
# -*- coding: utf-8 -*-
from celery import Celery
import time
app = Celery('demo', backend='redis://localhost:6379/1', broker='redis://localhost:6379/2')
@app.task
def send_email(name):print("向%s發送郵件..."%name)time.sleep(5)print("向%s發送郵件完成"%name)return "ok"
(3)?在項目文件目錄下創建worker消費任務
PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO-------------- celery@node1 v5.3.5 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-11-22 17:26:39
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test:0x1e6fa358550
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. celerypro.celery_task.send_email[2023-11-22 17:26:39,265: WARNING/MainProcess] d:\soft\python38\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
[2023-11-22 20:30:08,249: INFO/MainProcess] mingle: searching for neighbors
[2023-11-22 20:30:15,379: INFO/MainProcess] mingle: all alone
[2023-11-22 20:30:25,608: INFO/MainProcess] celery@node1 ready.
?
(4)ctrl+c 退出
(5)修改celery_task.py文件,增加一個task
# -*- coding: utf-8 -*-
from celery import Celery
import time
app = Celery('demo', backend='redis://localhost:6379/1', broker='redis://localhost:6379/2')
@app.task
def send_email(name):print("向%s發送郵件..."%name)time.sleep(5)print("向%s發送郵件完成"%name)return "ok"
@app.task
def send_msg(name):print("向%s發送短信..."%name)time.sleep(5)print("向%s發送郵件完成"%name)return "ok"
(6)再次在項目文件目錄下創建worker消費任務
PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO-------------- celery@node1 v5.3.5 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-11-22 21:01:43
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: demo:0x29cea446250
- ** ---------- .> transport: redis://localhost:6379/2
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- -------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. celerypro.celery_task.send_email. celerypro.celery_task.send_msg[2023-11-22 21:01:43,381: WARNING/MainProcess] d:\soft\python38\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-23] child process 23988 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-17] child process 16184 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-21] child process 22444 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-27] child process 29480 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-24] child process 5844 calling self.run()
[2023-11-22 21:01:43,631: INFO/SpawnPoolWorker-25] child process 8896 calling self.run()
[2023-11-22 21:01:43,634: INFO/SpawnPoolWorker-29] child process 28068 calling self.run()
[2023-11-22 21:01:43,634: INFO/SpawnPoolWorker-28] child process 18952 calling self.run()
[2023-11-22 21:01:43,636: INFO/SpawnPoolWorker-26] child process 13680 calling self.run()
[2023-11-22 21:01:43,638: INFO/SpawnPoolWorker-31] child process 25472 calling self.run()
[2023-11-22 21:01:43,638: INFO/SpawnPoolWorker-30] child process 28688 calling self.run()
[2023-11-22 21:01:43,638: INFO/SpawnPoolWorker-32] child process 10072 calling self.run()
[2023-11-22 21:01:45,401: INFO/MainProcess] Connected to redis://localhost:6379/2
[2023-11-22 21:01:45,401: WARNING/MainProcess] d:\soft\python38\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.warnings.warn([2023-11-22 21:01:49,477: INFO/MainProcess] mingle: searching for neighbors
[2023-11-22 21:01:56,607: INFO/MainProcess] mingle: all alone
[2023-11-22 21:02:04,753: INFO/MainProcess] celery@node1 ready.
(6)ctrl+c 退出創建執行任務文件produce_task.py
# -*- coding: utf-8 -*-
from celerypro.celery_task import send_email,send_msg
result = send_email.delay("david")
print(result.id)
result2 = send_msg.delay("mao")
print(result2.id)
?
(7)運行produce_task.py
(8)同時取到id值
(9)如遇到報錯需要安裝包 eventlet
PS D:\soft\pythonProject> pip install eventlet
(10)重新在項目文件目錄下創建worker消費任務
PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO -P eventlet-------------- celery@node1 v5.3.5 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-11-22 21:29:34
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: demo:0x141511962e0
- ** ---------- .> transport: redis://localhost:6379/2
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 32 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. celerypro.celery_task.send_email. celerypro.celery_task.send_msgr_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.warnings.warn([2023-11-22 21:29:48,022: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/2.
[2023-11-22 21:29:52,117: INFO/MainProcess] celery@node1 ready.
(11)?運行produce_task.py
(12)生成id
(13)查看任務消息
[2023-11-22 21:30:35,194: INFO/MainProcess] Task celerypro.celery_task.send_email[c1a473d5-49ac-4468-9370-19226f377e00] received
[2023-11-22 21:30:35,195: WARNING/MainProcess] 向david發送郵件...
[2023-11-22 21:30:35,197: INFO/MainProcess] Task celerypro.celery_task.send_msg[de30d70b-9110-4dfb-bcfd-45a61403357f] received
[2023-11-22 21:30:35,198: WARNING/MainProcess] 向mao發送短信...
[2023-11-22 21:30:40,210: WARNING/MainProcess] 向david發送郵件完成
[2023-11-22 21:30:40,210: WARNING/MainProcess] 向mao發送郵件完成
[2023-11-22 21:30:42,270: INFO/MainProcess] Task celerypro.celery_task.send_msg[de30d70b-9110-4dfb-bcfd-45a61403357f] succeeded in 7.063000000001921s: 'ok'
[2023-11-22 21:30:42,270: INFO/MainProcess] Task celerypro.celery_task.send_email[c1a473d5-49ac-4468-9370-19226f377e00] succeeded in 7.063000000001921s: 'ok'
(14)創建py文件:result.py,查看任務執行結果
取第2個id:de30d70b-9110-4dfb-bcfd-45a61403357f
# -*- coding: utf-8 -*-
from celery.result import AsyncResult
from celerypro.celery_task import app
async_result = AsyncResult(id="de30d70b-9110-4dfb-bcfd-45a61403357f", app=app)
if async_result.successful():result = async_result.get()print(result)
elif async_result.failed():print('執行失敗')
elif async_result.status == 'PENDING':print('任務等待中被執行')
elif async_result.status == 'RETRY':print('任務異常后正在重試')
elif async_result.status == 'STARTED':print('任務已經開始被執行')
(15) 運行result.py文件
(16)輸出ok
?
三、問題
1.Celery命令報錯
(1)報錯
(2)原因分析
celery版本不同命令不同。
查看幫助命令
PS D:\soft\pythonProject> celery --help
Usage: celery [OPTIONS] COMMAND [ARGS]...Celery command entrypoint.Options:-A, --app APPLICATION-b, --broker TEXT--result-backend TEXT--loader TEXT--config TEXT--workdir PATH-C, --no-color-q, --quiet--version--skip-checks Skip Django core checks on startup. Setting theSKIP_CHECKS environment variable to any non-emptystring will have the same effect.--help Show this message and exit.Commands:amqp AMQP Administration Shell.beat Start the beat periodic task scheduler.call Call a task by name.control Workers remote control.events Event-stream utilities.graph The ``celery graph`` command.inspect Inspect the worker at runtime.list Get info from broker.logtool The ``celery logtool`` command.migrate Migrate tasks from one broker to another.multi Start multiple worker instances.purge Erase all messages from all known task queues.report Shows information useful to include in bug-reports.result Print the return value for a given task id.shell Start shell session with convenient access to celery symbols.status Show list of workers that are online.upgrade Perform upgrade between versions.worker Start worker instance.
PS D:\soft\pythonProject> celery worker --help
Usage: celery worker [OPTIONS]Start worker instance.Examples--------$ celery --app=proj worker -l INFO$ celery -A proj worker -l INFO -Q hipri,lopri$ celery -A proj worker --concurrency=4$ celery -A proj worker --concurrency=1000 -P eventlet$ celery worker --autoscale=10,0Worker Options:-n, --hostname HOSTNAME Set custom hostname (e.g., 'w1@%%h').Expands: %%h (hostname), %%n (name) and %%d,(domain).-D, --detach Start worker as a background process.-S, --statedb PATH Path to the state database. The extension'.db' may be appended to the filename.-l, --loglevel [DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL]Logging level.-O, --optimization [default|fair]Apply optimization profile.--prefetch-multiplier <prefetch multiplier>Set custom prefetch multiplier value forthis worker instance.Pool Options:-c, --concurrency <concurrency>Number of child processes processing thequeue. The default is the number of CPUsavailable on your system.-P, --pool [prefork|eventlet|gevent|solo|processes|threads|custom]Pool implementation.-E, --task-events, --events Send task-related events that can becaptured by monitors like celery events,celerymon, and others.--time-limit FLOAT Enables a hard time limit (in secondsint/float) for tasks.--soft-time-limit FLOAT Enables a soft time limit (in secondsint/float) for tasks.--max-tasks-per-child INTEGER Maximum number of tasks a pool worker canexecute before it's terminated and replacedby a new worker.--max-memory-per-child INTEGER Maximum amount of resident memory, in KiB,that may be consumed by a child processbefore it will be replaced by a new one. Ifa single task causes a child process toexceed this limit, the task will becompleted and the child process will bereplaced afterwards. Default: no limit.--scheduler TEXTDaemonization Options:-f, --logfile TEXT Log destination; defaults to stderr--pidfile TEXT--uid TEXT--gid TEXT--umask TEXT--executable TEXTOptions:--help Show this message and exit.
(3)解決方法
修改命令
PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO
成功
?
2.執行Celery命令報錯
(1)報錯
AttributeError: 'NoneType' object has no attribute 'Redis'
?
(2)原因分析
PyCharm未安裝redis插件。
(3)解決方法
安裝redis插件
?
3.Win11啟動Celery報ValueErro錯誤
(1)報錯
Windows 在開發 Celery 異步任務,通過命令?celery --app=celerypro.celery_task worker -n node1 -l INFO?
啟動 Celery 服務后正常;
但在使用 delay() 調用任務時會出現以下報錯信息:
Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
?
(2)原因分析
PyCharm未安裝eventlet
(3)解決方法
安裝包 eventlet
pip install eventlet
通過以下命令啟動服務
celery --app=celerypro.celery_task worker -n node1 -l INFO -P eventlet
?
?