python拓展7(Celery消息隊列配置定時任務)

介紹

celery 定時器是一個調度器(scheduler);它會定時地開啟(kicks off)任務,然后由集群中可用的工人(worker)來執行。

定時任務記錄(entries)默認 從 beat_schedule 設置中獲取,但自定義存儲也可以使用,如把記錄存儲到SQL數據庫中。

要確保同一時間一份時間表上只有一個調度器在運行,否則會因為重復發送任務而結束。使用集中途徑意味著定時任務不用必須同步,并且服務無需用鎖操控。

  • user:用戶程序,用于告知celery去執行一個任務。
  • broker: 存放任務(依賴RabbitMQ或Redis,進行存儲)
  • worker:執行任務

celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(測試中) 充當broker來進行消息的接收,并且也支持多個broker和worker來實現高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

Celery version 4.0 runs onPython ?2.7, 3.4, 3.5?PyPy ?5.4, 5.5?This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.If you’re running an older version of Python, you need to be running an older version of Celery:Python 2.6: Celery series 3.1 or earlier.Python 2.5: Celery series 3.0 or earlier.Python 2.4 was Celery series 2.2 or earlier.Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
版本和要求

環境準備:

  • 安裝rabbitMQ或Redis
    https://www.cnblogs.com/L5251/articles/9146825.html

  ? ?https://www.cnblogs.com/L5251/articles/9325586.html

  • 安裝celery
    ? ? ?pip3 install celery

快速上手

import time
from celery import Celeryapp = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')@app.task
def xxxxxx(x, y):time.sleep(10)return x + y
s1.py
s2.py
from celery.result import AsyncResult
from s1 import appasync = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)if async.successful():result = async.get()print(result)# result.forget() # 將結果刪除
elif async.failed():print('執行失敗')
elif async.status == 'PENDING':print('任務等待中被執行')
elif async.status == 'RETRY':print('任務異常后正在重試')
elif async.status == 'STARTED':print('任務已經開始被執行')
s3.py
# 執行 s1.py 創建worker(終端執行命令):
celery worker -A s1 -l info
# PS:Windows系統上執行命令時出錯解決方法
    pip3 install eventlet
# 后期運行修改為:celery worker -A s1 -l info -P eventlet
# 執行 s2.py ,創建一個任務并獲取任務ID:
    python3 s2.py# 執行 s3.py ,檢查任務狀態并獲取結果:python3 s3.py

多任務結構

pro_cel├── celery_tasks# celery相關文件夾│   ├── celery.py   # celery連接和配置相關文件│   └── tasks.py    #  所有任務函數├── check_result.py # 檢查結果└── send_task.py    # 觸發任務
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celerycelery = Celery('xxxxxx',broker='redis://192.168.0.111:6379',backend='redis://192.168.0.111:6379',include=['celery_tasks.tasks'])# 時區
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-import time
from .celery import celery@celery.task
def xxxxx(*args, **kwargs):time.sleep(5)return "任務結果"@celery.task
def hhhhhh(*args, **kwargs):time.sleep(5)return "任務結果"
pro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-from celery.result import AsyncResult
from celery_tasks.celery import celeryasync = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)if async.successful():result = async.get()print(result)# result.forget() # 將結果刪除
elif async.failed():print('執行失敗')
elif async.status == 'PENDING':print('任務等待中被執行')
elif async.status == 'RETRY':print('任務異常后正在重試')
elif async.status == 'STARTED':print('任務已經開始被執行')
pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks# 立即告知celery去執行xxxxxx任務,并傳入兩個參數
result = celery_tasks.tasks.xxxxx.delay(4, 4)print(result.id)
pro_cel/send_task.py

更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

定時任務

1. 設定時間讓celery執行一個任務

import datetime
from celery_tasks.tasks import xxxxx
"""
from datetime import datetimev1 = datetime(2017, 4, 11, 3, 0, 0)
print(v1)v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10# 使用apply_async并設定時間
result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)

2. 類似于contab的定時任務

"""
celery beat -A proj
celery worker -A proj -l info"""
from celery import Celery
from celery.schedules import crontabapp = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = Falseapp.conf.beat_schedule = {# 'add-every-10-seconds': {#     'task': 'proj.s1.add1',#     'schedule': 10.0,#     'args': (16, 16)# },'add-every-12-seconds': {'task': 'proj.s1.add1','schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),'args': (16, 16)},
}

注:如果想要定時執行類似于crontab的任務,需要定制 Scheduler來完成。

Flask中應用Celery

pro_flask_celery/
├── app.py
├── celery_tasks├── celery.py└── tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-from flask import Flask
from celery.result import AsyncResultfrom celery_tasks import tasks
from celery_tasks.celery import celeryapp = Flask(__name__)TASK_ID = None@app.route('/')
def index():global TASK_IDresult = tasks.xxxxx.delay()# result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))TASK_ID = result.idreturn "任務已經提交"@app.route('/result')
def result():global TASK_IDresult = AsyncResult(id=TASK_ID, app=celery)if result.ready():return result.get()return "xxxx"if __name__ == '__main__':app.run()
app.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontabcelery = Celery('xxxxxx',broker='redis://192.168.10.48:6379',backend='redis://192.168.10.48:6379',include=['celery_tasks.tasks'])# 時區
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-import time
from .celery import celery@celery.task
def hello(*args, **kwargs):print('執行hello')return "hello"@celery.task
def xxxxx(*args, **kwargs):print('執行xxxxx')return "xxxxx"@celery.task
def hhhhhh(*args, **kwargs):time.sleep(5)return "任務結果"
celery_task/tasks.py

記錄

為了定時調用任務,你必須添加記錄到打點列表中:

from celery import Celery
from celery.schedules import crontabapp = Celery()@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):# 每10秒調用 test('hello') .sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# 每30秒調用 test('world') sender.add_periodic_task(30.0, test.s('world'), expires=10)# 每周一上午7:30執行
    sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s('Happy Mondays!'),)@app.task
def test(arg):print(arg)

用on_after_configure處理器進行這些設置意味著當使用test.s()時我們不會在模塊層面運行app 。

add_periodic_task() 函數在幕后會添加記錄到beat_schedule設定,同樣的設定可以用來手動設置定時任務:

例子: 每30秒運行 tasks.add .

app.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},
}
app.conf.timezone = 'UTC'

一般會使用配置文件進行配置,如下?
celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},
}

程序里使用:

app.config_from_object('celeryconfig')注意
如果你的參數元組里只有一個項目,只用一個逗號就可以了,不要圓括號。

時間表使用時間差意味著每30秒間隔會發送任務(第一個任務在celery定時器開啟后30秒發送,然后上每次距一次運行后30秒發送一次)

可使用的屬性

    • task

      要執行的任務名字

    • schedule

      執行的頻率

      可以是整數秒數,時間差,或者一個周期( crontab)。你也可以自 定義你的時間表類型,通過擴展schedule接口。

    • args

      位置參數 (list 或 tuple).

    • kwargs

      鍵值參數 (dict).

    • options

      執行選項 (dict).

      這可以是任何被apply_async()支持的參數與—-exchange, routing_key, expires,等。

    • relative

      如果 relative 是 true ,時間表“由時鐘時間”安排,意味著 頻率近似到最近的秒,分鐘,小時或天,這取決于時間差中的時間間隔。?
      默認relative是false,頻率不會近似,會相對于celery的啟動時間。

      Crontab 表達式語法非常靈活。

例子含義
crontab()每分鐘執行
crontab(minute=0, hour=0)每天午夜執行
crontab(minute=0, hour=’*/3’)每三個小時執行: 午夜, 3am, 6am, 9am, 正午, 3pm, 6pm, 9pm.
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’)同上
crontab(minute=’*/15’)每15分鐘執行
crontab(day_of_week=’sunday’)星期日每分鐘
crontab(minute=’‘,hour=’‘, day_of_week=’sun’)同上
crontab(minute=’*/10’,hour=’3,17,22’, day_of_week=’thu,fri’)每10分鐘執行,僅限于周六日3-4 am, 5-6 pm, and 10-11 pm
crontab(minute=0, hour=’/2,/3’)偶數小時或者能被3整除的小時數執行
crontab(minute=0, hour=’*/5’)被5整除的小時數,如3pm
crontab(minute=0, hour=’*/3,8-17’)8am-5pm能被3整除的
crontab(0, 0, day_of_month=’2’)每月第2天
crontab(0, 0,day_of_month=’2-30/3’)每偶數天
crontab(0, 0,day_of_month=’1-7,15-21’)每月1和3周
crontab(0, 0, day_of_month=’11’,month_of_year=’5’)每年5月11日
crontab(0, 0,month_of_year=’*/3’)每個季度第1月

開啟調度

開啟celery定時服務:

$ celery -A proj beat

可以把定時器嵌入到工人(worker)中,通過啟用workers -B選項,如果你永遠不會運行超過一個工人節點這就會很方便。但這不太常見,不推薦在生產環境這樣使用:

$ celery -A proj worker -B

定時器需要在本地數據庫文件(默認名為 celerybeat-schedule )存儲任務上次運行時間,所以它需要在當前目錄中寫權限。或者你也可以給這個文件指定一個位置:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

?

轉載于:https://www.cnblogs.com/L5251/articles/9332304.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/279832.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/279832.shtml
英文地址,請注明出處:http://en.pswp.cn/news/279832.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Asia Yokohama Regional Contest 2018 G題 What Goes Up Must Come Down(樹狀數組求逆序對)

https://codeforces.com/gym/102082 題意: 給一個數組大小不超過1e5,每個數的值也是1e5以內,可以交換相鄰兩個數,求保證它呈現一個非遞減再非遞增的趨勢的最小交換次數。 題解:對每個數來說,只有兩種情況&a…

Android系統的開機畫面顯示過程分析(8)

3. 第三個開機畫面的顯示過程第三個開機畫面是由應用程序bootanimation來負責顯示的。應用程序bootanimation在啟動腳本init.rc中被配置成了一個服務,如下所示:service bootanim /system/bin/bootanimation user graphics group graphics disabled o…

chrome連接已重置_如何重置(或調整)Chrome的下載設置

chrome連接已重置By default, Chrome saves all downloaded files to the same location—a dedicated “Downloads” folder. The thing is, this isn’t always practical for all types of download files. The good news is you can easily tweak this setting. 默認情況下…

.Net 7 團隊把國內的龍芯確實當做一等公民和棄用的項目

楔子:國內龍芯據說是用的自己的指令集,在研究ILC的時候,發現了龍芯在微軟那邊確實是一等公民的存在。同X64,ARM,X86一同并列交叉編譯和二進制提取。龍芯官網龍芯平臺.NET,是龍芯公司基于開源社區.NET獨立研發適配的龍芯版本&#…

戴爾押寶iSCSI,由低到高組合成型

戴爾(Dell)是較早接受SAS技術的主流存儲廠商之一,2006年已推出采用SAS硬盤驅動器的SAS直連存儲(DAS)系統PowerVault MD3000。一年之后,主機連接改用iSCSI的PowerVault MD3000i問世。2008年1月,E…

仿Gin搭建自己的web框架(七)

本篇介紹HTTP Basic Auth的實現以及Recovery機制。 HTTP Basic Auth Basic Auth是一種開放平臺認證方式,簡單的說就是需要你輸入用戶名和密碼才能繼續訪問。對于Basic Auth的概念不過多的進行介紹,直接進入如何實現的過程。 Basic Auth說白了就是賬號和密…

canvas高斯模糊算法

對于模糊圖片這個效果的實現&#xff0c;其實css3中的filter屬性也能夠實現&#xff0c;但是這個屬性的兼容性不是很好&#xff0c;所以我們通常不用這種方法實現&#xff0c;而使用canvas配合JS實現。 <span style"white-space:pre"> </span>//高斯模糊…

word中插入公式的快捷鍵_如何使用插入鍵在Word中插入復制的內容

word中插入公式的快捷鍵In Word, the “Insert” key on the keyboard can be used to switch between Insert and Overtype modes. However, it can also be used as a shortcut key for inserting copied or cut content at the current cursor position. 在Word中&#xff0…

微軟終于為 Visual Studio 添加了內置的 Markdown 編輯器

微軟終于為 Visual Studio 添加了內置的 Markdown 編輯器。根據官方博客的介紹&#xff0c;由于收到許多用戶的反饋&#xff0c;微軟決定為 Visual Studio 添加 Markdown 編輯器。開發者下載最新的 Visual Studio 17.5 第 2 個預覽版就能夠使用 Markdown 編輯功能&#xff0c;無…

【經驗分享】Hydra(爆破神器)使用方法

這個也是backtrack下面很受歡迎的一個工具 參數詳解&#xff1a;-R 根據上一次進度繼續破解-S 使用SSL協議連接-s 指定端口-l 指定用戶名-L 指定用戶名字典(文件)-p 指定密碼破解-P 指定密碼字典(文件)-e 空密碼探測和指定用戶密碼探測(ns)-C 用戶名可以用:分割(username:passw…

【東軟實訓】SQL多表鏈接

如果一個查詢同時涉及兩個以上的表&#xff0c;則稱之為鏈接查詢&#xff0c;鏈接查詢是關系數據庫中最主要的查詢&#xff0c;主要包括等值鏈接查詢、非等值鏈接查詢、自身鏈接查詢、外鏈接查詢和復合條件鏈接查詢。 這篇博文我們來對多表鏈接進行學習。 Outline 鏈接的基本概…

博鰲“‘AI+時代’來了嗎”分論壇,嘉賓們有何重要觀點?...

雷鋒網(公眾號&#xff1a;雷鋒網)3月27日消息&#xff0c;正在進行中的博鰲亞洲論壇2019年年會&#xff0c;于2019年3月26日至29日在中國海南博鰲舉辦。今年博鰲論壇的主題為“共同命運 共同行動 共同發展”。今天&#xff0c;在主題為《“AI時代”來了嗎&#xff1f;》分論壇…

一款統計摸魚時長的開源項目

對于我們程序員&#xff0c;在工作中一天8小時&#xff0c;不可能完全在寫代碼了&#xff0c;累了刷刷論壇、群里吹吹牛&#xff0c;這都是非常正常的。雖然一天下來&#xff0c;可能我們都可以按時完成工作&#xff0c;但是我們不知道&#xff0c;時間都花在哪里了&#xff0c…

saltstack 主題說明

轉載于:https://www.cnblogs.com/40kuai/p/9335869.html

基于spring boot 的ssm項目的簡單配置

2019獨角獸企業重金招聘Python工程師標準>>> 我前面的帖子有介紹spring boot的簡單搭建&#xff0c;現在我再講講spring boot的簡單配置 首先&#xff0c;項目結構 啟動類 RestController 注解相當于ResponseBody &#xff0b; Controller合在一起的作用。 Sprin…

nest 架構_如何與其他人分享您的Nest Cam Feed

nest 架構Your Nest Cam can help you keep an eye on your home from anywhere you are, but more eyes you trust to watch your stuff is more comforting. If you want someone else to check in once in a while, you can share your Nest Cam feed with a simple, passwo…

關于公司內部DNS的整改建議

文章背景&#xff1a;由給leader的郵件而來&#xff0c;在文章末尾另加了一些補充&#xff08;見FAQ&#xff09;。原主題&#xff1a;關于公司內網DNS服務器中的u-sooc.com域名調整建議 建議&#xff1a;將公司內部DNS服務器&#xff08;Windows Server 2008 R2&#xff09;所…

.Net 和Assembly下滑其它回升,TIOBE編程語言2022年12排行榜

楔子TIOBE編程語言排行榜一般反應的是語言的生態&#xff0c;個人比較喜歡這個排行。來看下2022年最后一個月12月&#xff0c;最后一天,TIOBE的排行榜單。榜單分析這里只看下前10名的編程語言&#xff0c;里面非常顯眼的是所有的語言都增加了生態環境&#xff0c;包括不被看好的…

Haproxy安裝與配置

Haproxy安裝與配置 有關高負載均衡的軟件&#xff0c;目前使用比較多的是haproxy、nginx和lvs。下面我們就開始學習haprxoy這款軟件。 1、Haproxy概念 1.1、haproxy原理 haproxy提供高可用性、負載均衡以及基于TCP(第四層)和HTTP&#xff08;第七層&#xff09;應用的代理&…

刪除word中所有的表格_如何在Word中刪除表格

刪除word中所有的表格If you’ve inserted a table in Word and you now want to delete it, you may have found it’s not all that straightforward to delete the entire table without deleting other content around the table. We’ll show you a couple of ways around…