【優化技術專題】「温故而知新」基於Quartz系列的任務調度框架的動態化任務實現分析

語言: CN / TW / HK

theme: condensed-night-purple

不提XXLJOB或者其他的調度框架,就看我接觸的第一個任務調度框架Quartz(温故而知新)

Quartz的動態暫停 恢復 修改和刪除任務

實現動態添加定時任務,先來看一下我們初步要實現的目標效果圖,這裏我們只在內存中操作,並沒有把quartz的任何信息保存到數據庫,即使用的是RAMJobStore,

當然如果你有需要,可以實現成JDBCJobStore,那樣任務信息將會更全面。

例如,我們要先列出計劃中的定時任務以及正在執行中的定時任務,這裏的正在執行中指的是任務已經觸發線程還沒執行完的情況。

  • 比如每天2點執行一個數據導入操作,這個操作執行時間需要5分鐘,在這5分鐘之內這個任務才是運行中的任務。
  • 當任務正常時可以使用暫停按鈕,任務暫停時可以使用恢復按鈕。

trigger各狀態説明:

  • None:Trigger已經完成且不會在執行,或找不到該觸發器,或Trigger已經被刪除.
  • NORMAL:正常狀態
  • PAUSED:暫停狀態
  • COMPLETE:觸發器完成,但是任務可能還正在執行中
  • BLOCKED:線程阻塞狀態
  • ERROR:出現錯誤

定時任務運行工廠類

任務運行入口,即Job實現類,在這裏我把它看作工廠類: ```java /* * 定時任務運行工廠類 * * User: liyd * Date: 14-1-3 * Time: 上午10:11 / public class QuartzJobFactory implements Job {

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
    System.out.println("任務成功運行");
    ScheduleJob scheduleJob

= (ScheduleJob)context.getMergedJobDataMap().get("scheduleJob"); System.out.println("任務名稱 = [" + scheduleJob.getJobName() + "]"); }
} ```

這裏我們實現的是無狀態的Job,如果要實現有狀態的Job在以前是實現StatefulJob接口,在我使用的quartz 2.2.1中,StatefulJob接口已經不推薦使用了,換成了註解的方式,只需要給你實現的Job類加上註解@DisallowConcurrentExecution即可實現有狀態:

java /** * 定時任務運行工廠類 * <p/> * User: liyd * Date: 14-1-3 * Time: 上午10:11 */ @DisallowConcurrentExecution public class QuartzJobFactory implements Job {...}

創建任務

既然要動態的創建任務,我們的任務信息當然要保存在某個地方了,這裏我們新建一個保存任務信息對應的實體類: java /** * 計劃任務信息 * User: liyd * Date: 14-1-3 * Time: 上午10:24 */ public class ScheduleJob { /** 任務id */ private String jobId; /** 任務名稱 */ private String jobName; /** 任務分組 */ private String jobGroup; /** 任務狀態 0禁用 1啟用 2刪除*/ private String jobStatus; /** 任務運行時間表達式 */ private String cronExpression; /** 任務描述 */ private String desc; getter and setter .... }

接下來我們創建測試數據,實際應用中該數據可以保存在數據庫等地方,我們把任務的分組名+任務名作為任務的唯一key,和quartz中的實現方式一致:

```java / 計劃任務map / private static Map jobMap = new HashMap(); static { for (int i = 0; i < 5; i++) { ScheduleJob job = new ScheduleJob(); job.setJobId("10001" + i); job.setJobName("data_import" + i); job.setJobGroup("dataWork"); job.setJobStatus("1"); job.setCronExpression("0/5 * * * ?"); job.setDesc("數據導入任務"); addJob(job); } }

/* * 添加任務 * @param scheduleJob / public static void addJob(ScheduleJob scheduleJob) { jobMap.put(scheduleJob.getJobGroup() + "_" + scheduleJob.getJobName(), scheduleJob); } ```

有了調度工廠,有了任務運行入口實現類,有了任務信息,接下來就是創建我們的定時任務了,在這裏我把它設計成一個Job對應一個trigger,兩者的分組及名稱相同,方便管理,條理也比較清晰,在創建任務時如果不存在新建一個,如果已經存在則更新任務,主要代碼如下

schedulerFactoryBean 由spring創建注入

```java Scheduler scheduler = schedulerFactoryBean.getScheduler();

//這裏獲取任務信息數據 List jobList = DataWorkContext.getAllJob(); for (ScheduleJob job : jobList) { TriggerKey T = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup()); //獲取trigger,即在spring配置文件中定義的 bean id="myTrigger" CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); //不存在,創建一個 if (null == trigger) { JobDetail jobDetail = JobBuilder.newJob(QuartzJobFactory.class) .withIdentity(job.getJobName(), job.getJobGroup()).build(); jobDetail.getJobDataMap().put("scheduleJob", job); //表達式調度構建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job .getCronExpression()); //按新的cronExpression表達式構建一個新的trigger trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); } else { // Trigger已存在,那麼更新相應的定時設置 //表達式調度構建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job .getCronExpression()); //按新的cronExpression表達式重新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey) .withSchedule(scheduleBuilder).build(); //按新的trigger重新設置job執行 scheduler.rescheduleJob(triggerKey, trigger); } } ```

  • 如此,可以説已經完成了我們的動態任務創建,大功告成了。有了上面的代碼,添加和修改任務是不是也會了,順道解決了?

  • 上面我們創建的5個測試任務,都是5秒執行一次,都將調用QuartzJobFactory的execute方法,但是傳入的任務信息參數不同,execute方法中的如下代碼就是得到具體的任務信息,包括任務分組和任務名:

java ScheduleJob scheduleJob = (ScheduleJob)context.getMergedJobDataMap().get("scheduleJob");

  • 有了任務分組和任務名即確定了該任務的唯一性,接下來需要什麼操作實現起來是不是就很容易了?
  • 以後需要添加新的定時任務只需要在任務信息列表中加入記錄即可,然後在execute方法中通過判斷任務分組和任務名來實現你具體的操作。
  • 以上已經初始實現了我們需要的功能,增加和修改也已經可以通過源代碼舉一反三出來,但是我們在實際開發的時候需要進行測試,如果一個任務是1個小時運行一次的,測試起來是不是很不方便?當然你可以修改任務的運行時間表達式,但相信這不是最好的方法,接下來我們就要實現在不對當前任務信息做任何修改的情況下觸發任務,並且該觸發只會運行一次作測試用。

計劃中的任務

主要是已經添加到quartz調度器的任務,因為quartz並沒有直接提供這樣的查詢接口,所以我們需要結合JobKey和Trigger來實現,核心代碼: 

java Scheduler scheduler = schedulerFactoryBean.getScheduler(); GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup(); Set<JobKey> jobKeys = scheduler.getJobKeys(matcher); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(); for (JobKey jobKey : jobKeys) { List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) { ScheduleJob job = new ScheduleJob(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDesc("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } } - jobList就是我們需要的計劃中的任務列表,需要注意一個job可能會有多個trigger的情況,在下面講到的立即運行一次任務的時候,會生成一個臨時的trigger也會出現在這。

  • 這裏把一個Job有多個trigger的情況看成是多個任務。包括在實際項目中一般用到的都是CronTrigger ,所以這裏我們着重處理了下CronTrigger的情況。

運行中的任務

實現和計劃中的任務類似,核心代碼: 

java Scheduler scheduler = schedulerFactoryBean.getScheduler(); List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs(); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size()); for (JobExecutionContext executingJob : executingJobs) { ScheduleJob job = new ScheduleJob(); JobDetail jobDetail = executingJob.getJobDetail(); JobKey jobKey = jobDetail.getKey(); Trigger trigger = executingJob.getTrigger(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDesc("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); }

暫停任務機制

比較簡單,核心代碼:

java Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.pauseJob(jobKey);

恢復任務

暫停任務相對,核心代碼:

java Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.resumeJob(jobKey);

刪除任務

刪除任務後,所對應的trigger也將被刪除 

java Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.deleteJob(jobKey);

立即運行任務

  • 這裏的立即運行,只會運行一次,方便測試時用。quartz是通過臨時生成一個trigger的方式來實現的,這個trigger將在本次任務運行完成之後自動刪除。

  • trigger的key是隨機生成的,例如:DEFAULT.MT_4k9fd10jcn9mg。

  • 在我的測試中,前面的DEFAULT.MT是固定的,後面部分才隨機生成。 

java Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.triggerJob(jobKey);

更新任務的時間表達式

更新之後,任務將立即按新的時間表達式執行: 

```java Scheduler scheduler = schedulerFactoryBean.getScheduler();

TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());

//獲取trigger,即在spring配置文件中定義的 bean id="myTrigger" CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);

//表達式調度構建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob .getCronExpression());

//按新的cronExpression表達式重新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey) .withSchedule(scheduleBuilder).build();

//按新的trigger重新設置job執行 scheduler.rescheduleJob(triggerKey, trigger);
```


cronExpression表達式:

  • 字段   允許值   允許的特殊字符

    • 秒    0-59    , - * /
    • 分    0-59    , - * /
    • 小時    0-23    , - * /
    • 日期    1-31    , - * ? / L W C
    • 月份    1-12 或者 JAN-DEC    , - * /
    • 星期    1-7 或者 SUN-SAT    , - * ? / L C #
    • 年(可選)    留空, 1970-2099    , - * /
  • 特殊字符   意義

    • *    表示所有值;
    • ?    表示未説明的值,即不關心它為何值;
    • -    表示一個指定的範圍;
    • ,    表示附加一個可能值;
    • /    符號前表示開始時間,符號後表示每次遞增的值;
  • L W C

    • L("last")    ("last") "L" 用在day-of-month字段意思是 "這個月最後一天";用在 day-of-week字段, 它簡單意思是 "7" or "SAT"。如果在day-of-week字段裏和數字聯合使用,它的意思就是 "這個月的最後一個星期幾"
      • 例如: "6L" means "這個月的最後一個星期五". 當我們用“L”時,不指明一個列表值或者範圍是很重要的,不然的話,我們會得到一些意想不到的結果。
    • W("weekday")    只能用在day-of-month字段。用來描敍最接近指定天的工作日(週一到週五)。
      • 例如:在day-of-month字段用“15W”指“最接近這個月第15天的工作日”,即如果這個月第15天是週六,那麼觸發器將會在這個月第14天即週五觸發;如果這個月第15天是週日,那麼觸發器將會在這個月第16 天即週一觸發;如果這個月第15天是週二,那麼就在觸發器這天觸發。
      • 注意一點:這個用法只會在當前月計算值,不會越過當前月。“W”字符僅能在day- of-month指明一天,不能是一個範圍或列表。也可以用“LW”來指定這個月的最後一個工作日。
    • # 只能用在day-of-week字段。用來指定這個月的第幾個周幾。例:在day-of-week字段用"6#3"指這個月第3個週五(6指週五,3指第3個)。如果指定的日期不存在,觸發器就不會觸發。
    • C    指和calendar聯繫後計算過的值。
      • 例:在day-of-month 字段用“5C”指在這個月第5天或之後包括calendar的第一天;在day-of-week字段用“1C”指在這週日或之後包括calendar的第一天。

- 星期的簡寫: - 週一 MON - 週二 TUE - 週三 WED - 週四 THU - 週五 FRI - 週六 SAT - 週日 SUN

在MONTH和Day Of Week字段裏對字母大小寫不敏感

  • 表達式   意義
    • 每天中午12點觸發
      • "0 0 12 * * ?"
    • 每天上午10:15觸發
      • "0 15 10 ? * *"
      • "0 15 10 * * ?"
      • "0 15 10 * ? " (此處最後一項 年是可選的)
    • 2005年的每天上午10:15觸發
      • "0 15 10 * * ? 2005"
    • 每天下午2點到下午2:59期間的每1分鐘觸發
      • "0 * 14 * * ?"
    • 每天下午2點到下午2:55期間的每5分鐘觸發
      • "0 0/5 14 * * ?"
    • 每天下午2點到2:55期間和下午6點到6:55期間的每5分鐘觸發
      • "0 0/5 14,18 * * ?"
    • 每天下午2點到下午2:05期間的每1分鐘觸發
      • "0 0-5 14 * * ?"   
    • 每年三月的星期三的下午2:10和2:44觸發
      • "0 10,44 14 ? 3 WED" / "0 10,44 14 ? 3 WED * "
    • 週一至週五的上午10:15觸發
      • "0 15 10 ? * MON-FRI"    / "0 15 10 ? * MON-FRI * "
    • 每月15日上午10:15觸發
      • "0 15 10 15 * ?"
    • 每月最後一日的上午10:15觸發
      • "0 15 10 L * ?"   
    • 每月的最後一個星期五上午10:15觸發
      • "0 15 10 ? * 6L"   
    • 2002年至2005年的每月的最後一個星期五上午10:15觸發
      • "0 15 10 ? * 6L 2002-2005"   
    • 每月的第三個星期五上午10:15觸發
      • "0 15 10 ? * 6#3"   
    • 每兩個小時
      • 0 /2 * *
「其他文章」