Quartz
數據結構
quartz采用完全二叉樹:除了最后一層每一層節點都是滿的,而且最后一層靠左排列。
二叉樹節點個數規則:每層從左開始,第一層只有一個,就是2的0次冪,第二層兩個就是2的1次冪,第三層4個就是2的2次冪,…
quartz采用二叉樹的數據結構,因為二叉樹有小頂堆與大頂堆的特性,即把最小或者最大的節點放到最上面,而quartz總是要先執行最快到時間的,所以quartz去小頂堆的頂點去拿最快到期的任務去執行。
java沒有支持二叉樹的代碼,quartz將二叉樹放入數組,從頂點開始,依照自上而下從左到右的方式存入數組中。
quartz創建新的定時任務時會放入數組最后,也就是二叉樹最下層,然后會將這個任務節點與父節點作比較,比父節點小就上浮,直到不小于父節點為止;
quartz執行了頂點最快到期的任務后會將頂點刪除,然后將最下面的節點放到頂點,然后與相鄰下一層的最小節點比較,大于它則下沉,直到沉到沒有小于它的節點
整體架構
Job
定義
定時任務業務類,用于執行業務邏輯,你可以只創建一個job
類,然后創建多個與該job關聯的JobDetail
實例,每一個實例都有自己的屬性集和JobDataMap
,最后,將所有的實例都加到scheduler
中。
Job
分為有狀態(保存數據)和無狀態(不保存數據),有狀態的Job
為StatefulJob
接口,無狀態的為Job
接口。
使用
需要實現Job接口重寫execute方法
import org.quartz.*;public class MyJob implements Job {@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException { //業務代碼}
}
生命周期:每次在調度器在執行job的時候,他是在execute()方法前創建一個新的job實例(JobDetail)。當調用完之后,關聯的job對象實例會被釋放,釋放之后將會被垃圾回收機制回收
JobDetail
定義
job的實例,封裝job并描述job的細節,job為實際執行的業務,一個job可對應多個jobdetail
使用
//MyJob為實際業務類,可以從 jobDetail1 中獲取任務信息
JobDetail jobDetail1 = JobBuilder.newJob(MyJob.class).build();
JobDataMap
定義
存儲數據對象,用于定時任務執行時使用,在job實例對象被執行時調用,可同時用于JobDetail
與Trigger
使用
通過
.usingJobData("key","value")
或
.JobDataMap.Put("myclass", myclass)
方法賦值,
還可以用此方法直接給Job業務類成員變量賦值
JobDetail jobDetail = JobBuilder.newJob(MyJob.class)/**可以在業務類MyJob中通過context.getJobDetail().getJobDataMap()獲取*/.usingJobData("job","jobDetail")/**可以直接賦值到業務類MyJob的name屬性中*/.usingJobData("name","jobDetail").usingJobData("count1",0).build();//也可以用 jobDetail.JobDataMap.Put("myclass", myclass);Trigger trigger = TriggerBuilder.newTrigger()/**可以在業務類MyJob中通過context.getTrigger().getJobDataMap()獲取*/.usingJobData("trigger","trigger")/**會覆蓋JobDetail中對業務類MyJob的name屬性的賦值*/.usingJobData("name","trigger").startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever()).build();
在業務Job類中通過JobDetailMap
的getString("key")
獲取
public class MyJob implements Job {private String name;public void setName(String name) {this.name = name;}@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {System.out.println("name:"+name);//從觸發器獲取JobDataMap triggerMap = context.getTrigger().getJobDataMap();//從任務獲取JobDataMap jobDetailMap = context.getJobDetail().getJobDataMap();System.out.println("jobDetailMap:"+jobDetailMap.getString("job"));System.out.println("triggerMap:"+triggerMap.getString("trigger"));}
}
/*** 獲取JobDetail與Trigger的JobDataMap,并拼到一個map中,但是key重復會覆蓋* */
JobDataMap mergeMap = context.getMergedJobDataMap();
序列化問題
如果你使用的是持久化的存儲機制(JDBCJobStore
),在決定JobDataMap
中存放什么數據的時候需要小心,因為JobDataMap
中存儲的對象都會被序列化,因此很可能會導致類的版本不一致的問題;Java的標準類型都很安全,如果你已經有了一個類的序列化后的實例,某個時候,別人修改了該類的定義,此時你需要確保對類的修改沒有破壞兼容性;更多細節,參考下方描述。另外,你也可以配置JDBC-JobStore
和JobDataMap
,使得map中僅允許存儲基本類型和String類型的數據,這樣可以避免后續的序列化問題。
Java的序列化機制是通過在運行時判斷類的serialVersionUID來驗證版本一致性的。在進行反序列化時,JVM會把傳來的字節流中的serialVersionUID與本地相應實體(類)的serialVersionUID進行比較,如果相同就認為是一致的,可以進行反序列化,否則就會出現序列化版本不一致的異常。當實現java.io.Serializable接口的實體(類)沒有顯式地定義一個名為serialVersionUID,類型為long的變量時,Java序列化機制會根據編譯的class自動生成一個serialVersionUID作序列化版本比較用,這種情況下,只有同一次編譯生成的class才會生成相同的serialVersionUID 。即:我們沒有顯式指定一個版本號serialVersionUID,在修改序列化的類后就會反序列化失敗。我們應該總是顯式指定一個版本號,這樣做的話我們不僅可以增強對序列化版本的控制,而且也提高了代碼的可移植性。因為不同的JVM有可能使用不同的策略來計算這個版本號,那樣的話同一個類在不同的JVM下也會認為是不同的版本。
Trigger
觸發器,定義定時任務觸發規則,即時間。
Trigger的公共屬性
trigger的公共屬性有:
jobKey
屬性:當trigger觸發時被執行的job的身份;startTime
屬性:設置trigger第一次觸發的時間;該屬性的值是java.util.Date類型,表示某個指定的時間點;有些類型的trigger,會在設置的startTime時立即觸發,有些類型的trigger,表示其觸發是在startTime之后開始生效。比如,現在是1月份,你設置了一個trigger–“在每個月的第5天執行”,然后你將startTime屬性設置為4月1號,則該trigger第一次觸發會是在幾個月以后了(即4月5號)。endTime
屬性:表示trigger失效的時間點。比如,”每月第5天執行”的trigger,如果其endTime是7月1號,則其最后一次執行時間是6月5號。
優先級(priority)
如果你的trigger很多(或者Quartz線程池的工作線程太少),Quartz可能沒有足夠的資源同時觸發所有的trigger;這種情況下,你可能希望控制哪些trigger優先使用Quartz的工作線程,要達到該目的,可以在trigger上設置priority屬性。比如,你有N個trigger需要同時觸發,但只有Z個工作線程,優先級最高的Z個trigger會被首先觸發。如果沒有為trigger設置優先級,trigger使用默認優先級,值為5;priority屬性的值可以是任意整數,正數、負數都可以。
注意:只有同時觸發的trigger之間才會比較優先級。10:59觸發的trigger總是在11:00觸發的trigger之前執行。
注意:如果trigger是可恢復的,在恢復后再調度時,優先級與原trigger是一樣的。
錯過觸發(misfire Instructions)
trigger還有一個重要的屬性misfire
;如果scheduler關閉了,或者Quartz線程池中沒有可用的線程來執行job,此時持久性的trigger就會錯過(miss)其觸發時間,即錯過觸發(misfire)。
導致misfire有三個原因:
- 所有工作線程都忙于運行其他作業(可能具有更高的優先級)
- 調度程序本身已關閉
- 該作業是在過去的開始時間安排的(可能是編碼錯誤)
不同類型的trigger,有不同的misfire機制。**它們默認都使用“智能機制(smart policy)”,**即根據trigger的類型和配置動態調整行為。當scheduler啟動的時候,查詢所有錯過觸發(misfire)的持久性trigger。然后根據它們各自的misfire機制更新trigger的信息。當你在項目中使用Quartz時,你應該對各種類型的trigger的misfire機制都比較熟悉,這些misfire機制在JavaDoc中有說明。關于misfire機制的細節,會在講到具體的trigger時作介紹。
所有的trigger都有一個Trigger.MISFIRE_INSTRUCTION_SMART_POLICY '智能機制(smart policy)'
策略可以使用,該策略也是所有trigger的默認策略。如果使用smart policy,SimpleTrigger會根據實例的配置及狀態,在所有MISFIRE策略中動態選擇一種Misfire策略。
日歷示例(calendar)
Quartz的Calendar
對象(不是java.util.Calendar
對象)可以在定義和存儲trigger的時候與trigger進行關聯。Calendar
用于從trigger的調度計劃中排除時間段。比如,可以創建一個trigger,每個工作日的上午9:30執行,然后增加一個Calendar,排除掉所有的商業節日。
org.quartz.impl.calendar包下BaseCalendar
為高級的 Calendar 實現了基本的功能,實現了 org.quartz.Calendar 接口AnnualCalendar
排除年中一天或多天CronCalendar
日歷的這種實現排除了由給定的CronExpression表達的時間集合。 例如,您可以使用此日歷使用表達式“* * 0-7,18-23?* *”每天排除所有營業時間(上午8點至下午5點)。 如果CronTrigger具有給定的cron表達式并且與具有相同表達式的CronCalendar相關聯,則日歷將排除觸發器包含的所有時間,并且它們將彼此抵消。DailyCalendar
您可以使用此日歷來排除營業時間(上午8點 - 5點)每天。 每個DailyCalendar僅允許指定單個時間范圍,并且該時間范圍可能不會跨越每日邊界(即,您不能指定從上午8點至凌晨5點的時間范圍)。 如果屬性invertTimeRange為false(默認),則時間范圍定義觸發器不允許觸發的時間范圍。 如果invertTimeRange為true,則時間范圍被反轉 - 也就是排除在定義的時間范圍之外的所有時間。HolidayCalendar
特別的用于從 Trigger 中排除節假日MonthlyCalendar
排除月份中的指定數天,例如,可用于排除每月的最后一天WeeklyCalendar
排除星期中的任意周幾,例如,可用于排除周末,默認周六和周日
使用Calendar
的步驟較為簡單:
第一步,創建Calendar, 并添加到Scheduler中。
DailyCalendar calendar = new DailyCalendar("9:22:00","9:30:00");
scheduler.addCalendar("calendar", calendar, false, false);
第二步,使用TriggerBuilder方法時,添加modifiedbyCalendar,參數為calendar的名稱。
return TriggerBuilder.newTrigger().withIdentity("test trigger", "test").startNow().withSchedule(simpleSchedule().repeatSecondlyForTotalCount(6).withIntervalInMinutes(1)).modifiedByCalendar("calendar").build();
Trigger種類
五種類型的 Trigger(2.3.2版本):SimpleTrigger,CronTrigger,CalendarIntervalTrigger,DailyTimeIntervalTrigger,MutableTrigger,OperableTrigger
最常用的:
SimpleTrigger(簡單觸發器) | 進行簡單的觸發,僅需觸發一次或者以固定時間間隔周期執行:如每日的5點執行一次;每分鐘執行一次 |
---|---|
CronTrigger(表達式觸發器) | 進行復雜的觸發:如每月的第幾周第幾天什么時候執行 |
/**StartAt() 表示觸發器的時間表首次被觸發的時間。它的值的類型是java.util.Date。EndAt() 指定觸發器的不再被觸發的時間。它的值是java.util.Date。
*/
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1","trigger1")/**立即執行*/.startNow()/**簡單調度,每秒執行一次*/.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever()).build();
SimpleTrigger
SimpleTrigger
可以滿足的調度需求是:在具體的時間點執行一次;或者在具體的時間點執行,并且以指定的間隔重復執行若干次。
SimpleTrigger
的屬性包括:開始時間、結束時間、重復次數以及重復的間隔。重復次數,可以是0、正整數,以及常量SimpleTrigger.REPEAT_INDEFINITELY。重復的間隔,必須是0,或者long型的正數,表示毫秒。注意,如果重復間隔為0,trigger將會以重復次數并發執行(或者以scheduler可以處理的近似并發數)。
指定時間開始觸發,不重復(執行一次):
SimpleTrigger trigger = (SimpleTrigger) newTrigger() .withIdentity("trigger1", "group1").startAt(myStartTime) // some Date .forJob("job1", "group1") // identify job with name, group strings.build();
指定時間觸發,每隔10秒執行一次,重復10次:
trigger = newTrigger().withIdentity("trigger3", "group1").startAt(myTimeToStartFiring) // if a start time is not given (if this line were omitted), "now" is implied.withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(10)) // note that 10 repeats will give a total of 11 firings.forJob(myJob) // identify job with handle to its JobDetail itself .build();
5分鐘以后開始觸發,僅執行一次:
trigger = (SimpleTrigger) newTrigger() .withIdentity("trigger5", "group1").startAt(futureDate(5, IntervalUnit.MINUTE)) // use DateBuilder to create a date in the future.forJob(myJobKey) // identify job with its JobKey.build();
立即觸發,每個5分鐘執行一次,直到22:00:
trigger = newTrigger().withIdentity("trigger7", "group1").withSchedule(simpleSchedule().withIntervalInMinutes(5).repeatForever()).endAt(dateOf(22, 0, 0)).build();
建立一個觸發器,將在下一個小時的整點觸發,然后每2小時重復一次:
trigger = newTrigger().withIdentity("trigger8") // because group is not specified, "trigger8" will be in the default group.startAt(evenHourDate(null)) // get the next even-hour (minutes and seconds zero ("00:00")).withSchedule(simpleSchedule().withIntervalInHours(2).repeatForever())// note that in this example, 'forJob(..)' is not called which is valid // if the trigger is passed to the scheduler along with the job .build();scheduler.scheduleJob(trigger, job);
請查閱TriggerBuilder和SimpleScheduleBuilder提供的方法,以便對上述示例中未提到的選項有所了解。
TriggerBuilder(以及Quartz的其它builder)會為那些沒有被顯式設置的屬性選擇合理的默認值。比如:如果你沒有調用withIdentity(..)方法,TriggerBuilder會為trigger生成一個隨機的名稱;如果沒有調用startAt(..)方法,則默認使用當前時間,即trigger立即生效。
SimpleTrigger的Misfire策略常量:
MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY
MISFIRE_INSTRUCTION_FIRE_NOW
MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT
MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT
MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT
MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT
在使用SimpleTrigger構造trigger時,misfire策略作為基本調度(simple schedule)的一部分進行配置(通過SimpleSchedulerBuilder設置):
trigger = newTrigger().withIdentity("trigger7", "group1").withSchedule(simpleSchedule().withIntervalInMinutes(5).repeatForever().withMisfireHandlingInstructionNextWithExistingCount()).build();
CronTrigger
CronTrigger通常比Simple Trigger更有用,如果您需要基于日歷的概念而不是按照SimpleTrigger的精確指定間隔進行重新啟動的作業啟動計劃。
使用CronTrigger,您可以指定號時間表,例如“每周五中午”或“每個工作日和上午9:30”,甚至“每周一至周五上午9:00至10點之間每5分鐘”和1月份的星期五“。
即使如此,和SimpleTrigger一樣,CronTrigger有一個startTime,它指定何時生效,以及一個(可選的)endTime,用于指定何時停止計劃。
cron表達式生成器:https://cron.qqe2.com/
建立一個觸發器,每隔兩分鐘,每天上午8點至下午5點之間:
trigger = newTrigger().withIdentity("trigger3", "group1").withSchedule(cronSchedule("0 0/2 8-17 * * ?")).forJob("myJob", "group1").build();
建立一個觸發器,將在上午10:42每天發射:
trigger = newTrigger().withIdentity("trigger3", "group1").withSchedule(dailyAtHourAndMinute(10, 42)).forJob(myJobKey).build();
或者:
trigger = newTrigger().withIdentity("trigger3", "group1").withSchedule(cronSchedule("0 42 10 * * ?")).forJob(myJobKey).build();
建立一個觸發器,將在星期三上午10:42在TimeZone(系統默認值)之外觸發:
trigger = newTrigger().withIdentity("trigger3", "group1").withSchedule(weeklyOnDayAndHourAndMinute(DateBuilder.WEDNESDAY, 10, 42)).forJob(myJobKey).inTimeZone(TimeZone.getTimeZone("America/Los_Angeles")).build();
或者:
trigger = newTrigger().withIdentity("trigger3", "group1").withSchedule(cronSchedule("0 42 10 ? * WED")).inTimeZone(TimeZone.getTimeZone("America/Los_Angeles")).forJob(myJobKey).build();
CronTrigger的Misfire指令常數
MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY
MISFIRE_INSTRUCTION_DO_NOTHING
MISFIRE_INSTRUCTION_FIRE_NOW
在構建CronTriggers時,您可以將misfire指令指定為簡單計劃的一部分(通過CronSchedulerBuilder):
trigger = newTrigger().withIdentity("trigger3", "group1").withSchedule(cronSchedule("0 0/2 8-17 * * ?").withMisfireHandlingInstructionFireAndProceed()).forJob("myJob", "group1").build();
Scheduler
定義
調度器,通過線程池進行任務調度,按照Trigger定義的時間執行Job,它是單例的。
Scheduler 中的方法主要分為三大類:
- 操作調度器本身,例如調度器的啟動
start()
、調度器的關閉shutdown()
。 - 操作
Trigger
,例如pauseTriggers()
、resumeTrigger()
。 - 操作
Job
,例如scheduleJob()
、unscheduleJob()
、rescheduleJob()
使用
默認情況下,StdSchedulerFactory從當前工作目錄加載“quartz.properties”文件。如果加載失敗,那么就會嘗試加載org/quartz包下的“quartz.properties”文件。如果不想用默認的文件,你可以定義一個系統屬性“org.quartz.properties”指向你想要的文件。
try {Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();//加載quartz自帶的org.quartz.properties//結合了jobDetail與trigger進行調度scheduler.scheduleJob(jobDetail,trigger);scheduler.start();} catch (SchedulerException e) {e.printStackTrace();}
Scheduler 的生命期
Scheduler 的生命期,從 SchedulerFactory 創建它時開始,到 Scheduler 調用shutdown() 方法時結束;Scheduler 被創建后,可以增加、刪除和列舉 Job 和 Trigger,以及執行其它與調度相關的操作(如暫停 Trigger)。但是,Scheduler 只有在調用 start() 方法后,才會真正地觸發 trigger(即執行 job)
Scheduler 創建
Scheduler接口有兩個實現類,分別為StdScheduler(標準默認調度器)和RemoteScheduler(遠程調度器)
常用的是StdSchedulerFactory
1.通過DirectSchedulerFactory創建一個實例:
public static void main(String[] args) {try {DirectSchedulerFactory schedulerFactory = DirectSchedulerFactory.getInstance();// 表示以3個工作線程初始化工廠schedulerFactory.createVolatileScheduler(3);Scheduler scheduler = schedulerFactory.getScheduler(); } catch (SchedulerException e) {e.printStackTrace();}}
創建步驟:
?1、通過DirectSchedulerFactory的getInstance方法得到拿到實例
?2、調用createXXX方法初始化工廠
?3、調用工廠實例的getScheduler方法拿到調度器實例
可以看出,DirectSchedulerFactory是通過createXXX方法傳遞配置參數來初始化工廠,這種初始化方式是一種硬編碼,在工作中用到的情況會很少。
2.使用StdSchedulerFactory工廠創建
此工廠是依賴一系列的屬性來決定如何創建調度器實例的。
屬性提供的方式有三種:
?1、通過java.util.Properties屬性實例
?2、通過外部屬性文件提供
?3、通過有屬性文件內容的 java.io.InputStream 文件流提供
public static void main(String[] args) {try {StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();// 第一種方式 通過Properties屬性實例創建Properties props = new Properties();props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");props.put("org.quartz.threadPool.threadCount", 5);schedulerFactory.initialize(props);// 第二種方式 通過傳入文件名// schedulerFactory.initialize("my.properties");// 第三種方式 通過傳入包含屬性內容的文件輸入流// InputStream is = new FileInputStream(new File("my.properties"));// schedulerFactory.initialize(is);// 獲取調度器實例Scheduler scheduler = schedulerFactory.getScheduler();} catch (Exception e) {e.printStackTrace();}}
JobStore數據庫連接
Jobstore
用來存儲任務和觸發器相關的信息,例如所有任務的名稱、數量、狀態等等。Quartz中有兩種存儲任務的方式,一種在在內存,一種是在數據庫。詳細見下方。
其他
QuartzSchedulerThread
:負責執行向QuartzScheduler注冊的觸發Trigger的工作的線程。
ThreadPool
:Scheduler使用一個線程池作為任務運行的基礎設施,任務通過共享線程池中的線程提供運行效率。
QuartzSchedulerResources
:包含創建QuartzScheduler實例所需的所有資源(JobStore,ThreadPool等)。
SchedulerFactory
:用于獲取調度器實例。
JobStore
: 通過類實現的接口,這些類要為org.quartz.core.QuartzScheduler的使用提供一個org.quartz.Job和org.quartz.Trigger存儲機制。作業和觸發器的存儲應該以其名稱和組的組合為唯一性。
QuartzScheduler
:這是Quartz的核心,它是org.quartz.Scheduler接口的間接實現,包含調度org.quartz.Jobs,注冊org.quartz.JobListener實例等的方法。
Scheduler
:這是Quartz Scheduler的主要接口,代表一個獨立運行容器。調度程序維護JobDetails和觸發器的注冊表。 一旦注冊,調度程序負責執行作業,當他們的相關聯的觸發器觸發(當他們的預定時間到達時)。
Trigger
:具有所有觸發器通用屬性的基本接口,描述了job執行的時間出發規則。 - 使用TriggerBuilder實例化實際觸發器。
JobDetail
:傳遞給定作業實例的詳細信息屬性。 JobDetails將使用JobBuilder創建/定義。
Job
:要由表示要執行的“作業”的類實現的接口。只有一個方法 void execute(jobExecutionContext context)
(jobExecutionContext 提供調度上下文各種信息,運行時數據保存在jobDataMap中)
Job有個子接口StatefulJob
,代表有狀態任務。有狀態任務不可并發,前次任務沒有執行完,后面任務處于阻塞等到。
一個job可以被多個Trigger 綁定,但是一個Trigger只能綁定一個job!
Scheduler可以同時調度多組Trigger 及JobDetail
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();scheduler.scheduleJob(jobDetail,trigger);scheduler.scheduleJob(jobDetail,trigger);scheduler.start();
Job并發及持久化
@DisallowConcurrentExecution
將該注解加到job類上,告訴Quartz不要并發地執行同一個job定義(這里指特定的job類)的多個實例。
此注解加在Job類上,但實際運行生效的是JobDetail
scheduler是默認多線程并發訪問資源的, 可以避免爭搶及定時任務堵塞.
比如前一個任務沒執行完,間隔時間就過了,又來了下一個,此時下一個正常執行,不等上一個執行完再執行除非使用@DisallowConcurrentExecution注解
此注解會保證必須上一個任務執行完成后在執行下一個,即使超過了間隔時間,如果超時,會在執行完立刻執行下一次,不會再等過了間隔時間再執行.
比如間隔為1秒,上個任務執行了3秒,3秒后會立即執行下一個任務,而不是等4秒再執行
@PersistJobDataAfterExecution
Job分為有狀態(保存數據)和無狀態(不保存數據),有狀態的Job為StatefulJob接口,無狀態的為Job接口。
無狀態任務在執行時,擁有自己的JobDataMap拷貝,對JobData的更改不會影響下次的執行。而有狀態任務共享同一個JobDataMap實例,每次任務執行對JobDataMap所做的更改都會保存下來,后面的執行可以看到這個更改。也就是每次執行任務后都會對后面的執行發生影響。
正因為這個原因,無狀態的Job可以并發執行,而有狀態的StatefulJob不能并發執行,這意味著如果前次的StatefulJob還沒有執行完畢,下一次的任務將阻塞等待,直到前次任務執行完畢。有狀態任務比無狀態任務需要考慮更多的因素,程序往往擁有更高的復雜度,因此除非必要,應該盡量使用無狀態的Job。
在quartz的2.3.2版本中,StatefulJob已取消,可以使用@PersistJobDataAfterExecution實現有狀態
@PersistJobDataAfterExecution:告訴Quartz在成功執行了Job實現類的execute方法后(沒有發生任何異常),更新JobDetail中JobDataMap的數據,使得該JobDetail實例在下一次執行的時候,JobDataMap中是更新后的數據,而不是更新前的舊數據。
而有狀態任務共享共享同一個JobDataMap實例,每次任務執行對JobDataMap所做的更改會保存下來,后面的執行可以看到這個更改,也即每次執行任務后都會對后面的執行發生影響。
以下代碼示例中JobDetail的count會累加,Trigger不會:
public class TestJob {public static void main(String[] args) {JobDetail jobDetail = JobBuilder.newJob(MyJob.class).withIdentity("job1","group1").usingJobData("count1",0).build();int count=0;Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1","trigger1").usingJobData("count",count)/**立即執行*/.startNow()/**簡單調度,每秒執行一次*/.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever()).build();try {Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();scheduler.scheduleJob(jobDetail,trigger);scheduler.start();} catch (SchedulerException e) {e.printStackTrace();}}
}
@PersistJobDataAfterExecution //只對JobDetail有持久化作用,對Trigger沒有
public class MyJob implements Job {private String name;public void setName(String name) {this.name = name;}@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {JobDataMap triggerMap = context.getTrigger().getJobDataMap();JobDataMap jobDetailMap = context.getJobDetail().getJobDataMap();triggerMap.put("count",triggerMap.getInt("count")+1);jobDetailMap.put("count1",jobDetailMap.getInt("count1")+1);System.out.println("triggerMap count:"+triggerMap.getInt("count"));System.out.println("jobDetailMap count:"+jobDetailMap.getInt("count1"));}
}
使用建議:
如果你使用了@PersistJobDataAfterExecution注解,則強烈建議你同時使用@DisallowConcurrentExecution注解,因為當同一個job(JobDetail)的兩個實例被并發執行時,由于競爭,JobDataMap中存儲的數據很可能是不確定的。
JobStore數據庫連接
Jobstore用來存儲任務和觸發器相關的信息,例如所有任務的名稱、數量、狀態等等。Quartz中有兩種存儲任務的方式,一種在在內存,一種是在數據庫。
RAMJobStore
Quartz默認的 JobStore是 RAMJobstore,也就是把任務和觸發器信息運行的信息存儲在內存中,用到了 HashMap、TreeSet、HashSet等等數據結構。
如果程序崩潰或重啟,所有存儲在內存中的數據都會丟失。所以我們需要把這些數據持久化到磁盤。
JDBCJobStore
JDBCJobStore可以通過 JDBC接口,將任務運行數據保存在數據庫中。
DataSource設置有兩種方法:
一種方法是讓Quartz創建和管理DataSource,即在quartz.properties中配置數據源;
另一種是由Quartz正在運行的應用程序服務器管理的DataSource,通過應用管理數據源,比如springboot應用在yml中設置數據庫連接,在quartz中注入DataSource使用。
示例為quartz.properties:
#數據庫中 quartz表的表名前綴
org.quartz.jobStore.tablePrefix:QRTZ_
#數據源名字,要與下方配置名字一致
org.quartz.jobStore.dataSource:myDS
#配置數據源(此處是否可以不定義而是定義在application.properties中? 待試驗)
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:123456
org.quartz.dataSource.myDS.validationQuery=select 0 from dual
配置JDBCJobStore以使用DriverDelegate,即數據庫代理
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
接下來,您需要通知JobStore您正在使用的表前綴(如上所述)。
使用表前綴配置JDBCJobStore
org.quartz.jobStore.tablePrefix = QRTZ_
JDBC的實現方式有兩種,JobStoreSupport類的兩個子類:
JobStoreTX:在獨立的程序中使用,自己管理事務,不參與外部事務。
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事務時,使用它。使用 JDBCJobSotre時,需要配置數據庫信息:
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreCMT
注意問題
后臺報錯Table 'seata_order.qrtz_locks' doesn't exist:
檢查兩處地方,第一處是application中的
spring.datasource.url=jdbc:mysql://42.193.104.62:3306/***? # ***應為對應數據庫的名字
第二處是數據庫的配置文件,是否開啟了不區分大小寫
quartz表
在官網的 Downloads鏈接中,提供了 11張表的建表語句: quartz-2.2.3-distribution\quartz-2.2.3\docs\dbTables
2.3的版本在這個路徑下:src\org\quartz\impl\jdbcjobstore
表名與作用:
配置文件詳解
線程池配置
#是要使用的ThreadPool實現的名稱。Quartz附帶的線程池是“org.quartz.simpl.SimpleThreadPool”,并且幾乎能夠滿足幾乎每個用戶的需求。它有非常簡單的行為,并經過很好的測試。它提供了一個固定大小的線程池。
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool#可用于并發執行作業的線程數,至少為1(無默認值)
org.quartz.threadPool.threadCount=5#設置線程的優先級,可以是Thread.MIN_PRIORITY(即1)和Thread.MAX_PRIORITY(這是10)之間的任何int 。默認值為Thread.NORM_PRIORITY(5)。
org.quartz.threadPool.threadPriority=1#使池中的線程創建為守護進程線程。默認為“false”
org.quartz.threadPool.makeThreadsDaemons=false#在工作池中的線程名稱的前綴將被附加一個數字。
org.quartz.threadPool.threadNamePrefix=1#可以是java線程的有效名稱的任何字符串。如果未指定此屬性,線程將接收調度程序的名稱(“org.quartz.scheduler.instanceName”)加上附加的字符#串“_QuartzSchedulerThread”。
org.quartz.scheduler.threadName = _QuartzSchedulerThread
JobStore配置
# 數據保存方式為數據庫持久化,并由quartz自己管理事務
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX# 數據保存方式為數據庫持久化,并由容器管理事務
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreCMT# 數據庫代理類,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以滿足大部分數據庫
#用于完全符合JDBC的驅動程序,可用于oracle、mysql
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#對于Microsoft SQL Server和Sybase
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.MSSQLDelegate
#其他參見官方文檔#數據庫中 quartz表的表名前綴
org.quartz.jobStore.tablePrefix:QRTZ_
#數據源名字,要與下方配置名字一致
org.quartz.jobStore.dataSource:myDS
#配置數據源
org.quartz.dataSource.myDS.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user=root
org.quartz.dataSource.myDS.password=123456
#是可選的SQL查詢字符串,用于檢測連接是否失敗。
org.quartz.dataSource.myDS.validationQuery=select 0 from dual#當調查器檢測到JobStore中的連接丟失(例如數據庫)時,調度程序重試等待時間(以毫秒為單位)
org.quartz.scheduler.dbFailureRetryInterval = 6000
其他配置
#使JobDataMaps中的所有值都將是“字符串”,避免了序列化問題
org.quartz.jobStore.useProperties=false#定義了觸發器應該多長時間才被認為觸發失敗,默認為60000(一分鐘)
org.quartz.jobStore.misfireThreshold = 60000#Scheduler一次獲取trigger的最大數量。默認值為1。這個數字越大,觸發效率越高(在有許多trigger需要同時觸發的場景下),但是在集群節點之間可能會有負
#載均衡的代價。如果這個屬性的值大于1,且使用JDBCJobStore,那么屬性“org.quartz.jobStore.acquireTriggersWithinLock”必須設置true,以避免數據損
#壞。
org.quartz.scheduler.batchTriggerAcquisitionMaxCount = 1#防止多個線程同時拉取相同的trigger的情況,也就避免的重復調度的危險
org.quartz.jobStore.acquireTriggersWithinLock = true
集群配置
#是否加入集群 true是 false否
org.quartz.jobStore.isClustered = true# 調度標識名 集群中每一個實例都必須使用相同的名稱
org.quartz.scheduler.instanceName = ClusterQuartz# 調度器ID設置為自動獲取 每一個必須不同
org.quartz.scheduler.instanceId= AUTO#僅當org.quartz.scheduler.instanceId設置為“AUTO” 時才使用。默認為“org.quartz.simpl.SimpleInstanceIdGenerator”,它根據主機名和時間戳生成實例#ID。其他IntanceIdGenerator實現包括SystemPropertyInstanceIdGenerator(它從系統屬性“org.quartz.scheduler.instanceId”獲取實例ID,#HostnameInstanceIdGenerator使用本地主機名
org.quartz.scheduler.instanceIdGenerator.class = org.quartz.simpl.SimpleInstanceIdGenerator
其他參考
https://blog.csdn.net/bobozai86/article/details/123777036
核心機制
流程
Quartz的核心流程大致分為三個階段:
- 獲取調度實例階段
- 通過
getScheduler
方法根據配置文件加載配置和初始化,創建線程池ThreadPool
(默認是SimpleThreadPool
,用來執行Quartz
調度任務),創建調度器QuartzScheduler
,創建調度線程QuartzSchedulerThread
,并將調度線程初始狀態設置為暫停狀態。
- 通過
- 綁定JobDetail和Trigger階段
Scheduler
將任務添加到JobStore
中,如果是使用數據庫存儲信息,這時候會把任務持久化到Quartz
核心表中,同時也會對實現JobListener
的監聽者通知任務已添加
- 啟動調度器階段
Scheduler
會調用QuartzScheduler
的Start()
方法,這時候會把調度線程從暫停切為啟動狀態,通知QuartzSchedulerThread
正式干活。QuartzSchedulerThread
會從SimpleThreadPool
查看下有多少可用工作線程,然后找JobStore
去拿下一批符合條件的待觸發的Trigger
任務列表,包裝成FiredTriggerBundle
。通過JobRunShellFactory
創建FiredTriggerBundle
的執行線程實例JobRunShell
,然后把JobRunShell
實例交給SimpleThreadPool
的工作線程去執行。SimpleThreadPool
會從可用線程隊列拿出對應數量的線程,去調用JobRunShell
的run()
方法,此時會執行任務類的execute
方法 :job.execute(JobExecutionContext context)
。
線程模型
SimpleThreadPool
:包工頭,管理所有WorkerThread
WorkerThread
:工人, 執行JobRunShell
JobRunShell
:任務,任務中有run()
方法,會執行業務類的execute
方法 :job.execute(JobExecutionContext context)
。QuartSchedulerThread
:項目經理,獲取即將觸發的Trigger
,將JobRunShell
交給SimpleThreadPool
,由SimpleThreadPool
調用WorkerThread
執行JobRunShell
Quartz集群進程間如何通信
Quartz集群之間是通過數據庫幾張核心的Quartz表進行通信
Quartz集群如何保證高并發下不重復跑
Quartz有多個節點同時在運行,而任務是共享的,這時候肯定存在資源競爭問題,容易造成并發問題,Quartz節點之間是否存在分布式鎖去控制?
Quartz
是通過數據庫去作為分布式鎖來控制多進程并發問題,Quartz
加鎖的地方很多,Quartz
是使用悲觀鎖的方式進行加鎖,讓在各個instance操作Trigger
任務期間串行,這里挑選核心的代碼來看看它是符合利用數據庫防止并發的。
使用數據庫鎖需要在quartz.properties
中加以下配置,讓集群生效Quartz才會對多個instance進行并發控制
org.quartz.jobStore.isClustered = true
QRTZ_LOCKS
表,它會為每個調度器創建兩行數據,獲取 Trigger 和觸發 Trigger 是兩把鎖,加鎖入口在JobStoreSupport
類中,Quartz提供的鎖表,為多個節點調度提供分布式鎖,實現分布式調度,默認有2個鎖
SCHED_NAME | LOCK_NAME |
---|---|
Myscheduler | STATE_ACCESS |
Myscheduler | TRIGGER_ACCESS |
STATE_ACCESS
主要用在scheduler定期檢查是否失效的時候,保證只有一個節點去處理已經失效的scheduler;
TRIGGER_ACCESS
主要用在TRIGGER被調度的時候,保證只有一個節點去執行調度
Quartz集群如何保證高并發下不漏跑
有時候Quartz
可能會錯過我們的調度任務:
- 服務重啟,沒能及時執行任務,就會misfire
- 工作線程去運行優先級更高的任務,就會misfire
- 任務的上一次運行還沒結束,下一次觸發時間到達,就會misfire
Quartz
可提供了一些補償機制應對misfire
情況,用戶可以根據需要選擇對應的策略,
Quartz常見問題
服務器始終不一致問題
常見異常:
This scheduler instance (SchedulerName) is still active but was recovered by another instance in the cluster
解決:
同步所有集群節點的時間然后重啟服務
Quartz集群負載不均衡
Quartz集群是采用搶占式加鎖方式去處理任務,因此你會看到每個節點的任務處理日志并不是均衡分配的,很可能一個節點會搶占大量任務導致負載過重,但是這一點官方并沒有解決。
錯過預定觸發時間
常見異常:
Handling 1 trigger(s) that missed their scheduled fire-time
解決:
很可能是你線程數設置太少,而任務執行時間太長,超過的misfire
閾值,導致線程池沒有可用線程而錯過了觸發事件。嘗試把配置文件線程數調大org.quartz.threadPool.threadCount
或者把misfire
閾值調大org.quartz.jobStore.misfireThreshold
簡單使用
寫業務類job
import org.quartz.*;@DisallowConcurrentExecution
@PersistJobDataAfterExecution //只對JobDetail有持久化作用,對Trigger沒有
public class MyJob implements Job {private String name;public void setName(String name) {this.name = name;}@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {System.out.println("name:"+name);//從觸發器中獲取數據JobDataMap triggerMap = context.getTrigger().getJobDataMap();//從任務中獲取數據JobDataMap jobDetailMap = context.getJobDetail().getJobDataMap();System.out.println("jobDetailMap:"+jobDetailMap.getString("job"));System.out.println("triggerMap:"+triggerMap.getString("trigger"));/*** 獲取JobDetail與Trigger的JobDataMap,并拼到一個map中,但是key重復會覆蓋* */JobDataMap mergeMap = context.getMergedJobDataMap();triggerMap.put("count",triggerMap.getInt("count")+1);jobDetailMap.put("count1",jobDetailMap.getInt("count1")+1);System.out.println("triggerMap count:"+triggerMap.getInt("count"));System.out.println("jobDetailMap count:"+jobDetailMap.getInt("count1"));}
}
定義觸發器與業務實例,并調度
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;public class TestJob {public static void main(String[] args) {JobDetail jobDetail = JobBuilder.newJob(MyJob.class).withIdentity("job1","group1")/**可以在業務類MyJob中通過context.getJobDetail().getJobDataMap()獲取*/.usingJobData("job","jobDetail")/**可以直接賦值到業務類MyJob的name屬性中*/.usingJobData("name","jobDetail").usingJobData("count1",0).build();int count=0;Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1","trigger1")/**可以在業務類MyJob中通過context.getTrigger().getJobDataMap()獲取*/.usingJobData("trigger","trigger").usingJobData("count",count)/**會覆蓋JobDetail中對業務類MyJob的name屬性的賦值*///.usingJobData("name","trigger")/**立即執行*/.startNow()/**簡單調度,每秒執行一次*/.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever()).build();try {Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();scheduler.scheduleJob(jobDetail,trigger);/*** scheduler是默認多線程并發訪問資源的, 可以避免爭搶及定時任務堵塞* 比如前一個任務沒執行完,間隔時間就過了,又來了下一個,此時下一個正常執行,不等上一個執行完再執行** 除非使用@DisallowConcurrentExecution注解* 此注解會保證必須上一個任務執行完成后在執行下一個,即使超過了間隔時間,如果超時,會在執行完立刻執行下一次,不會再等過了間隔時間* 再執行,比如間隔為1秒,執行了3秒,3秒后會立即執行,而不是等4秒再執行* */scheduler.start();} catch (SchedulerException e) {e.printStackTrace();}}
}
quartz整合springboot
pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency></dependencies>
業務類定義
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.scheduling.quartz.QuartzJobBean;import java.util.Date;@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class QuartzJob extends QuartzJobBean {@Overrideprotected void executeInternal(JobExecutionContext context) throws JobExecutionException {try {Thread.sleep(2000);System.out.println(context.getScheduler().getSchedulerInstanceId());System.out.println("taskname="+context.getJobDetail().getKey().getName());System.out.println("執行時間="+new Date());} catch (Exception e) {e.printStackTrace();}}
}
配置類
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executor;@Configuration
public class SchedulerConfig {/*** 注入應用的數據源* */@Autowiredprivate DataSource dataSource;/*** 配置線程池* Runtime.getRuntime().availableProcessors() 獲取的是cpu核心線程數也就是計算資源。* */@Beanpublic Executor schedulerThreadPool(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());return executor;}/*** 從自定義的properties加載quartz配置* */@Beanpublic Properties quartzProperties() throws IOException {PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();propertiesFactoryBean.setLocation(new ClassPathResource("/spring-quartz.properties"));propertiesFactoryBean.afterPropertiesSet();return propertiesFactoryBean.getObject();}/*** 創建schedulerFactoryBean工廠實例用于獲取scheduler* */@Beanpublic SchedulerFactoryBean schedulerFactoryBean() throws IOException {SchedulerFactoryBean factory = new SchedulerFactoryBean();factory.setSchedulerName("cluster_scheduler");factory.setDataSource(dataSource);factory.setApplicationContextSchedulerContextKey("application");factory.setQuartzProperties(quartzProperties());factory.setTaskExecutor(schedulerThreadPool());factory.setStartupDelay(0);return factory;}/*** 從工廠實例獲取scheduler* */@Beanpublic Scheduler scheduler() throws IOException {return schedulerFactoryBean().getScheduler();}
}
創建調度及觸發
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;/*** ApplicationListener< ContextRefreshedEvent> 一般被用于在項目初始化動作完成后執行的自己業務拓展動作* 實現onApplicationEvent(ContextRefreshedEvent event)方法,應用一啟動就會執行此方法* */
@Component
public class StartApplicationListener implements ApplicationListener<ContextRefreshedEvent> {@Autowiredprivate Scheduler scheduler;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {try {TriggerKey triggerKey = TriggerKey.triggerKey("trigger1","group1");Trigger trigger = scheduler.getTrigger(triggerKey);if(trigger == null){trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?")).startNow().build();JobDetail jobDetail = JobBuilder.newJob(QuartzJob.class).withIdentity("job1","group1").build();scheduler.scheduleJob(jobDetail,trigger);}TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2","group2");Trigger trigger2 = scheduler.getTrigger(triggerKey2);if(trigger2 == null) {trigger2 = TriggerBuilder.newTrigger().withIdentity(triggerKey2).withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?")).startNow().build();JobDetail jobDetail2 = JobBuilder.newJob(QuartzJob.class).withIdentity("job2", "group2").build();scheduler.scheduleJob(jobDetail2, trigger2);}scheduler.start();} catch (SchedulerException e) {e.printStackTrace();}}
}