在Celery在python中的應用除了實現異步任務(async task
)外也可以執行定時任務(beat
)
1.Celery定時任務是什么?
Celery默認任務單元由任務生產者觸發,但有時可能需要其自動觸發, 而beat
進程正是負責此類任務,能夠自動觸發定時/周期性任務.
只需要在配置中配置好周期任務,然后在運行一個周期任務觸發器(beat)即可
2.直接上代碼
目錄結構如下:
celery_app.py 文件代碼如下:
import os
import sys
import time
import celery
from pathlib import Path
from datetime import timedelta# 實例化celery對象
app = celery.Celery("celery_worker",backend="redis://:@127.0.0.1:6379/4",broker="redis://:@127.0.0.1:6379/5",include=["celery_worker.email.tasks"],
)# celery beat 定時任務
beat_schedule = {'periodic_task-every-minute': {# 'task': 'celery_worker.email.tasks.add','task': 'chain.send_chains','schedule': timedelta(seconds=10),'args': (11, 22)},
}# 配置文件
app.conf.update(task_serializer="json",result_serializer="json",accept_content=["json"],task_default_queue="normal",timezone="Asia/Shanghai",enable_utc=False,task_ignore_result=True,redis_max_connections=100,result_expires=3600,beat_schedule=beat_schedule
)"""
celery -A celery_worker.celery_app worker -l info
celery -A celery_worker.celery_app beat
"""
email.tasks.py 代碼如下:
from loguru import logger
# 模塊化之后
from celery_worker.celery_app import app@app.task(name='chain.send_chains')
def add(x, y):logger.info(f'number_add 進來了...x:{x}, y:{y}')return x + y
然后順序啟動 worker 和 beat 定時任務(記得兩個都必須啟動)
執行如下命令:
celery -A celery_worker.celery_app worker -l info (啟動干活的人)
celery -A celery_worker.celery_app beat (啟動定時任務 類似crontab)
效果如下:
其實簡單的來說就是這點代碼:
beat_schedule = {'periodic_task-every-minute': {# 'task': 'celery_worker.email.tasks.add','task': 'chain.send_chains','schedule': timedelta(seconds=10),'args': (11, 22)},
}
periodic_task-every-minute
這個就是定時任務的名字 ,隨便起無所謂。
重點是這個 "task"
,經過實際測試,如果這個工作函數沒有指定name
名字的話,默認就是 函數路徑+函數名稱
也就是 'celery_worker.email.tasks.add'
。
但是如果這函數添加name
屬性值的話 直接用名字也是可以的,也就是'chain.send_chains'
。
好了 小伙伴們也自己實操下吧!