pycharm 運行celery_Celery全面學習筆記

來源

介紹

Celery 是 Distributed Task Queue,分布式任務隊列。分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作。

Celery 核心模塊

Celery有一下5個核心角色

Task

就是任務,有異步任務和定時任務

Broker

中間人,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker。Celery本身不提供隊列服務,推薦用Redis或RabbitMQ實現隊列服務。

Worker

執行任務的單元,它實時監控消息隊列,如果有任務就獲取任務并執行它。

Beat

定時任務調度器,根據配置定時將任務發送給Broler。

Backend

用于存儲任務的執行結果。

各個角色間的關系看下面這張圖理解一下:

Celery 全面學習筆記

安裝

Celery4.x 開始不再支持Windows平臺了。3.1.26是最后目前最新的3.x版本,下面裝的是3.1.25。

pip install celery

pip install celery==3.1.25

建議使用的Broker只有RabbitMQ和redis這兩個。RabbitMQ只要準備好服務,不需要安裝額外的模塊。

如果要用redis,那么還要準備redis服務,以及安裝redis模塊:

pip install redis

上面的安裝也可以用下面的命令把redis一起裝上:

pip install -U 'celery[redis]'

驗證

使用命令 celery --version 查看版本,順便驗證:

>celery --version

'celery' 不是內部或外部命令,也不是可運行的程序

或批處理文件。

這里報錯是因為沒有把celery加到環境變量里,所以找不到程序。不過我也不想加,所以把路徑打全也好了:

>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery --version

3.1.25 (Cipater)

基本操作

這里跑一個簡單的任務,最后再獲取到任務的執行結果。

創建任務

先按下面寫一段代碼:

# task1.py

from celery import Celery

# 創建Celery實例

app = Celery('tasks',

broker='redis://192.168.246.11:6379/0',

)

# 創建任務

@app.task

def add(x, y):

print("計算2個值的和: %s %s" % (x, y))

return x+y

如果使用RabbitMQ,則把broker修改成這個 broker='amqp://192.168.3.108' 。

啟動Worker

啟動Celery Worker來開始監聽并執行任務:

$ celery -A task1 worker --loglevel=info

$ celery -A task1 worker --l debug # 或者可以這么起

參數 -A 后跟的是Celery實例,實例的名字可以省略,寫全是 task1.app 。你要把目錄切換到task1文件所在的目錄執行命令,或者看看有沒有參數能把文件目錄加到python的環境變量中去。因為-A 之后的參數是作為python的模塊來導入的。所以像下面這樣,我也把Worker跑起來了:

G:\>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A Steed.Documents.PycharmProjects.Celery.task1 worker --loglevel=info

[2018-09-28 17:55:10,715: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:

Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers

the ability to execute any command. It's important to secure

your broker from unauthorized access when using pickle, so we think

that enabling pickle should require a deliberate action and not be

the default choice.

If you depend on pickle then you should set a setting to disable this

warning and to be sure that everything will continue working

when you upgrade to Celery 3.2::

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

-------------- celery@IDX-xujf v3.1.25 (Cipater)

---- **** -----

--- * *** * -- Windows-10-10.0.17134-SP0

-- * - **** ---

- ** ---------- [config]

- ** ---------- .> app: tasks:0x1fb5056fda0

- ** ---------- .> transport: redis://192.168.246.11:6379/0

- ** ---------- .> results: disabled://

- *** --- * --- .> concurrency: 4 (prefork)

-- ******* ----

--- ***** ----- [queues]

-------------- .> celery exchange=celery(direct) key=celery

[tasks]

. Steed.Documents.PycharmProjects.Celery.task1.add

[2018-09-28 17:55:10,864: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0

[2018-09-28 17:55:10,922: INFO/MainProcess] mingle: searching for neighbors

[2018-09-28 17:55:11,961: INFO/MainProcess] mingle: all alone

[2018-09-28 17:55:11,980: WARNING/MainProcess] celery@IDX-xujf ready.

調用任務

要給Worker發送任務,需要調用 delay() 方法,下面是在IDLE上操作的:

>>> import sys

>>> dir = r"G:\Steed\Documents\PycharmProjects\Celery"

>>> sys.path.append(dir) # 我的任務文件不在環境變量里,IDLE找不到

>>> from task1 import add

>>> add.delay(1, 2)

>>>

Worker顯示了下面這些信息

[2018-09-29 11:10:33,103: INFO/MainProcess] Received task: task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a]

[2018-09-29 11:10:33,107: WARNING/Worker-1] 計算2個值的和: 1 2

[2018-09-29 11:10:33,109: INFO/MainProcess] Task task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a] succeeded in 0s: 3

上面只是一個發送任務的調用,結果是拿不到的。上面也沒有接收返回值,這次把返回值保存到起來:

>>> t = add.delay(3, 4)

>>> type(t) # 查看返回值的類型

>>> t.get() # 這句會報錯

Traceback (most recent call last):

File "", line 1, in

t.get()

File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\result.py", line 169, in get

no_ack=no_ack,

File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\backends\base.py", line 616, in _is_disabled

'No result backend configured. '

NotImplementedError: No result backend configured. Please see the documentation for more information.

這里是實例化的時候,沒有定義backend,就是保存任務結果的位置。

獲取返回結果

修改最初的任務的代碼,在實例化的時候加上backend參數,指定保存任務結果的位置。這里把結果也存到同一個redis里:

from celery import Celery

app = Celery('tasks',

broker='redis://192.168.246.11',

backend='redis://192.168.246.11', # 這次把端口號什么的都省了

)

@app.task

def add(x, y):

print("計算2個值的和: %s %s" % (x, y))

return x+y

然后要重啟Worker,IDLE也要重啟,現在可以獲取到任務的返回結果了:

>>> t = add.delay(1, 1)

>>> t.get()

2

>>>

如果是RabbitMQ,則app的初始話設置就這么寫:

app = Celery('tasks',

broker='amqp://192.168.3.108',

backend='rpc://192.168.3.108', # 新版本rpc將初步替代amqp,用的還是RabbitMQ

# backend='amqp://192.168.3.108', # 如果是舊版本,沒有rpc,那只能用amqp

)

其他操作

get進入阻塞

上面的任務執行的太快了,準備一個需要執行一段時間的任務:

import time

@app.task

def upper(v):

for i in range(10):

time.sleep(1)

print(i)

return v.upper()

用get調用任務會進入阻塞,直到任務返回結果,這樣就沒有異步的效果了:

>>> t = upper.delay("abc")

>>> t.get()

'ABC'

ready獲取任務是否完成,不阻塞

ready()方法可以返回任務是否執行完成,等到返回True了再去get,馬上能拿到結果:

>>> t = upper.delay("abcd")

>>> t.ready()

False

>>> t.ready()

False

>>> t.ready()

False

>>> t.ready()

True

>>> t.get()

'ABCD'

>>

get設置超時時間

還可以給get設置一個超時時間,如果超時,會拋出異常:

>>> t = upper.delay("abcde")

>>> t.get(timeout=11)

'ABCDE'

>>> t = upper.delay("abcde")

>>> t.get(timeout=1)

Traceback (most recent call last):

File "", line 1, in

t.get(timeout=1)

File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 169, in get

no_ack=no_ack,

File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\backends\base.py", line 238, in wait_for

raise TimeoutError('The operation timed out.')

celery.exceptions.TimeoutError: The operation timed out.

>>>

任務報錯

如果任務執行報錯,比如執行這個任務:

>>> t = upper.delay(123)

>>>

那么Worker那邊會顯示錯誤的內容:

[2018-09-29 12:57:07,077: ERROR/MainProcess] Task task1.upper[11820ee6-6936-4680-93c2-462487ec927e] raised unexpected: AttributeError("'int' object has no attribute 'upper'",)

Traceback (most recent call last):

File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 240, in trace_task

R = retval = fun(*args, **kwargs)

File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 438, in __protected_call__

return self.run(*args, **kwargs)

File "G:\Steed\Documents\PycharmProjects\Celery\task1.py", line 25, in upper

return v.upper()

AttributeError: 'int' object has no attribute 'upper'

然后再get結果的時候,會把這個錯誤作為異常拋出,這樣很不友好:

>>> t = upper.delay(123)

>>> t.get()

Traceback (most recent call last):

File "", line 1, in

t.get()

File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 175, in get

raise meta['result']

AttributeError: 'int' object has no attribute 'upper'

>>>

get設置只獲取錯誤結果,不觸發異常

>>> t.get(propagate=False)

AttributeError("'int' object has no attribute 'upper'",)

>>>

traceback 里面存著錯誤信息

>>> t.traceback

'Traceback (most recent call last):\n File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 240, in trace_task\n R = retval = fun(*args, **kwargs)\n File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 438, in __protected_call__\n return self.run(*args, **kwargs)\n File "G:\\Steed\\Documents\\PycharmProjects\\Celery\\task1.py", line 25, in upper\n return v.upper()\nAttributeError: \'int\' object has no attribute \'upper\'\n'

>>>

小結

啟動Celery Worker來開始監聽并執行任務

$ celery -A tasks worker --loglevel=info

調用任務

>>> from tasks import add

>>> t = add.delay(4, 4)

同步拿結果

>>> t.get()

>>> t.get(timeout=1)

檢查任務是否完成

>>> t.ready()

如果出錯,獲取錯誤結果,不觸發異常

>>> t.get(propagate=False)

>>> t.traceback # 打印異常詳細結果

在項目中使用Celery

可以把celery配置成一個應用,假設應用名字是CeleryPro,目錄格式如下:

CeleryPro

├─__init.py

├─celery.py

├─tasks.py

這里的連接文件命名必須為celery.py,其他名字隨意

celery文件

這個文件名必須是celery.py:

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('CeleryPro',

broker='redis://192.168.246.11',

backend='redis://192.168.246.11',

include=['CeleryPro.tasks'])

# Optional configuration, see the application user guide.

app.conf.update(

result_expires=3600,

)

if __name__ == '__main__':

app.start()

第一句 from __future__ import absolute_import, unicode_literals ,后面的unicode_literals不知道是什么。不過前面的absolute_import是絕對引入。因為這個文件的文件名就是celery,所以默認后面的 form celery 是引入這個文件,但我們實際需要的是引入celery模塊,所以用了絕對引入這個模塊。如果要引入這個文件,可以這么寫 from .celery ,加個點,下面的tasks里會用到

tasks文件

這個文件開始兩行就多了一個點,這里要導入上面的celery.py文件。后面只要寫各種任務加上裝飾器就可以了:

from __future__ import absolute_import, unicode_literals

from .celery import app

import time

@app.task

def add(x, y):

print("計算2個值的和: %s %s" % (x, y))

return x+y

@app.task

def upper(v):

for i in range(10):

time.sleep(1)

print(i)

return v.upper()

啟動worker

啟動的時候,-A 參數后面用應用名稱 CeleryPro 。你還需要cd到你CeleryPro的父級目錄上啟動,否則找不到:

>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A CeleryPro worker -l info

[2018-09-29 15:06:20,818: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:

Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers

the ability to execute any command. It's important to secure

your broker from unauthorized access when using pickle, so we think

that enabling pickle should require a deliberate action and not be

the default choice.

If you depend on pickle then you should set a setting to disable this

warning and to be sure that everything will continue working

when you upgrade to Celery 3.2::

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

-------------- celery@IDX-xujf v3.1.25 (Cipater)

---- **** -----

--- * *** * -- Windows-10-10.0.17134-SP0

-- * - **** ---

- ** ---------- [config]

- ** ---------- .> app: CeleryPro:0x21deadaf470

- ** ---------- .> transport: redis://192.168.246.11:6379//

- ** ---------- .> results: redis://192.168.246.11/

- *** --- * --- .> concurrency: 4 (prefork)

-- ******* ----

--- ***** ----- [queues]

-------------- .> celery exchange=celery(direct) key=celery

[tasks]

. CeleryPro.tasks.add

. CeleryPro.tasks.upper

[2018-09-29 15:06:20,953: INFO/MainProcess] Connected to redis://192.168.246.11:6379//

[2018-09-29 15:06:20,983: INFO/MainProcess] mingle: searching for neighbors

[2018-09-29 15:06:21,994: INFO/MainProcess] mingle: all alone

[2018-09-29 15:06:22,055: WARNING/MainProcess] celery@IDX-xujf ready.

各種啟動的姿勢

這里注意用的都是CeleryPro:

celery -A CeleryPro worker -loglevel=info # 前臺啟動不推薦

celery -A CeleryPro worker -l info # 前臺啟動簡寫

celery multi start w1 -A CeleryPro -l info # 推薦用后臺啟動

調用任務

調用任務也是在CeleryPro的父級目錄下調用就好了,各種用法都一樣。

操作都要在CeleryPro的父級目錄下執行,就是說只要保證CeleryPro的父級目錄在環境變量里。或者用 sys.path.append() 加到環境變量里去。

這里理解為把celery包裝成了你項目里的一個應用,應用的內容都放在了CeleryPro這個文件夾下。而CeleryPro就作為你的項目里的一個模塊。而你項目的主目錄一定在項目啟動的時候加到環境變量里的,所以其實這樣包裝好之后再項目里使用應該很方便。

后臺啟動多個Worker

啟動命令:

celery -A 項目名 worker -loglevel=info : 前臺啟動命令

celery multi start w1 -A 項目名 -l info : 后臺啟動命令

celery multi restart w1 -A 項目名 -l info : 后臺重啟命令

celery multi stop w1 -A 項目名 -l info : 后臺停止命令

前后臺的區別:后臺是通過mult啟動的。

w1是worker的名稱,可以后臺啟動多個worker,每個worker有一個一名稱。

即便是所有的worker都已經done了,用戶任然啟動了任務,所有的任務都會保留,直到有worker來執行并返回結果。

如果前臺啟動的worker斷開了,那么worker的任務會消失;如果后臺啟動的worker斷開了,后臺的任務仍然在。沒太理解這句的意思。

查看當前還有多少個Celery的worker

似乎也就只能通過ps來查看了,下面先起了3個后臺Worker,ps看一下,然后停掉了一個Worker,再用ps看了一下:

[root@Python3 ~]# celery multi start w1 -A CeleryPro -l info

celery multi v4.2.1 (windowlicker)

> Starting nodes...

> w1@Python3: OK

[root@Python3 ~]# celery multi start w2 -A CeleryPro -l info

celery multi v4.2.1 (windowlicker)

> Starting nodes...

> w2@Python3: OK

[root@Python3 ~]# celery multi start w3 -A CeleryPro -l info

celery multi v4.2.1 (windowlicker)

> Starting nodes...

> w3@Python3: OK

[root@Python3 ~]# ps -ef | grep celery

root 1346 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3

root 1350 1346 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3

root 1360 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3

root 1364 1360 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3

root 1374 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3

root 1378 1374 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3

root 1391 1251 0 20:55 pts/0 00:00:00 grep --color=auto celery

[root@Python3 ~]# celery multi stop w1

celery multi v4.2.1 (windowlicker)

> Stopping nodes...

> w1@Python3: TERM -> 1346

[root@Python3 ~]# ps -ef | grep celery

root 1360 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3

root 1364 1360 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3

root 1374 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3

root 1378 1374 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3

root 1398 1251 0 20:57 pts/0 00:00:00 grep --color=auto celery

[root@Python3 ~]#

Windows平臺不支持

錯誤信息如下:

File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\platforms.py", line 429, in detached

raise RuntimeError('This platform does not support detach.')

RuntimeError: This platform does not support detach.

> w1@IDX-xujf: * Child terminated with errorcode 1

FAILED

根據錯誤信息查看一下429行的代碼:

if not resource:

raise RuntimeError('This platform does not support detach.')

這里判斷了一下resource,然后就直接拋出異常了。resource具體是什么,可以在這個文件里搜索一下變量名(resource)

# 在開頭獲取了這個resource的值

resource = try_import('resource')

# 上面的try_import方法,在另外一個文件里

def try_import(module, default=None):

"""Try to import and return module, or return

None if the module does not exist."""

try:

return importlib.import_module(module)

except ImportError:

return default

# 下面有一個方法注釋里表明resource為None代表是Windows

def get_fdmax(default=None):

"""Return the maximum number of open file descriptors

on this system.

:keyword default: Value returned if there's no file

descriptor limit.

"""

try:

return os.sysconf('SC_OPEN_MAX')

except:

pass

if resource is None: # Windows

return default

fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]

if fdmax == resource.RLIM_INFINITY:

return default

return fdmax

上面做的就是要嘗試導入一個模塊 “resource” 。該模塊只用于Unix。

定時任務

3版本的定時任務和4版本還是有很大差別的。另外4版本里有更多的定時任務。

Celery3

繼續使用之前的2個任務,只需要為celery添加一些配置(conf),為任務設置計劃。

app.conf里的參數都是全大寫的,這里大小寫敏感,不能用小寫:

# CeleryPro/tasks.py

from __future__ import absolute_import, unicode_literals

from .celery import app

import time

@app.task

def add(x, y):

print("計算2個值的和: %s %s" % (x, y))

return x+y

@app.task

def upper(v):

for i in range(10):

time.sleep(1)

print(i)

return v.upper()

# CeleryPro/celery.py

from __future__ import absolute_import, unicode_literals

from celery import Celery

from celery.schedules import crontab

from datetime import timedelta

app = Celery('CeleryPro',

broker='redis://192.168.246.11',

backend='redis://192.168.246.11',

include=['CeleryPro.tasks'])

app.conf.CELERYBEAT_SCHEDULE = {

'add every 10 seconds': {

'task': 'CeleryPro.tasks.add',

'schedule': timedelta(seconds=10), # 可以用timedelta對象

# 'schedule': 10, # 也支持直接用數字表示秒數

'args': (1, 2)

},

'upper every 2 minutes': {

'task': 'CeleryPro.tasks.upper',

'schedule': crontab(minute='*/2'),

'args': ('abc', ),

},

}

# app.conf.CELERY_TIMEZONE = 'UTC'

app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

# Optional configuration, see the application user guide.

app.conf.update(

CELERY_TASK_RESULT_EXPIRES=3600,

)

if __name__ == '__main__':

app.start()

任務結果過期設置 `CELERY_TASK_RESULT_EXPIRES=3600' 。默認設置是1天,官網介紹這是靠一個內置的周期性任務把超過時限的任務結果給清除的。

A built-in periodic task will delete the results after this time (celery.task.backend_cleanup).

設置完成后,啟動Worker,啟動Beat就OK了:

G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro worker -l info

G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro beat -l info

Celery4

新版的好處是,可以把定時任務和普通的任務一樣單獨定義了。多了 @app.on_after_configure.connect 這個裝飾器,3版本是沒有這個裝飾器的。

寫代碼

單獨再創建一個py文件,存放定時任務:

# CeleryPro/periodic4.py

from __future__ import absolute_import, unicode_literals

from .celery import app

from celery.schedules import crontab

@app.on_after_configure.connect

def setup_periodic_tasks(sender, **kwargs):

# 每10秒執行一次

sender.add_periodic_task(10.0, hello.s(), name='hello every 10') # 給任務取個名字

# 每30秒執行一次

sender.add_periodic_task(30, upper.s('abcdefg'), expires=10) # 設置任務超時時間10秒

# 執行周期和Linux的計劃任務crontab設置一樣

sender.add_periodic_task(

crontab(hour='*', minute='*/2', day_of_week='*'),

add.s(11, 22),

)

@app.task

def hello():

print('Hello World')

@app.task

def upper(arg):

return arg.upper()

@app.task

def add(x, y):

print("計算2個值的和: %s %s" % (x, y))

return x+y

上面一共定了3個計劃。

name參數給計劃取名,這樣這個任務報告的時候就會使用name的值,像這樣:hello every 10。否則默認顯示的是調用函數的命令,像這樣:CeleryPro.periodic4.upper('abcdefg')。

expires參數設置任務超時時間,超時未完成,可能就放棄了(沒測試)。

修改一下之前的celery.py文件,把新寫的任務文件添加到include的列表里。順便我這里改用RabbitMQ玩一下:

# CeleryPro/celery.py

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('CeleryPro',

broker='amqp://192.168.3.108',

backend='rpc',

include=['CeleryPro.tasks', 'CeleryPro.periodic4'])

app.conf.timezone = 'UTC' # 計劃任務默認用的是UTC時間

# app.conf.timezone = 'Asia/Shanghai' # 也可以更改為北京時間

# Optional configuration, see the application user guide.

app.conf.update(

result_expires=3600,

)

if __name__ == '__main__':

app.start()

啟動worker

啟動方法和之前一樣:

[root@Python3 ~]# celery -A CeleryPro worker -l info

/usr/local/lib/python3.6/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is

absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

uid=uid, euid=euid, gid=gid, egid=egid,

-------------- celery@Python3 v4.2.1 (windowlicker)

---- **** -----

--- * *** * -- Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2018-10-01 12:46:35

-- * - **** ---

- ** ---------- [config]

- ** ---------- .> app: CeleryPro:0x7ffb0c8b2908

- ** ---------- .> transport: amqp://guest:**@192.168.3.108:5672//

- ** ---------- .> results: rpc://

- *** --- * --- .> concurrency: 1 (prefork)

-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)

--- ***** -----

-------------- [queues]

.> celery exchange=celery(direct) key=celery

[tasks]

. CeleryPro.periodic4.add

. CeleryPro.periodic4.hello

. CeleryPro.periodic4.upper

. CeleryPro.tasks.add

. CeleryPro.tasks.upper

[2018-10-01 12:46:35,187: INFO/MainProcess] Connected to amqp://guest:**@192.168.3.108:5672//

[2018-10-01 12:46:35,216: INFO/MainProcess] mingle: searching for neighbors

[2018-10-01 12:46:36,266: INFO/MainProcess] mingle: all alone

[2018-10-01 12:46:36,307: INFO/MainProcess] celery@Python3 ready.

啟動后看一下[tasks],新加的定時任務已經列出來了,之前的任務也都在。

啟動Beat

這里-A后面要寫全 CeleryPro.periodic4 ,和啟動Worker的參數有點不一樣:

[root@Python3 ~]# celery -A CeleryPro.periodic4 beat -l info

celery beat v4.2.1 (windowlicker) is starting.

__ - ... __ - _

LocalTime -> 2018-10-01 12:45:04

Configuration ->

. broker -> amqp://guest:**@192.168.3.108:5672//

. loader -> celery.loaders.app.AppLoader

. scheduler -> celery.beat.PersistentScheduler

. db -> celerybeat-schedule

. logfile -> [stderr]@%INFO

. maxinterval -> 5.00 minutes (300s)

[2018-10-01 12:45:04,934: INFO/MainProcess] beat: Starting...

[2018-10-01 12:45:05,006: INFO/MainProcess] Scheduler: Sending due task hello every 10 (CeleryPro.periodic4.hello)

[2018-10-01 12:45:05,356: INFO/MainProcess] Scheduler: Sending due task CeleryPro.periodic4.upper('abcdefg') (CeleryPro.periodic4.upper)

啟動之后馬上就把2個每隔一段時間執行的任務發送給Worker執行了,之后會根據定義的間隔繼續發送。

另外一個用crontab設置的任務需要等到時間匹配上了才會發送。當時是45分,等到46分就會執行了。

舊版本的做法一樣可以用

上面說了,新版主要是多提供了一個裝飾器。不用新提供的裝飾器,依然可以把定時任務寫在配置里:

# CeleryPro/celery.py

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('CeleryPro',

broker='amqp://192.168.3.108',

backend='rpc',

include=['CeleryPro.tasks'])

app.conf.beat_schedule = {

'every 5 seconds': {

'task': 'CeleryPro.tasks.upper',

'schedule': 5,

'args': ('xyz',)

}

}

# Optional configuration, see the application user guide.

app.conf.update(

result_expires=3600,

)

if __name__ == '__main__':

app.start()

這里就是在配置里設置,定時啟動一個普通任務。這里把include里的CeleryPro.periodic4刪掉了,留著也沒影響。

任務文件tasks.py還是之前的那個,具體如下:

# CeleryPro/tasks.py

from __future__ import absolute_import, unicode_literals

from .celery import app

import time

@app.task

def add(x, y):

print("計算2個值的和: %s %s" % (x, y))

return x+y

@app.task

def upper(v):

for i in range(10):

time.sleep(1)

print(i)

return v.upper()

最后啟動Worker,啟動Breat試一下:

[root@Python3 ~]# celery -A CeleryPro beat -l info

這里Beat的-A參數用 CeleryPro 也能啟動這里的定時任務。CeleryPro.tasks 效果也是一樣的。另外如果把periodic4.py加到include列表里去,用 CeleryPro.periodic4 參數啟動的話,這里的定時任務也會啟動。

這里也是支持用crontab的,用法和之前的一樣,把schedule參數的值換成調用crontab的函數。

小結

上面的兩種定時任務的方法,各有應用場景。

如果要改任務執行的函數,只能改代碼,然后重啟Worker了。

這里要說的是改計劃(包括新增、取消和修改計劃周期),但是任務執行的函數不變。用@app.on_after_configure.connect裝飾器,是把計劃寫死在一個函數里了。似乎無法動態添加新任務。不過好處是結構比較清晰。

而后一種方法,只要更新一下 app.conf.beat_schedule 這個字典里的配置信息,然后重啟Beat就能生效了。

crontab 舉例

下面是crontab的一些例子:

Example

Meaning

crontab()

Execute every minute.

crontab(minute=0, hour=0)

Execute daily at midnight.

crontab(minute=0, hour='*/3')

Execute every three hours: 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.

crontab(minute=0,hour='0,3,6,9,12,15,18,21')

Same as previous.

crontab(minute='*/15')

Execute every 15 minutes.

crontab(day_of_week='sunday')

Execute every minute (!) at Sundays.

crontab(minute='',hour='', day_of_week='sun')

Same as previous.

crontab(minute='*/10',hour='3,17,22', day_of_week='thu,fri')

Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays.

crontab(minute=0, hour='/2,/3')

Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm

crontab(minute=0, hour='*/5')

Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).

crontab(minute=0, hour='*/3,8-17')

Execute every hour divisible by 3, and every hour during office hours (8am-5pm).

crontab(day_of_month='2')

Execute on the second day of every month.

crontab(day_of_month='2-30/3')

Execute on every even numbered day.

crontab(day_of_month='1-7,15-21')

Execute on the first and third weeks of the month.

crontab(day_of_month='11',month_of_year='5')

Execute on 11th of May every year.

crontab(month_of_year='*/3')

Execute on the first month of every quarter.

日程表(Solar schedules)

4版本里還提供這樣的方法來指定計劃

If you have a task that should be executed according to sunrise, sunset, dawn or dusk, you can use the solar schedule type:

如果你有一個任務,是根據日出,日落,黎明或黃昏來執行的,你可以使用日程表類型:

所有事件都是根據UTC時間計算的,所以不受時區設置影響。官網的例子:

from celery.schedules import solar

app.conf.beat_schedule = {

# Executes at sunset in Melbourne

'add-at-melbourne-sunset': {

'task': 'tasks.add',

'schedule': solar('sunset', -37.81753, 144.96715),

'args': (16, 16),

},

}

這里solar函數要提供3個參數,事件、緯度、經度。經緯度使用的標志看下表:

Sign

Argument

Meaning

+

latitude

North

-

latitude

South

+

longitude

East

-

longitude

West

支持的事件類型如下:

Event

Meaning

dawn_astronomical

Execute at the moment after which the sky is no longer completely dark. This is when the sun is 18 degrees below the horizon.

dawn_nautical

Execute when there’s enough sunlight for the horizon and some objects to be distinguishable; formally, when the sun is 12 degrees below the horizon.

dawn_civil

Execute when there’s enough light for objects to be distinguishable so that outdoor activities can commence; formally, when the Sun is 6 degrees below the horizon.

sunrise

Execute when the upper edge of the sun appears over the eastern horizon in the morning.

solar_noon

Execute when the sun is highest above the horizon on that day.

sunset

Execute when the trailing edge of the sun disappears over the western horizon in the evening.

dusk_civil

Execute at the end of civil twilight, when objects are still distinguishable and some stars and planets are visible. Formally, when the sun is 6 degrees below the horizon.

dusk_nautical

Execute when the sun is 12 degrees below the horizon. Objects are no longer distinguishable, and the horizon is no longer visible to the naked eye.

dusk_astronomical

Execute at the moment after which the sky becomes completely dark; formally, when the sun is 18 degrees below the horizon.

在Django中使用的最佳實踐

在django中使用的話,可以把celery的配置直接寫在django的settings.py文件里。另外任務函數則寫在tasks.py文件里放在各個app的目錄下。每個app下都可以有一個tasks.py,所有的任務都是共享的。

創建目錄結構

創建一個django的項目,項目名稱就叫UsingCeleryWithDjango,app的名字就app01好了。創建好項目后,在項目目錄下創建CeleryPro目錄,目錄下建一個celery.py文件。目錄結構如下:

UsingCeleryWithDjango

├─manage.py

├─app01

│ │ admin.py

│ │ apps.py

│ │ models.py

│ │ tests.py

│ │ views.py

│ └ __init__.py

├─CeleryPro

│ │ celery.py

│ └ __init__.py

├─templates

└─UsingCeleryWithDjango

│ settings.py

│ urls.py

│ wsgi.py

└ __init__.py

上面只要關注一下CeleryPro的結構和位置就好了,其他都是創建django項目后的默認內容。

CeleryPro/celery.py 文件,是用來創建celery實例的。

CeleryPro/init.py 文件,需要確保當Django啟動時加載celery。之后在app里會用到celery模塊里的 @shared_task 這個裝飾器。

CeleryPro 示例代碼

# UsingCeleryWithDjango/CeleryPro/__init__.py

from __future__ import absolute_import, unicode_literals

__author__ = '749B'

# This will make sure the app is always imported when

# Django starts so that shared_task will use this app.

from .celery import app as celery_app

__all__ = ('celery_app',)

# UsingCeleryWithDjango/CeleryPro/celery.py

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'UsingCeleryWithDjango.settings')

from django.conf import settings # noqa

app = Celery('CeleryPro')

# Using a string here means the worker will not have to

# pickle the object when using Windows.

app.config_from_object('django.conf:settings')

# 自動發現所有app下的tasks

# 但是,新版django的INSTALLED_APPS的寫法無法發現到

# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # 這是官方示例的寫法

'''

# 這里是setting.py里的INSTALLED_APPS部分

INSTALLED_APPS = [

'django.contrib.admin',

'django.contrib.auth',

'django.contrib.contenttypes',

'django.contrib.sessions',

'django.contrib.messages',

'django.contrib.staticfiles',

'app01.apps.App01Config', # 這種寫法自動發現找不到tasks

# 'app01', # 這種寫法就能自動發現

]

'''

# 或者不想改settings.INSTALLED_APPS,那就自己把app的列表寫在一個列表里作為參數吧

app.autodiscover_tasks(['app01']) # 這里我就這么

@app.task(bind=True)

def debug_task(self):

print('Request: {0!r}'.format(self.request))

這里有個坑,我寫了一段注釋,寫的應該比較清楚了。

任務文件 tasks

在app下創建tasks.py文件(和models.py文件同一級目錄),創建任務。

- app01/

- app01/tasks.py

- app01/models.py

tasks.py文件里創建的函數用的是 @shared_task 這個裝飾器。這些任務是所有app共享的。

# UsingCeleryWithDjango/app01/tasks.py

# Create your tasks here

from __future__ import absolute_import, unicode_literals

from celery import shared_task

@shared_task

def add(x, y):

return x + y

@shared_task

def mul(x, y):

return x * y

@shared_task

def xsum(numbers):

return sum(numbers)

設置settings.py

這個是django的配置文件,不過現在celery的配置也都可以寫在這里了:

# UsingCeleryWithDjango/UsingCeleryWithDjango/settings.py

# 其他都是django的配置內容,就省了

# Celery settings

BROKER_URL = 'redis://192.168.246.11/0'

CELERY_RESULT_BACKEND = 'redis://192.168.246.11/0'

這里就做最基本的設置,用redis收任務和存任務結果,其他都默認了設置了。

啟動Worker

啟動命令是一樣的,關鍵就是-A后面的參數:

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info

[2018-10-02 20:55:56,411: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:

Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers

the ability to execute any command. It's important to secure

your broker from unauthorized access when using pickle, so we think

that enabling pickle should require a deliberate action and not be

the default choice.

If you depend on pickle then you should set a setting to disable this

warning and to be sure that everything will continue working

when you upgrade to Celery 3.2::

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

-------------- celery@IDX-xujf v3.1.25 (Cipater)

---- **** -----

--- * *** * -- Windows-10-10.0.17134-SP0

-- * - **** ---

- ** ---------- [config]

- ** ---------- .> app: CeleryPro:0x27f5e4dbe80

- ** ---------- .> transport: redis://192.168.246.11:6379/0

- ** ---------- .> results: redis://192.168.246.11/0

- *** --- * --- .> concurrency: 4 (prefork)

-- ******* ----

--- ***** ----- [queues]

-------------- .> celery exchange=celery(direct) key=celery

[tasks]

. CeleryPro.celery.debug_task

. app01.tasks.add

. app01.tasks.mul

. app01.tasks.xsum

[2018-10-02 20:55:56,548: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0

[2018-10-02 20:55:56,576: INFO/MainProcess] mingle: searching for neighbors

[2018-10-02 20:55:57,596: INFO/MainProcess] mingle: all alone

[2018-10-02 20:55:57,647: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!

warnings.warn('Using settings.DEBUG leads to a memory leak, never '

[2018-10-02 20:55:57,653: WARNING/MainProcess] celery@IDX-xujf ready.

上面這樣就是成功啟動了,確認一下[tasks]下面的任務是否都有就沒問題了。

關于這個[tasks]下面的內容,就是所有我們自定義的任務的名字,下面研究了一下自己如何獲取到這些任務名字

獲取到所有的tasks

所有的tasks都可以通過app.tasks獲取到。這個app就是 CeleryPro/celery.py 里 app = Celery('CeleryPro') 生成的實例。并且在 CeleryPro/init.py 里通過 from .celery import app as celery_app 換了個別名,所以在這個項目里應該是 celery_app.tasks 。

打印celery_app.tasks結果如下:

{'celery.chord_unlock': , 'celery.group': , 'app01.tasks.xsum': , 'celery.backend_cleanup': , 'app01.tasks.add': , 'celery.map': , 'app01.tasks.mul': , 'celery.chain': , 'CeleryPro.celery.debug_task': , 'celery.starmap': , 'celery.chord': , 'celery.chunks': }

我們的任務都在里面了,但是還多了很多其他的任務(都是celery開頭的)。之前啟動Worker的時候都是用 -l info 參數,如果用 -l debug 參數也是能看到這些任務的。也就是說celery在啟動Worker的時候做了個過濾,debug模式打印所有,info模式只打印用戶自定義的任務。接下來現在就是去源碼里找一下,看看是怎么做過濾的。

我在源碼里截取了下面這些來分析一下:

# celery/apps/worker.py

# 首先是一些在啟動時會打印到控制臺的字符串內容

# 這個是LOGO,這個不是重點

ARTLINES = [

' --------------',

'---- **** -----',

'--- * *** * --',

'-- * - **** ---',

'- ** ----------',

'- ** ----------',

'- ** ----------',

'- ** ----------',

'- *** --- * ---',

'-- ******* ----',

'--- ***** -----',

' --------------',

]

# 這個字符串就是打印任務列表的字符串

# 輸出到控制臺之前,會用format做一下字符串格式化,這樣任務列表就能動態的輸出了

EXTRA_INFO_FMT = """

[tasks]

{tasks}

"""

# 這個類里有很多方法,這里就看看動態獲取任務列表的恨啊

class Worker(WorkController):

# 這個就是生成任務列表的方法

# 邏輯也很簡單就是判斷是不是以 'celery' 開頭

# include_builtins 為True就輸出所有的task,為False就過濾掉'celery'開頭的

# include_builtins 具體的值看下面的extra_info方法

def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):

return sep.join(

' . {0}'.format(task) for task in sorted(self.app.tasks)

if (not task.startswith(int_) if not include_builtins else task)

)

# 這個方法是調用上面的tasklist方法的

# 先判斷啟動級別,根據級別是否小于等于debug,決定include_builtins參數

# 最后用tasklist返回的結果,格式化EXTRA_INFO_FMT

def extra_info(self):

if self.loglevel <= logging.INFO:

include_builtins = self.loglevel <= logging.DEBUG

tasklist = self.tasklist(include_builtins=include_builtins)

return EXTRA_INFO_FMT.format(tasks=tasklist)

過濾方法很簡單,就是用startswith過濾掉以celery開頭的key就好了。另外過濾之前先用sorted做了個排序,順便把字典變成了用key組成的列表。

所以用下面的方法就可以獲取到任務列表:

from CeleryPro import celery_app

def celery_list(request):

task_list = []

for task in sorted(celery_app.tasks):

if not task.startswith('celery.'):

task_list.append(task)

print(task_list)

return HttpResponse('OK')

上面的代碼最終獲得的是一個列表,可以直接用一個列表生成式搞定:

task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]

這里拿到的只是任務的key,要調用任務的話,就用key在celery_app.tasks這個字典里獲取到對應的value,調用這個value的方法:

task_name = task_list[1]

t = celery_app.tasks[task_name].delay(1, 2)

在views里調用任務

調用任務的具體做法,上一節最后已經有了。但是獲取任務執行結果還有些問題。

之前的做法都是在調用delay方法時獲取返回值,就是這個任務的對象,有了返回的對象,就可以判斷任務是否執行完成以及獲取任務執行結果。

但是現在在views視圖函數里提交任務后,函數就返回結束了,任務的對象就沒有了,并且也是無法把這里的對象直接返回給瀏覽器的。這里就需要返回一個任務的id(就是為每個任務生成的uuid)。之后請求時,就通過這個uuid獲取到之前的任務的對象。

# 要通過uuid獲取對象,使用下面這個方法

from celery.result import AsyncResult

task_obj = AsyncResult(uuid) # 通過uuid獲取到任務對象

# 先獲取到對象,之后的操作就和之前的一樣了

task_obj.ready() # 檢查任務是否執行完成

task_obj.get() # 阻塞的拿結果

task_obj.result # 任務執行完成后,結果就存在這里,就不要再用get方法獲取了

下面是我測試寫的示例代碼

前端頁面

這個頁面可以選擇任務,填好參數,提交后臺執行。提交后會跳轉到任務結果頁面:

# UsingCeleryWithDjango/templates/celery_list.html

{% csrf_token %}

{% for task in task_list %}

{{ task }}

{% endfor %}

Tips: 后臺會用json.loads把input提交的參數做一次反序列化,然后用*args傳參

debug_task方法,參數不填

add和mul方法,參數填個2個元素的列表。比如:[1, 2]

xsum方法,參數接收一個列表,所以要再包一層[]。比如:[[1, 2, 3, 4, 5]]

路由函數

有兩個url,一個是提交任務頁面的url。還有一個url是根據uuid拿任務結果的,這個視圖沒寫html,直接用HttpResponse返回了:

# UsingCeleryWithDjango/UsingCeleryWithDjango/urls.py

from django.contrib import admin

from django.urls import path

from app01 import views

urlpatterns = [

path('admin/', admin.site.urls),

path('celery_list/', views.celery_list),

path('celery_result//', views.celery_result),

]

視圖函數

# UsingCeleryWithDjango/app01/views.py

from django.shortcuts import render, redirect, HttpResponse

# Create your views here.

from CeleryPro import celery_app

from celery.result import AsyncResult

import json

def celery_list(request):

if request.method == 'POST':

task_name = request.POST.get('task_name')

args = request.POST.get('args')

if args:

t = celery_app.tasks[task_name].delay(*json.loads(args))

return redirect('/celery_result/%s/' % t.id)

else:

celery_app.tasks[task_name]()

# 參考源碼的方法,獲取到所有task名字的列表

task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]

return render(request, 'celery_list.html', {'task_list': task_list})

def celery_result(request, uuid):

uuid = str(uuid)

task_obj = AsyncResult(uuid)

if task_obj.ready():

return HttpResponse(task_obj.result)

else:

ele = ""

return HttpResponse('Not Ready %s' % ele)

測試下來都很好,不過所有任務都是立刻會返回結果的。所以去修改一下tasks.py里的任務。找個任務加點延遲 time.sleep() ,如果任務沒有執行完成,也不會卡住,而是先返回一個頁面,可以再刷新,如果執行完成了,就能返回任務執行的結果。

在django中使用定時任務

要在django中使用定時任務,到這里需要再安裝一個模塊:

pip install django_celery_beat

這個模塊是 django_celery_beat ,注意名字里是下劃線,不過命令里用中橫杠也認(大概是做了別名)。這個模塊不僅僅只是做定時任務,它是通過把任務存到django的數據庫里實現的,所以還可以很方便的通過django admin來設置和管理。

注意:安裝這個模塊的時候還會自動安裝一些別的依賴模塊,不過坑的地方是,會把原本的celery更新到最新版,也就是號稱不支持windows的4版本。

既然升級了,就先在當前的環境下跑跑試試看。然后踩了2個坑。

我用的是win10系統,部分由于windows操作系統導致的問題,不知道通用性是如何的。

無法自動發現app的任務

worker可以正常啟動,頁面也能打開,但是app里定義的任務都找不到了。

自動發放所有app下的tasks是在 "UsingCeleryWithDjango/CeleryPro/celery.py" 這個文件里配置的,具體是調用下面的這個方法:

# from django.conf import settings # noqa

# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # 從django的settings里獲取app的路徑

app.autodiscover_tasks(['app01'],) # 自己指定

去看了下這個方法的源碼,一大段注釋,不過內容很簡單:

def autodiscover_tasks(self, packages=None,

related_name='tasks', force=False):

"""Auto-discover task modules.

Searches a list of packages for a "tasks.py" module (or use

related_name argument).

If the name is empty, this will be delegated to fix-ups (e.g., Django).

For example if you have a directory layout like this:

.. code-block:: text

foo/__init__.py

tasks.py

models.py

bar/__init__.py

tasks.py

models.py

baz/__init__.py

models.py

Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will

result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.

Arguments:

packages (List[str]): List of packages to search.

This argument may also be a callable, in which case the

value returned is used (for lazy evaluation).

related_name (str): The name of the module to find. Defaults

to "tasks": meaning "look for 'module.tasks' for every

module in ``packages``."

force (bool): By default this call is lazy so that the actual

auto-discovery won't happen until an application imports

the default modules. Forcing will cause the auto-discovery

to happen immediately.

"""

if force:

return self._autodiscover_tasks(packages, related_name)

signals.import_modules.connect(starpromise(

self._autodiscover_tasks, packages, related_name,

), weak=False, sender=self)

內容就是一個if,然后返回某個東西。關鍵是if的條件,是一個默認參數為false的變量,所以用默認方法調用,是不會執行任何語句的。解決辦法就很簡單了,調用的時候指定force參數:

app.autodiscover_tasks(['app01'], force=True) # 4版本有個force參數。默認是False,需要設為True

執行任務報錯

啟動worker(-l info),打開網頁,提交任務。然后報錯。worker上的錯誤信息如下:

[2018-10-08 13:23:28,062: INFO/MainProcess] Received task: app01.tasks.add[ff0f5e76-6474-4f74-a93c-7b2486abe07e]

[2018-10-08 13:23:28,078: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)

Traceback (most recent call last):

File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\billiard\pool.py", line 358, in workloop

result = (True, prepare_result(fun(*args, **kwargs)))

File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task

tasks, accept, hostname = _loc

ValueError: not enough values to unpack (expected 3, got 0)

這個問題基本上判斷下來就是4版本不支持windows系統導致的。

通過celery降級解決問題

這小段看看就好,因為后面有不降級的辦法。

到這里我就沒能力看懂錯誤信息然后找出真正的問題了,只能把celery的版本降回去再看看了:

pip uninstall celery

pip install celery==3.1.25

所謂降級,其實就是先刪了,然后再裝一個舊版本。這條路我沒繼續走下去。

4版本的celery還是能用的

有發現個新的辦法,可以解決這里的問題,還需要再裝一個模塊:

pip install eventlet

裝完之后,加一個新的參數啟動worker,"-P eventlet" :

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info -P eventlet

-------------- celery@IDX-xujf v4.2.1 (windowlicker)

---- **** -----

--- * *** * -- Windows-10-10.0.17134-SP0 2018-10-08 13:33:21

-- * - **** ---

- ** ---------- [config]

- ** ---------- .> app: CeleryPro:0x16ad81d16a0

- ** ---------- .> transport: redis://192.168.246.11:6379/0

- ** ---------- .> results: redis://192.168.246.11/0

- *** --- * --- .> concurrency: 4 (eventlet)

-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)

--- ***** -----

-------------- [queues]

.> celery exchange=celery(direct) key=celery

[tasks]

. CeleryPro.celery.debug_task

. app01.tasks.add

. app01.tasks.mul

. app01.tasks.xsum

[2018-10-08 13:33:21,430: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0

[2018-10-08 13:33:21,457: INFO/MainProcess] mingle: searching for neighbors

[2018-10-08 13:33:22,488: INFO/MainProcess] mingle: all alone

[2018-10-08 13:33:22,502: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!

warnings.warn('Using settings.DEBUG leads to a memory leak, never '

[2018-10-08 13:33:22,504: INFO/MainProcess] celery@IDX-xujf ready.

[2018-10-08 13:33:22,519: INFO/MainProcess] pidbox: Connected to redis://192.168.246.11:6379/0.

[2018-10-08 13:34:13,596: INFO/MainProcess] Received task: app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d]

[2018-10-08 13:34:13,611: INFO/MainProcess] Task app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d] succeeded in 0.0s: 7

上面是worker的日志,啟動后,還提交了一個任務,這次正常處理完了。

使用 Django_Celery_Beat

先在settings的INSTALLED_APPS里注冊一下:

INSTALLED_APPS = [

......

'django_celery_beat',

]

應用django_celery_beat的數據庫,會自動創建幾張表。只要直接migrate就好了:

>python manage.py migrate

Operations to perform:

Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions

Running migrations:

Applying django_celery_beat.0001_initial... OK

Applying django_celery_beat.0002_auto_20161118_0346... OK

Applying django_celery_beat.0003_auto_20161209_0049... OK

Applying django_celery_beat.0004_auto_20170221_0000... OK

Applying django_celery_beat.0005_add_solarschedule_events_choices... OK

Applying django_celery_beat.0006_auto_20180210_1226... OK

登錄django admin后,就能看下如下的幾張表了:

Celery 全面學習筆記

任務都是配置在Periodic tasks表里的。另外幾張表就是各種任務執行周期的。

配置任務

先進入 Intervals 表,新建任務周期。這里建一個每5秒的周期。

Celery 全面學習筆記

然后進入 Periodic tasks 表,選擇要執行的任務,關聯上某個周期。

這里能看到的任務就是通過自動發現注冊的任務:

Celery 全面學習筆記

下面還有填寫任務參數的部分,這里有兩個框,里面寫JSON。位置參數寫上面,關鍵參數寫下面:

Celery 全面學習筆記

這里的JSON會反序列化之后,以 "*args, **kwargs" 傳遞給任務函數的。

好了任務配置完了,其他任務周期也是一樣的,就不試了。

啟動Beat

這里依然需要啟動一個Beat來定時發任務的。先把Worker起動起來,然后啟動Beat需要多加一個參數 "-S django" :

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro beat -l info -S django

celery beat v4.2.1 (windowlicker) is starting.

__ - ... __ - _

LocalTime -> 2018-10-08 14:43:43

Configuration ->

. broker -> redis://192.168.246.11:6379/0

. loader -> celery.loaders.app.AppLoader

. scheduler -> django_celery_beat.schedulers.DatabaseScheduler

. logfile -> [stderr]@%INFO

. maxinterval -> 5.00 seconds (5s)

[2018-10-08 14:43:43,907: INFO/MainProcess] beat: Starting...

[2018-10-08 14:43:43,908: INFO/MainProcess] Writing entries...

[2018-10-08 14:43:48,911: INFO/MainProcess] Writing entries...

[2018-10-08 14:43:48,939: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)

[2018-10-08 14:43:53,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)

[2018-10-08 14:43:58,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)

[2018-10-08 14:43:59,534: INFO/MainProcess] Writing entries...

[2018-10-08 14:43:59,717: INFO/MainProcess] Writing entries...

[2018-10-08 14:43:59,727: INFO/MainProcess] Writing entries...

[2018-10-08 14:43:59,729: INFO/MainProcess] Writing entries...

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>

注意:每次修改任務,都需要重啟Beat,最新的配置才能生效。這個對 Intervals 的任務(每隔一段時間執行的),影響比較大。Crontab的任務問題貌似不是很大。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/539222.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/539222.shtml
英文地址,請注明出處:http://en.pswp.cn/news/539222.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

hive復合數據類型之array

概述 ARRAY&#xff1a;ARRAY類型是由一系列相同數據類型的元素組成&#xff0c;這些元素可以通過下標來訪問。比如有一個ARRAY類型的變量fruits&#xff0c;它是由[apple,orange,mango]組成&#xff0c;那么我們可以通過fruits[1]來訪問元素orange&#xff0c;因為ARRAY類型的…

Exploit開發系列教程-Mona 2 SEH

P3nro5e 2015/07/10 10:580x00 Mona 2 前言 & 準備Mona 2是一種非常有用的插件&#xff0c;它由Corelan Team開發。起初是為Immunity Debugger寫的&#xff0c;現在它適用于WinDbg調試器。你將需要為WinDbg x86 和 WinDbg x64安裝一些工具&#xff1a;安裝Python 2.7 (從這…

python集合的元素可以是_Python集合的元素中,為什么不可以是包含嵌套列表的元組?...

你有一個誤解&#xff0c;hash算法針對的是元素的內容&#xff0c;并不是針對指針&#xff0c;所以指針不變不等于可hash。 如果你想深究細節的話&#xff0c;可以看tuple的源碼&#xff1a; static Py_hash_t tuplehash(PyTupleObject *v) { Py_uhash_t x; /* Unsigned for de…

python lib庫_python_lib基礎庫

1&#xff1a;argv傳遞給python腳本的命令行參數列表&#xff0c;argv[0]是腳本的名字(他是平臺獨立的&#xff0c;不管他是一個路徑全名或不是)&#xff0c;如果使用了-c參數選項&#xff0c;argv[0]會被設置為字符串-c&#xff0c;如果沒有腳本名傳遞給python解釋器&#xff…

hive復合數據類型之map

概述 MAP&#xff1a;MAP包含key->value鍵值對&#xff0c;可以通過key來訪問元素。比如”userlist”是一個map類型&#xff0c;其中username是key&#xff0c;password是value&#xff1b;那么我們可以通過userlist[username]來得到這個用戶對應的password&#xff1b; 操…

Beego框架使用

為什么80%的碼農都做不了架構師&#xff1f;>>> Beego Web項目目錄結構 new 命令是新建一個 Web 項目&#xff0c;我們在命令行下執行 bee new <項目名> 就可以創建一個新的項目。但是注意該命令必須在 $GOPATH/src 下執行。最后會在 $GOPATH/src 相應目錄下…

oracle下lag和lead分析函數

Lag和Lead分析函數可以在同一次查詢中取出同一字段的前N行的數據(Lag)和后N行的數據(Lead)作為獨立的列。 這種操作可以代替表的自聯接&#xff0c;并且LAG和LEAD有更高的效率。 語法&#xff1a; [sql] view plaincopy /*語法*/ lag(exp_str,offset,defval) over() Lead(…

802d簡明調試手冊_SINUMERIK-828D簡明調試手冊.pdf

SINUMERIK 828D / 828D BASIC簡明調試手冊SINUMERIKAnswers for industry. SIEMENSABC01.2012 ASINUMERIK 828D / 828D BASIC V04.04SP01123PLC 45NC 67PLC 891011121314151617PLC 18i1 11.1 11.1.1 NC 31.1.2 31.2

jtessboxeditorfx 界面顯示不出來_macOS 使用 XQuartz 支持 X11 實現 Linux 圖形化界面顯示...

更多奇技淫巧歡迎訂閱博客&#xff1a;https://fuckcloudnative.io前言在 Windows 中相信大家已經很熟悉使用 Xmanager(Xshell), MobaXterm, SecureCRT 通過 X11 實現 Linux 圖形化界面顯示&#xff0c;我的需求是在 macOS 下使用 iTerm2 作為 Terminal 實現 X11 圖形化界面顯示…

EntityFramework Core 2.0 Explicitly Compiled Query(顯式編譯查詢)

前言 EntityFramework Core 2.0引入了顯式編譯查詢&#xff0c;在查詢數據時預先編譯好LINQ查詢便于在請求數據時能夠立即響應。顯式編譯查詢提供了高可用場景&#xff0c;通過使用顯式編譯的查詢可以提高查詢性能。EF Core已經使用查詢表達式的散列來表示自動編譯和緩存查詢&a…

Oracle Minus關鍵字 不包含 取差集

Oracle Minus關鍵字   SQL中的MINUS關鍵字   SQL中有一個MINUS關鍵字&#xff0c;它運用在兩個SQL語句上&#xff0c;它先找出第一條SQL語句所產生的結果&#xff0c;然后看這些結果有沒有在第二個SQL語句的結果 中。如果有的話&#xff0c;那這一筆記錄就被去除&#xff0…

python掃描器甄別操作系統類型_20189317 《網絡攻防技術》 第三周作業

一.教材內容總結1.網絡踩點&#xff1a;web搜索與挖掘、DNS和IP查詢、網絡拓撲偵察(1)網絡踩點目標確定(2)技術手段&#xff1a;web信息搜索與挖掘、DNS和IP查詢、網絡拓撲偵察(3)web信息搜索與挖掘&#xff1a;基本搜索與挖掘技巧、高級搜索與挖掘技巧、編程實現google搜索、元…

python 網頁重定向_小試牛刀:python爬蟲爬取springer開放電子書.

首先聲明,本文旨在記錄反思,并沒有資源,代碼也不具有借鑒意義(水平實在不行.某天,水群的時候發現群友發了一個文件,里面是疫情時期springer開放的免費電子書名單,同時還附有下載鏈接,總共有400多本,這要是一個一個下載不得累死個人,只下載自己感興趣的書也是一個好主意,但是,我…

直面桌面云帶來的現狀優勢

在桌面云解決方案里&#xff0c;首先&#xff0c;所有的數據以及運算都在服務器端進行&#xff0c;客戶端只是顯示其變化的影像而已&#xff0c;所以在不需要擔心客戶端來非法竊取資料&#xff0c;我們在電影里面看到的商業間諜拿著 U 盤瘋狂的拷貝公司商業機密的情況再也不會出…

ORA-28001: the password has expired解決方法

Oracle提示錯誤消息ORA-28001: the password has expired&#xff0c;是由于Oracle11G的新特性所致&#xff0c; Oracle11G創建用戶時缺省密碼過期限制是180天&#xff08;即6個月&#xff09;&#xff0c; 如果超過180天用戶密碼未做修改則該用戶無法登錄。 Oracle公司是為了數…

.net 導出excel_Qt編寫的項目作品18-數據導出到Excel及Pdf和打印數據

一、功能特點原創導出數據機制&#xff0c;不依賴任何office組件或者操作系統等第三方庫&#xff0c;尤其是支持嵌入式linux。10萬行數據9個字段只需要2秒鐘完成。只需要四個步驟即可開始急速導出大量數據到Excel。同時提供直接寫入數據接口和多線程寫入數據接口&#xff0c;不…

hive數據庫定義

默認數據庫"default" 可以顯式切換數據庫&#xff1a;hive> use 數據庫名; 創建 hive>CREATE DATABASE [IF NOT EXISTS] mydb [LOCATION] /....... [COMMENT] ....; 實例 hive (default)> create database test_db comment test database; OK Ti…

圖像增強_Keras 常用的圖像增強方式

歡迎關注 “小白玩轉Python”&#xff0c;發現更多 “有趣”在使用神經網絡和深度學習模型時&#xff0c;需要進行數據準備。對于更復雜的物體識別任務&#xff0c;也越來越需要增加數據量。數據增加意味著增加數據量。換句話說&#xff0c;擁有更大的數據集意味著更健壯的模型…

Facebook產品經理的三年敘事與協作思考

產品經理和研發工程師的關系經常被大家調侃&#xff0c;可偏偏就有同時受到研發和設計都喜歡的“別人家的產品經理”&#xff0c;溝通協調、對接需求、項目把控面面俱到還有好人緣。有沒有人天生就是產品經理&#xff1f;產品經理的工作就是寫需求寫需求和寫需求么&#xff1f;…

sis新地址_堅若磐石不掉速,老平臺升級新選擇,入手昱聯Asint 500G SSD

我是文章的原作者&#xff0c;文章首發于&#xff1a;什么值得買愛折騰的老狐貍?zhiyou.smzdm.com首發文章鏈接&#xff1a;堅若磐石不掉速&#xff0c;老平臺升級新選擇&#xff0c;入手昱聯Asint 500G SSD _值客原創_什么值得買?post.smzdm.com雖然說&#xff0c;現在越來越…