------------定時任務、異步任務--------
pip install celery --target=D:\Users\ex-ouyangl003\PycharmProjects\data_new\dg_meta_system\metadata_system\venv\Lib\site-packages
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test():
print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好'
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
scheduler.start()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3)
backend='redis://:109321@10.35.163.24:6379/7’, # 返回值存入數據庫
broker='redis://:109321@10.35.163.24:6379/8') # :密碼@host/post/db
# -----windows celery
Windows的Celery只支持到3.1.25
pip install celery==4.3.0
pip install eventlet
celery -A app.tasks.meta.tasks worker -l info -P eventlet
celery -A app.tasks.meta.tasks worker --loglevel=info
celery -A app.celery worker --loglevel=info
tasks.task_name.delay()
t.ready()
t.get()
t.get(timeout=11)
t.get(propagate=False)
t.traceback
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_ENABLE_UTC=True
# 官網推薦消息序列化方式為json
CELERY_ACCEPT_CONTENT=['json']
CELERY_TASK_SERIALIZER='json'
CELERY_RESULT_SERIALIZER='json'
請求耗時(比如大量的數據庫插入,發送驗證郵件等)
利用Celery來后臺處理耗時任務可以保證Flask能夠較快響應而且不被阻塞,同時減輕了數據庫的高峰寫入壓力
操作數據庫,操作完成后記得釋放數據庫連接,例如Session.remove
Celery是專注實時處理和任務調度的分布式任務隊列。
主要應用場景:
1,web應用,當需要觸發事件需要較長時間處理完成,可以交給celery進行異步執行,執行完后返回結果,這段時間不用等待,提高系統的吞吐量和響應時間
2,完成任務時,需要額外的事件處理,如發送郵件等
3,后臺定時任務處理,celery可以幫助我們在不同服務器進行定時任務管理