介紹
APScheduler的全稱是Advanced Python Scheduler。它是一個輕量級的 Python 定時任務調度框架。APScheduler 支持三種調度任務:固定時間間隔,固定時間點(日期),Linux 下的 Crontab 命令。同時,它還支持異步執行、后臺執行調度任務。
APScheduler基于Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。
安裝
pip install apscheduler
官方地址
https://apscheduler.readthedocs.io/en/latest/userguide.html#starting-the-scheduler
基本概念
1.APScheduler四大組件
-
觸發器 triggers :用于設定觸發任務的條件
-
任務儲存器 job stores:用于存放任務,把任務存放在內存或數據庫中
-
執行器 executors: 用于執行任務,可以設定執行模式為單線程或線程池
-
調度器 schedulers: 把上方三個組件作為參數,通過創建調度器實例來運行
1.1 觸發器triggers
觸發器包含調度邏輯。每個任務都有自己的觸發器,用于確定何時應該運行作業。除了初始配置之外,觸發器完全是無狀態的。
1.2 任務儲存器 job stores
默認情況下,任務存放在內存中。也可以配置存放在不同類型的數據庫中。如果任務存放在數據庫中,那么任務的存取有一個序列化和反序列化的過程,同時修改和搜索任務的功能也是由任務儲存器實現。
注意一個任務儲存器不要共享給多個調度器,否則會導致狀態混亂
1.3 執行器 executors
任務會被執行器放入線程池或進程池去執行,執行完畢后,執行器會通知調度器。
1.4 調度器 schedulers
一個調度器由上方三個組件構成,一般來說,一個程序只要有一個調度器就可以了。開發者也不必直接操作任務儲存器、執行器以及觸發器,因為調度器提供了統一的接口,通過調度器就可以操作組件,比如任務的增刪改查。
調度器工作流程:
2. 調度器組件詳解
根據開發需求選擇相應的組件,下面是不同的調度器組件:
- BlockingScheduler 阻塞式調度器:適用于只跑調度器的程序。
- BackgroundScheduler 后臺調度器:適用于非阻塞的情況,調度器會在后臺獨立運行。
- AsyncIOScheduler AsyncIO調度器,適用于應用使用AsnycIO的情況。
- GeventScheduler Gevent調度器,適用于應用通過Gevent的情況。
- TornadoScheduler Tornado調度器,適用于構建Tornado應用。
- TwistedScheduler Twisted調度器,適用于構建Twisted應用。
- QtScheduler Qt調度器,適用于構建Qt應用。
2.1 任務儲存器的選擇
要看任務是否需要持久化。如果你運行的任務是無狀態的,選擇默認任務儲存器MemoryJobStore就可以應付。但是,如果你需要在程序關閉或重啟時,保存任務的狀態,那么就要選擇持久化的任務儲存器。如果,作者推薦使用SQLAlchemyJobStore并搭配PostgreSQL作為后臺數據庫。這個方案可以提供強大的數據整合與保護功能。
2.2 執行器的選擇
同樣要看你的實際需求。默認的ThreadPoolExecutor線程池執行器方案可以滿足大部分需求。如果,你的程序是計算密集型的,那么最好用ProcessPoolExecutor進程池執行器方案來充分利用多核算力。也可以將ProcessPoolExecutor作為第二執行器,混合使用兩種不同的執行器。
配置一個任務,就要設置一個任務觸發器。觸發器可以設定任務運行的周期、次數和時間。
3. APScheduler有三種內置的觸發器
- date 日期:觸發任務運行的具體日期
- interval 間隔:觸發任務運行的時間間隔
- cron 周期:觸發任務運行的周期
- calendarinterval:當您想要在一天中的特定時間以日歷為基礎的間隔運行任務時使用
一個任務也可以設定多種觸發器,比如,可以設定同時滿足所有觸發器條件而觸發,或者滿足一項即觸發。
3.0 觸發器代碼示例
date 是最基本的一種調度,作業任務只會執行一次。它表示特定的時間點觸發。它的參數如下:
- run_date(datetime or str):任務運行的日期或者時間
- timezone(datetime.tzinfo or str):指定時區
from datetime import date
from apscheduler.schedulers.blocking import BlockingSchedulerscheduler = BlockingScheduler()
def my_job(text):print(text)# 注意:run_date參數可以是date類型、datetime類型或文本類型。
# 在2019年4月15日執行
scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 15), args=['測試任務'])
# datetime類型(用于精確時間)
# scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 15, 17, 30, 5), args=['測試任務'])
# 字符串
#scheduler.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['測試任務'])scheduler.start()
3.2 interval 周期觸發任務
固定時間間隔觸發。interval 間隔調度,參數如下:
- weeks(int):間隔幾周
- days(int):間隔幾天
- hours(int):間隔幾小時
- minutes(int):間隔幾分鐘
- seconds(int):間隔多少秒
- start_date(datetime or str):開始日期
- end_date(datetime or str):結束日期
- timezone(datetime.tzinfo or str):時區
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingSchedulerdef job_func():print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")scheduler = BlockingScheduler()# 每2小時觸發
scheduler.add_job(job_func, 'interval', hours=2)# 在 2019-04-15 17:00:00 ~ 2019-12-31 24:00:00 之間, 每隔兩分鐘執行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2019-04-15 17:00:00' , end_date='2019-12-31 24:00:00')# jitter振動參數,給每次觸發添加一個隨機浮動秒數,一般適用于多服務器,避免同時運行造成服務擁堵。
scheduler.add_job(job_func, 'interval', hours=1, jitter=120)scheduler.start()
3.3 cron 觸發器
在特定時間周期性地觸發,和Linux crontab格式兼容。它是功能最強大的觸發器。
- year(int or str) 年,4位數字
- month(int or str) 月(范圍1-12)
- day(int or str) 日(范圍1-31)
- week(int or str) 周(范圍1-53)
- day_of_week(int or str) 周內第幾天或者星期幾(范圍0-6或者mon,tue,wed,thu,fri,stat,sun)
- hour(int or str) 時(0-23)
- minute(int or str) 分(0-59)
- second(int or str) 秒(0-59)
- start_date(datetime or str) 最早開始日期(含)
- end_date(datetime or str) 最晚結束日期(含)
- timezone(datetime.tzinfo or str) 指定時區
表達式類型
表達式 | 參數類型 | 描述 |
---|---|---|
* | 所有 | 通配符。例:minutes=* 即每分鐘觸發 |
*/a | 所有 | 可被a整除的通配符。 |
a-b | 所有 | 范圍a-b觸發 |
a-b/c | 所有 | 范圍a-b,且可被c整除時觸發 |
xth y | 日 | 第幾個星期幾觸發。x為第幾個,y為星期幾 |
last x | 日 | 一個月中,最后個星期幾觸發 |
last | 日 | 一個月最后一天觸發 |
x,y,z | 所有 | 組合表達式,可以組合確定值或上方的表達式 |
注意:month和day_of_week參數分別接受的是英語縮寫jan– dec 和 mon – sun
import datetime
from apscheduler.schedulers.background import BackgroundSchedulerdef job_func(text):print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])scheduler = BackgroundScheduler()
# 在每年 1-3、7-9 月份中的每個星期一、二中的 00:00, 01:00, 02:00 和 03:00 執行 job_func 任務
scheduler.add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')scheduler.start()
使用 scheduled_job() 裝飾器添加任務:
@scheduler.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():print("I am printed at 00:00:00 on the last Sunday of every month!")
注意:夏令時問題
有些timezone時區可能會有夏令時的問題。這個可能導致令時切換時,任務不執行或任務執行兩次。避免這個問題,可以使用UTC時間,或提前預知并規劃好執行的問題。
pri# 在Europe/Helsinki時區, 在三月最后一個周一就不會觸發;在十月最后一個周一會觸發兩次
scheduler.add_job(job_function, 'cron', hour=3, minute=30)
4. 配置調度程序
APScheduler提供了許多不同的方法來配置調度程序。您可以使用配置字典,也可以將選項作為關鍵字參數傳遞。您還可以先實例化調度程序,然后添加任務并配置調度程序。這樣您就可以在任何環境中獲得最大的靈活性
可以在BaseScheduler類的API引用中找到調度程序級別配置選項的完整列表 。調度程序子類還可以具有其各自API引用中記錄的其他選項。各個任務存儲和執行程序的配置選項同樣可以在其API參考頁面上找到。
假設您希望在應用程序中使用默認作業存儲和默認執行程序運行BackgroundScheduler:
from apscheduler.schedulers.background import BackgroundSchedulerscheduler = BackgroundScheduler()
這將為您提供一個BackgroundScheduler,其MemoryJobStore(內存任務儲存器)名為“default”,ThreadPoolExecutor(線程池執行器)名為“default”,默認最大線程數為10。
假如你現在有這樣的需求,兩個任務儲存器分別搭配兩個執行器;同時,還要修改任務的默認參數;最后還要改時區。可以參考下面例子,它們是完全等價的。
- 名稱為“mongo”的MongoDBJobStore
- 名稱為“default”的SQLAlchemyJobStore
- 名稱為“ThreadPoolExecutor ”的ThreadPoolExecutor,最大線程20個
- 名稱“processpool”的ProcessPoolExecutor,最大進程5個
- UTC時間作為調度器的時區
- 默認為新任務關閉合并模式()
- 設置新任務的默認最大實例數為3
方法一:
from pytz import utcfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjobstores = {'mongo': MongoDBJobStore(),'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二:
from apscheduler.schedulers.background import BackgroundScheduler# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({'apscheduler.jobstores.mongo': {'type': 'mongodb'},'apscheduler.jobstores.default': {'type': 'sqlalchemy','url': 'sqlite:///jobs.sqlite'},'apscheduler.executors.default': {'class': 'apscheduler.executors.pool:ThreadPoolExecutor','max_workers': '20'},'apscheduler.executors.processpool': {'type': 'processpool','max_workers': '5'},'apscheduler.job_defaults.coalesce': 'false','apscheduler.job_defaults.max_instances': '3','apscheduler.timezone': 'UTC',
})
方法三:
from pytz import utcfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutorjobstores = {'mongo': {'type': 'mongodb'},'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {'default': {'type': 'threadpool', 'max_workers': 20},'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {'coalesce': False,'max_instances': 3
}
scheduler = BackgroundScheduler()# ..這里可以添加任務scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
啟動調度器
啟動調度器是只需調用start()即可。除了BlockingScheduler,非阻塞調度器都會立即返回,可以繼續運行之后的代碼,比如添加任務等。
對于BlockingScheduler,程序則會阻塞在start()位置,所以,要運行的代碼必須寫在start()之前。
注意:調度器啟動后,就不可以修改配置。
5. 添加任務
添加任務方法有兩種:
- 通過調用add_job()
- 通過裝飾器scheduled_job()
5.1 利弊:
- 第一種方法是最常用的;第二種方法是最方便的,但缺點就是運行時,不能修改任務。
- 第一種add_job()方法會返回一個apscheduler.job.Job實例,這樣就可以在運行時,修改或刪除任務。
在任何時候你都能配置任務。但是如果調度器還沒有啟動,此時添加任務,那么任務就處于一個暫存的狀態。只有當調度器啟動時,才會開始計算下次運行時間。
還有一點要注意,如果你的執行器或任務儲存器是會序列化任務的,那么這些任務就必須符合:
- 回調函數必須全局可用
- 回調函數參數必須也是可以被序列化的
重要提醒!
如果在程序初始化時,是從數據庫讀取任務的,那么必須為每個任務定義一個明確的ID,并且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味著任務的狀態不會保存。
內置任務儲存器中,只有MemoryJobStore不會序列化任務;內置執行器中,只有ProcessPoolExecutor會序列化任務。
建議:如果想要立刻運行任務,可以在添加任務時省略trigger參數
6. 移除任務
如果想從調度器移除一個任務,那么你就要從相應的任務儲存器中移除它,這樣才算移除了。有兩種方式:
- 調用remove_job(),參數為:任務ID,任務儲存器名稱
- 在通過add_job()創建的任務實例上調用remove()方法
第二種方式更方便,但前提必須在創建任務實例時,實例被保存在變量中。對于通過scheduled_job()創建的任務,只能選擇第一種方式。
當任務調度結束時(比如,某個任務的觸發器不再產生下次運行的時間),任務就會自動移除。
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()# 同樣,通過任務的具體ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
7. 暫停和恢復任務
通過任務實例或調度器,就能暫停和恢復任務。如果一個任務被暫停了,那么該任務的下一次運行時間就會被移除。在恢復任務前,運行次數計數也不會被統計。
暫停任務,有以下兩個方法:
- apscheduler.job.Job.pause()
- apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復任務
- apscheduler.job.Job.resume()
- apscheduler.schedulers.base.BaseScheduler.resume_job()
8. 獲取任務列表
通過get_jobs()就可以獲得一個可修改的任務列表。get_jobs()第二個參數可以指定任務儲存器名稱,那么就會獲得對應任務儲存器的任務列表。
print_jobs()可以快速打印格式化的任務列表,包含觸發器,下次運行時間等信息。
修改任務
通過apscheduler.job.Job.modify()或modify_job(),你可以修改任務當中除了id的任何屬性。
比如:
job.modify(max_instances=6, name='Alternate name')
如果想要重新調度任務(就是改變觸發器),你能通過apscheduler.job.Job.reschedule()或reschedule_job()來實現。這些方法會重新創建觸發器,并重新計算下次運行時間。
比如:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
9. 關閉調度器
scheduler.shutdown()
默認情況下,調度器會先把正在執行的任務處理完,再關閉任務儲存器和執行器。但是,如果你就直接關閉,你可以添加參數:
scheduler.shutdown(wait=False)
上述方法不管有沒有任務在執行,會強制關閉調度器。
10. 暫停、恢復任務進程
# 暫停正在執行的任務
scheduler.pause()# 恢復任務:
scheduler.resume()# 也可以在調度器啟動時,默認所有任務設為暫停狀態。
scheduler.start(paused=True)