Java 定時任務技術趨勢

語言: CN / TW / HK

定時任務是每個業務常見的需求,比如每分鐘掃描超時支付的訂單,每小時清理一次資料庫歷史資料,每天統計前一天的資料並生成報表等等。

01

Java 中自帶的解決方案

Cloud Native

1

使用 Timer

建立 java.util.TimerTask 任務,在 run 方法中實現業務邏輯。通過 java.util.Timer 進行排程,支援按照固定頻率執行。所有的 TimerTask 是在同一個執行緒中序列執行,相互影響。也就是說,對於同一個 Timer 裡的多個 TimerTask 任務,如果一個 TimerTask 任務在執行中,其它 TimerTask 即使到達執行的時間,也只能排隊等待。如果有異常產生,執行緒將退出,整個定時任務就失敗。

``` import java.util.Timer; import java.util.TimerTask;

public class TestTimerTask {

public static void main(String[] args) {
    TimerTask timerTask = new TimerTask() {
        @Override
        public void run() {
            System.out.println("hell world");
        }
    };
    Timer timer = new Timer();
    timer.schedule(timerTask, 10, 3000);
}

} ```

2

使用 ScheduledExecutorService

基於執行緒池設計的定時任務解決方案,每個排程任務都會分配到執行緒池中的一個執行緒去執行,解決 Timer 定時器無法併發執行的問題,支援 fixedRate 和 fixedDelay。

``` import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;

public class TestTimerTask {

public static void main(String[] args) {
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
    //按照固定頻率執行,每隔5秒跑一次
    ses.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello fixedRate");
        }
    }, 0, 5, TimeUnit.SECONDS);


    //按照固定延時執行,上次執行完後隔3秒再跑
    ses.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello fixedDelay");
        }
    }, 0, 3, TimeUnit.SECONDS);
}

} ```

02

Spring 中自帶的解決方案

Cloud Native

Springboot 中提供了一套輕量級的定時任務工具 Spring Task,通過註解可以很方便的配置,支援 cron 表示式、fixedRate、fixedDelay。

``` import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;

@Component @EnableScheduling public class MyTask {

/**
 * 每分鐘的第30秒跑一次
 */
@Scheduled(cron = "30 * * * * ?")
public void task1() throws InterruptedException {
    System.out.println("hello cron");
}


/**
 * 每隔5秒跑一次
 */
@Scheduled(fixedRate = 5000)
public void task2() throws InterruptedException {
    System.out.println("hello fixedRate");
}


/**
 * 上次跑完隔3秒再跑
 */
@Scheduled(fixedDelay = 3000)
public void task3() throws InterruptedException {
    System.out.println("hello fixedDelay");
}

} ```

Spring Task 相對於上面提到的兩種解決方案,最大的優勢就是支援 cron 表示式,可以處理按照標準時間固定週期執行的業務,比如每天幾點幾分執行。

03

業務冪等解決方案

Cloud Native

現在的應用基本都是分散式部署,所有機器的程式碼都是一樣的,前面介紹的 Java 和 Spring 自帶的解決方案,都是程序級別的,每臺機器在同一時間點都會執行定時任務。這樣會導致需要業務冪等的定時任務業務有問題,比如每月定時給使用者推送訊息,就會推送多次。

於是,很多應用很自然的就想到了使用分散式鎖的解決方案。即每次定時任務執行之前,先去搶鎖,搶到鎖的執行任務,搶不到鎖的不執行。怎麼搶鎖,又是五花八門,比如使用 DB、zookeeper、redis。

1

使用 DB 或者 Zookeeper 搶鎖

使用 DB 或者 Zookeeper 搶鎖的架構差不多,原理如下:

  1. 定時時間到了,在回撥方法裡,先去搶鎖。
  2. 搶到鎖,則繼續執行方法,沒搶到鎖直接返回。
  3. 執行完方法後,釋放鎖。

示例程式碼如下:

``` import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;

@Component @EnableScheduling public class MyTask { / * 每分鐘的第30秒跑一次 / @Scheduled(cron = "30 * * * ?") public void task1() throws Exception { String lockName = "task1"; if (tryLock(lockName)) { System.out.println("hello cron"); releaseLock(lockName); } else { return; } }

private boolean tryLock(String lockName) {
    //TODO
    return true;
}


private void releaseLock(String lockName) {
    //TODO
}

} ```

當前的這個設計,仔細一點的同學可以發現,其實還是有可能導致任務重複執行的。比如任務執行的非常快,A 這臺機器搶到鎖,執行完任務後很快就釋放鎖了。B 這臺機器後搶鎖,還是會搶到鎖,再執行一遍任務。

2

使用 redis 搶鎖

使用 redis 搶鎖,其實架構上和 DB/zookeeper 差不多,不過 redis 搶鎖支援過期時間,不用主動去釋放鎖,並且可以充分利用這個過期時間,解決任務執行過快釋放鎖導致任務重複執行的問題,架構如下:

示例程式碼如下:

``` @Component @EnableScheduling public class MyTask { / * 每分鐘的第30秒跑一次 / @Scheduled(cron = "30 * * * ?") public void task1() throws InterruptedException { String lockName = "task1"; if (tryLock(lockName, 30)) { System.out.println("hello cron"); releaseLock(lockName); } else { return; } }

private boolean tryLock(String lockName, long expiredTime) {
    //TODO
    return true;
}


private void releaseLock(String lockName) {
    //TODO
}

} ```

看到這裡,可能又會有同學有問題,加一個過期時間是不是還是不夠嚴謹,還是有可能任務重複執行?

——的確是的,如果有一臺機器突然長時間的 fullgc,或者之前的任務還沒處理完(Spring Task 和 ScheduledExecutorService 本質還是通過執行緒池處理任務),還是有可能隔了 30 秒再去排程任務的。

3

使用 Quartz

Quartz  [  1]   是一套輕量級的任務排程框架,只需要定義了 Job(任務),Trigger(觸發器)和 Scheduler(排程器),即可實現一個定時排程能力。支援基於資料庫的叢集模式,可以做到任務冪等執行。

Quartz 支援任務冪等執行,其實理論上還是搶 DB 鎖,我們看下 quartz 的表結構:

其中,QRTZ_LOCKS 就是 Quartz 叢集實現同步機制的行鎖表,其表結構如下:

`` --QRTZ_LOCKS表結構 CREATE TABLEQRTZ_LOCKS(LOCK_NAMEvarchar(40) NOT NULL, PRIMARY KEY (LOCK_NAME`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--QRTZ_LOCKS記錄 +-----------------+ | LOCK_NAME | +-----------------+ | CALENDAR_ACCESS | | JOB_ACCESS | | MISFIRE_ACCESS | | STATE_ACCESS | | TRIGGER_ACCESS | +-----------------+ ```

可以看出 QRTZ_LOCKS 中有 5 條記錄,代表 5 把鎖,分別用於實現多個 Quartz Node 對 Job、Trigger、Calendar 訪問的同步控制。

04

開源任務排程中介軟體

Cloud Native

上面提到的解決方案,在架構上都有一個問題,那就是每次排程都需要搶鎖,特別是使用 DB 和 Zookeeper 搶鎖,效能會比較差,一旦任務量增加到一定的量,就會有比較明顯的排程延時。還有一個痛點,就是業務想要修改排程配置,或者增加一個任務,得修改程式碼重新發布應用。

於是開源社群湧現了一堆任務排程中介軟體,通過任務排程系統進行任務的建立、修改和排程,這其中國內最火的就是 XXL-JOB 和 ElasticJob。

1

ElasticJob

ElasticJob  [  2]   是一款基於 Quartz 開發,依賴 Zookeeper 作為註冊中心、輕量級、無中心化的分散式任務排程框架,目前已經通過 Apache 開源。

ElasticJob 相對於 Quartz 來說,從功能上最大的區別就是支援分片,可以將一個任務分片引數分發給不同的機器執行。架構上最大的區別就是使用 Zookeeper 作為註冊中心,不同的任務分配給不同的節點排程,不需要搶鎖觸發,效能上比 Quartz 上強大很多,架構圖如下:

開發上也比較簡單,和 springboot 結合比較好,可以在配置檔案定義任務如下:

elasticjob: regCenter: serverLists: localhost:2181 namespace: elasticjob-lite-springboot jobs: simpleJob: elasticJobClass: org.apache.shardingsphere.elasticjob.lite.example.job.SpringBootSimpleJob cron: 0/5 * * * * ? timeZone: GMT+08:00 shardingTotalCount: 3 shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou scriptJob: elasticJobType: SCRIPT cron: 0/10 * * * * ? shardingTotalCount: 3 props: script.command.line: "echo SCRIPT Job: " manualScriptJob: elasticJobType: SCRIPT jobBootstrapBeanName: manualScriptJobBean shardingTotalCount: 9 props: script.command.line: "echo Manual SCRIPT Job: "

實現任務介面如下:

``` @Component public class SpringBootShardingJob implements SimpleJob {

@Override
public void execute(ShardingContext context) {
    System.out.println("分片總數="+context.getShardingTotalCount() + ", 分片號="+context.getShardingItem()
        + ", 分片引數="+context.getShardingParameter());
}

} ```

執行結果如下:

分片總數=3, 分片號=0, 分片引數=Beijing 分片總數=3, 分片號=1, 分片引數=Shanghai 分片總數=3, 分片號=2, 分片引數=Guangzhou

同時,ElasticJob 還提供了一個簡單的 UI,可以檢視任務的列表,同時支援修改、觸發、停止、生效、失效操作。

遺憾的是,ElasticJob 暫不支援動態建立任務。

2

XXL-JOB

XXL-JOB  [  3]  是一個開箱即用的輕量級分散式任務排程系統,其核心設計目標是開發迅速、學習簡單、輕量級、易擴充套件,在開源社群廣泛流行。

XXL-JOB 是 Master-Slave 架構,Master 負責任務的排程,Slave 負責任務的執行,架構圖如下:

XXL-JOB 接入也很方便,不同於 ElasticJob 定義任務實現類,是通過@XxlJob 註解定義 JobHandler。

``` @Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);

/**
 * 1、簡單任務示例(Bean模式)
 */
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
    XxlJobLogger.log("XXL-JOB, Hello World.");


    for (int i = 0; i < 5; i++) {
        XxlJobLogger.log("beat at:" + i);
        TimeUnit.SECONDS.sleep(2);
    }
    return ReturnT.SUCCESS;
}




/**
 * 2、分片廣播任務
 */
@XxlJob("shardingJobHandler")
public ReturnT<String> shardingJobHandler(String param) throws Exception {


    // 分片引數
    ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
    XxlJobLogger.log("分片引數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());


    // 業務邏輯
    for (int i = 0; i < shardingVO.getTotal(); i++) {
        if (i == shardingVO.getIndex()) {
            XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
        } else {
            XxlJobLogger.log("第 {} 片, 忽略", i);
        }
    }


    return ReturnT.SUCCESS;
}

} ```

XXL-JOB 相較於 ElasticJob,最大的特點就是功能比較豐富,可運維能力比較強,不但支援控制檯動態建立任務,還有排程日誌、執行報表等功能。

XXL-JOB 的歷史記錄、執行報表和排程日誌,都是基於資料庫實現的:

由此可以看出,XXL-JOB 所有功能都依賴資料庫,且排程中心不支援分散式架構,在任務量和排程量比較大的情況下,會有效能瓶頸。不過如果對任務量級、高可用、監控報警、視覺化等沒有過高要求的話,XXL-JOB 基本可以滿足定時任務的需求。

05

企業級解決方案

Cloud Native

開源軟體只能提供基礎的排程能力,在監管控上的能力一般都比較弱。比如日誌服務,業界往往使用 ELK 解決方案;簡訊報警,需要有簡訊平臺;監控大盤,現在主流的解決方案是 Prometheus;等等。企業想要有這些能力,不但需要額外的開發成本,還需要昂貴的資源成本。

另外使用開源軟體也伴隨著穩定性的風險,就是出了問題沒人能處理,想要反饋到社群等社群處理,這個鏈路太長了,早就產生故障了。

阿里雲任務排程 SchedulerX  [  4]  是阿里巴巴自研的基於 Akka 架構的一站式任務排程平臺,相容開源 XXL-JOB、ElasticJob、Quartz(規劃中),支援 Cron 定時、一次性任務、任務編排、分散式跑批,具有高可用、視覺化、可運維、低延時等能力,自帶企業級監控大盤、日誌服務、簡訊報警等服務。

1

優勢

安全防護

  • 多層次安全防護:支援 HTTPS 和 VPC 訪問,同時還有阿里雲的多層安全防護,防止惡意攻擊。
  • 多租戶隔離機制:支援多地域、名稱空間和應用級別的隔離。
  • 許可權管控:支援控制檯讀寫的許可權管理,客戶端接入的鑑權。

企業級高可用

SchedulerX2.0 採用高可用架構,任務多備份機制,經歷阿里集團多年雙十一、容災演練,可以做到任意一個機房掛了,任務排程都不會收到影響。

商業級報警運維

  • 報警:支援郵件、釘釘、簡訊、電話,(其他報警方式在規劃中)。支援任務失敗、超時、無可用機器報警。報警內容可以直接看出任務失敗的原因,以釘釘機器人為例。

  • 運維操作:原地重跑、重刷資料、標記成功、檢視堆疊、停止任務、指定機器等。

豐富的視覺化

schedulerx 擁有豐富的視覺化能力,比如:

  • 使用者大盤

  • 檢視任務歷史執行記錄

  • 檢視任務執行日誌

  • 檢視任務執行堆疊

  • 檢視任務操作記錄

相容開源

Schedulerx 相容開源 XXL-JOB、ElasticJob、Quartz(規劃中),業務不需要改一行程式碼,即可以將任務託管在 SchedulerX 排程平臺,享有企業級視覺化和報警的能力。

Spring 原生

SchedulerX 支援通過控制檯和 API 動態建立任務,也支援 Spring 宣告式任務定義,一份任務配置可以拿到任何環境一鍵啟動,配置如下:

spring: schedulerx2: endpoint: acm.aliyun.com #請填寫不同regin的endpoint namespace: 433d8b23-06e9-xxxx-xxxx-90d4d1b9a4af #region內全域性唯一,建議使用UUID生成 namespaceName: 學仁測試 appName: myTest groupId: myTest.group #同一個名稱空間下需要唯一 appKey: [email protected] #應用的key,不要太簡單,注意保管好 regionId: public #填寫對應的regionId aliyunAccessKey: xxxxxxx #阿里雲賬號的ak aliyunSecretKey: xxxxxxx #阿里雲賬號的sk alarmChannel: sms,ding #報警通道:簡訊和釘釘 jobs: simpleJob: jobModel: standalone className: com.aliyun.schedulerx.example.processor.SimpleJob cron: 0/30 * * * * ? # cron表示式 jobParameter: hello overwrite: true shardingJob: jobModel: sharding className: ccom.aliyun.schedulerx.example.processor.ShardingJob oneTime: 2022-06-02 12:00:00 # 一次性任務表示式 jobParameter: 0=Beijing,1=Shanghai,2=Guangzhou overwrite: true broadcastJob: # 不填寫cron和oneTime,表示api任務 jobModel: broadcast className: com.aliyun.schedulerx.example.processor.BroadcastJob jobParameter: hello overwrite: true mapReduceJob: jobModel: mapreduce className: com.aliyun.schedulerx.example.processor.MapReduceJob cron: 0 * * * * ? jobParameter: 100 overwrite: true alarmUsers: #報警聯絡人 user1: userName: 張三 userPhone: 12345678900 user2: userName: 李四 ding: https://oapi.dingtalk.com/robot/send?access_token=xxxxx

分散式跑批

SchedulerX 提供了豐富的分散式模型,可以處理各種各樣的分散式業務場景。包括單機、廣播、分片、 MapReduce  [  5]  等,架構如下:

SchedulerX 的 MapReduce 模型,簡單幾行程式碼,就可以將海量任務分散式到多臺機器跑批,相對於大資料跑批來說,具有速度快、資料安全、成本低、簡單易學等特點。

任務編排

SchedulerX 通過工作流進行任務編排,並且提供了一個視覺化的介面,操作簡單,拖拖拽拽即可配置一個工作流。詳細的任務狀態圖能一目瞭然看到下游任務為什麼沒跑,方便定位問題。

可搶佔的任務優先順序佇列

常見場景是夜間離線報表業務,比如很多報表任務是晚上 1、2 點開始跑,要控制應用最大併發的任務數量(否則業務扛不住),達到併發上限的任務會在佇列中等待。同時要求早上 9 點前必須把 KPI 報表跑出來,可以設定 KPI 任務高優先順序,會搶佔低優先順序任務優先排程。

SchedulerX 支援可搶佔的任務優先順序佇列,可以在控制檯動態配置:

2

Q&A

  1. Kubernetes 應用可以接入 SchedulerX 嗎?

——可以的,無論是物理機、容器、還是 Kubernetes pod,都可以接入 SchedulerX。

  1. 我的應用不在阿里雲上,可否使用 SchedulerX?

——可以的,任何雲平臺或者本地機器,只要能訪問公網,都可以接入 SchedulerX。