如何實現一個任務排程系統

語言: CN / TW / HK

閱讀一篇「定時任務框架選型」的文章時,一位網友的留言到了我:

我看過那麼多所謂的教程,大部分都是教“如何使用工具”的,沒有多少是教“如何製作工具”的,能教“如何仿製工具”的都已經是鳳毛麟角,中國 軟體行業,缺的是真正可以“製作工具”的程式設計師,而絕對不缺那些“使用工具”的程式設計師! ...... ”這個業界最不需要的就是“會使用XX工具的工程師”,而是“有創造力的軟體工程師”!業界所有的飯碗,本質就是“有創造力的軟體工程師”提供出來的啊!

寫這篇文章,想和大家從頭到腳說說任務排程,希望大家讀完之後,能夠理解實現一個任務排程系統的核心邏輯。

1 Quartz

Quartz是一款Java開源任務排程框架,也是很多Java工程師接觸任務排程的起點。

下圖顯示了任務排程的整體流程:

Quartz的核心是三個元件。

  • 任務:Job 用於表示被排程的任務;
  • 觸發器:Trigger 定義排程時間的元素,即按照什麼時間規則去執行任務。一個Job可以被多個Trigger關聯,但是一個Trigger 只能關聯一個Job;
  • 排程器 :工廠類建立Scheduler,根據觸發器定義的時間規則排程任務。

上圖程式碼中Quartz 的JobStore是 RAMJobStore,Trigger 和 Job 儲存在記憶體中。

執行任務排程的核心類是 QuartzSchedulerThread

  1. 排程執行緒從JobStore中獲取需要執行的的觸發器列表,並修改觸發器的狀態;
  2. <font color="red">Fire</font>觸發器,修改觸發器資訊(下次執行觸發器的時間,以及觸發器狀態),並存儲起來。
  3. 最後建立具體的執行任務物件,通過worker執行緒池執行任務。

接下來再聊聊 Quartz 的叢集部署方案。

Quartz的叢集部署方案,需要針對不同的資料庫型別(MySQL , ORACLE) 在資料庫例項上建立Quartz表,JobStore是: JobStoreSupport

這種方案是分散式的,沒有負責集中管理的節點,而是利用資料庫行級鎖的方式來實現叢集環境下的併發控制。

scheduler例項在叢集模式下首先獲取{0}LOCKS表中的行鎖,Mysql 獲取行鎖的語句:

{0}會替換為配置檔案預設配置的QRTZ_。sched_name為應用叢集的例項名,lock_name就是行級鎖名。Quartz主要有兩個行級鎖觸發器訪問鎖 (TRIGGER_ACCESS) 和 狀態訪問鎖(STATE_ACCESS)。

這個架構解決了任務的分散式排程問題,同一個任務只能有一個節點執行,其他節點將不執行任務,當碰到大量短任務時,各個節點頻繁的競爭資料庫鎖,節點越多效能就會越差。

2 分散式鎖模式

Quartz的叢集模式可以水平擴充套件,也可以分散式排程,但需要業務方在資料庫中新增對應的表,有一定的強侵入性。

有不少研發同學為了避免這種侵入性,也探索出分散式鎖模式

業務場景:電商專案,使用者下單後一段時間沒有付款,系統就會在超時後關閉該訂單。

通常我們會做一個定時任務每兩分鐘來檢查前半小時的訂單,將沒有付款的訂單列表查詢出來,然後對訂單中的商品進行庫存的恢復,然後將該訂單設定為無效。

我們使用Spring Schedule的方式做一個定時任務。

@Scheduled(cron = "0 */2 * * * ? ")
public void doTask() {
   log.info("定時任務啟動");
   //執行關閉訂單的操作
   orderService.closeExpireUnpayOrders();
   log.info("定時任務結束");
 }

在單伺服器執行正常,考慮到高可用,業務量激增,架構會演進成叢集模式,在同一時刻有多個服務執行一個定時任務,有可能會導致業務紊亂。

解決方案是在任務執行的時候,使用Redis 分散式鎖來解決這類問題。

@Scheduled(cron = "0 */2 * * * ? ")
public void doTask() {
    log.info("定時任務啟動");
    String lockName = "closeExpireUnpayOrdersLock";
    RedisLock redisLock = redisClient.getLock(lockName);
    //嘗試加鎖,最多等待3秒,上鎖以後5分鐘自動解鎖
    boolean locked = redisLock.tryLock(3, 300, TimeUnit.SECONDS);
    if(!locked){
        log.info("沒有獲得分散式鎖:{}" , lockName);
        return;
    }
    try{
       //執行關閉訂單的操作
       orderService.closeExpireUnpayOrders();
    } finally {
       redisLock.unlock();
    }
    log.info("定時任務結束");
}

Redis的讀寫效能極好,分散式鎖也比Quartz資料庫行級鎖更輕量級。當然Redis鎖也可以替換成Zookeeper鎖,也是同樣的機制。

在小型專案中,使用:定時任務框架(Quartz/Spring Schedule)和 分散式鎖(redis/zookeeper)有不錯的效果。

但是呢?我們可以發現這種組合有兩個問題:

  1. 定時任務在分散式場景下有空跑的情況,而且任務也無法做到分片;
  2. 要想手工觸發任務,必須新增額外的程式碼才能完成。

3 ElasticJob-Lite 框架

ElasticJob-Lite 定位為輕量級無中心化解決方案,使用 jar 的形式提供分散式任務的協調服務。 官網架構圖

應用內部定義任務類,實現SimpleJob介面,編寫自己任務的實際業務流程即可。

public class MyElasticJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0:
                // do something by sharding item 0
                break;
            case 1:
                // do something by sharding item 1
                break;
            case 2:
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

舉例:應用A有五個任務需要執行,分別是A,B,C,D,E。任務E需要分成四個子任務,應用部署在兩臺機器上。

應用A在啟動後, 5個任務通過 Zookeeper 協調後被分配到兩臺機器上,通過Quartz Scheduler 分開執行不同的任務。

ElasticJob 從本質上來講 ,底層任務排程還是通過 Quartz ,相比Redis分散式鎖 或者 Quartz 分散式部署 ,它的優勢在於可以依賴 Zookeeper 這個大殺器 ,將任務通過負載均衡演算法分配給應用內的 Quartz Scheduler容器。

從使用者的角度來講,是非常簡單易用的。但從架構來看,排程器和執行器依然在同一個應用方JVM內,而且容器在啟動後,依然需要做負載均衡。應用假如頻繁的重啟,不斷的去選主,對分片做負載均衡,這些都是相對比較的操作。

另外,ElasticJob 的控制檯是比較粗糙的,通過讀取註冊中心資料展現作業狀態,更新註冊中心資料修改全域性任務配置。

4 中心化流派

中心化的原理是:把排程和任務執行,隔離成兩個部分:排程中心和執行器。排程中心模組只需要負責任務排程屬性,觸發排程命令。執行器接收排程命令,去執行具體的業務邏輯,而且兩者都可以進行分散式擴容。

4.1 MQ模式

先談談我在藝龍促銷團隊接觸的第一種中心化架構。

排程中心依賴Quartz叢集模式,當任務排程時候,傳送訊息到RabbitMQ 。業務應用收到任務訊息後,消費任務資訊。

這種模型充分利用了MQ解耦的特性,排程中心傳送任務,應用方作為執行器的角色,接收任務並執行。

但這種設計強依賴訊息佇列,可擴充套件性和功能,系統負載都和訊息佇列有極大的關聯。這種架構設計需要架構師對訊息佇列非常熟悉。

4.2 XXL-JOB

XXL-JOB 是一個分散式任務排程平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴充套件。現已開放原始碼並接入多家公司線上產品線,開箱即用。

xxl-job 2.3.0架構圖

我們重點剖析下架構圖 :

▍ 網路通訊 server-worker 模型

排程中心和執行器 兩個模組之間通訊是 server-worker 模式。排程中心本身就是一個SpringBoot 工程,啟動會監聽8080埠。

執行器啟動後,會啟動內建服務( EmbedServer )監聽9994埠。這樣雙方都可以給對方傳送命令。

那排程中心如何知道執行器的地址資訊呢 ?上圖中,執行器會定時傳送註冊命令 ,這樣排程中心就可以獲取線上的執行器列表。

通過執行器列表,就可以根據任務配置的路由策略選擇節點執行任務。常見的路由策略有如下三種:

  • 隨機節點執行:選擇叢集中一個可用的執行節點執行排程任務。適用場景:離線訂單結算。

  • 廣播執行:在叢集中所有的執行節點分發排程任務並執行。適用場景:批量更新應用本地快取。

  • 分片執行:按照使用者自定義分片邏輯進行拆分,分發到叢集中不同節點並行執行,提升資源利用效率。適用場景:海量日誌統計。

▍ 排程器

排程器是任務排程系統裡面非常核心的元件。XXL-JOB 的早期版本是依賴Quartz。

但在v2.1.0版本中完全去掉了Quartz的依賴,原來需要建立的 Quartz表也替換成了自研的表。

核心的排程類是:JobTriggerPoolHelper 。呼叫start方法後,會啟動兩個執行緒:scheduleThread 和 ringThread 。

首先 scheduleThread 會定時從資料庫載入需要排程的任務,這裡從本質上還是基於資料庫行鎖保證同時只有一個排程中心節點觸發任務排程。

Connection conn = XxlJobAdminConfig.getAdminConfig()
                  .getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement(
"select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();
# 觸發任務排程 (虛擬碼)
for (XxlJobInfo jobInfo: scheduleList) {
  // 省略程式碼
}
# 事務提交
conn.commit();

排程執行緒會根據任務的「下次觸發時間」,採取不同的動作:

已過期的任務需要立刻執行的,直接放入執行緒池中觸發執行 ,五秒內需要執行的任務放到 ringData 物件裡。

ringThread 啟動後,定時從 ringData 物件裡獲取需要執行的任務列表 ,放入到執行緒池中觸發執行。

5 自研在巨人的肩膀上

2018年,我有一段自研任務排程系統的經歷。

背景是:相容技術團隊自研的RPC框架,技術團隊不需要修改程式碼,RPC註解方法可以託管在任務排程系統中,直接當做一個任務來執行。

自研過程中,研讀了XXL-JOB 原始碼,同時從阿里雲分散式任務排程 SchedulerX 吸取了很多營養。

SchedulerX 1.0 架構圖

  • Scheduler-console 是任務排程的控制檯,用於建立、管理定時任務。負責資料的建立、修改和查詢。在產品內部與 scheduler server 互動。
  • Scheduler-server 是任務排程的服務端,是 Scheduler的核心元件。負責客戶端任務的排程觸發以及任務執行狀態的監測。
  • Scheduler-client 是任務排程的客戶端。每個接入客戶端的應用程序就是一個的 Worker。 Worker 負責與 scheduler-server 建立通訊,讓 scheduler-server發現客戶端的機器。 並向scheduler-server註冊當前應用所在的分組,這樣 scheduler-server 才能向客戶端定時觸發任務。

我們模仿了SchedulerX的模組,架構設計如下圖:

我選擇了 RocketMQ 原始碼的通訊模組 remoting 作為自研排程系統的通訊框架。基於如下兩點:

  1. 我對業界大名鼎鼎的 Dubbo不熟悉,而remoting我已經做了多個輪子,我相信自己可以搞定;

  2. 在閱讀 SchedulerX 1.0 client 原始碼中,發現 SchedulerX 的通訊框架和RocketMQ Remoting很多地方都很類似。它的原始碼裡有現成的工程實現,完全就是一個寶藏。

我將 RocketMQ remoting 模組去掉名字服務程式碼,做了一定程度的定製。

在RocketMQ的remoting裡,服務端採用 Processor 模式。

排程中心需要註冊兩個處理器:回撥結果處理器CallBackProcessor和心跳處理器HeartBeatProcessor 。執行器需要註冊觸發任務處理器TriggerTaskProcessor 。

public void registerProcessor(
             int requestCode,
             NettyRequestProcessor processor,
             ExecutorService executor);

處理器的介面:

public interface NettyRequestProcessor {
 RemotingCommand processRequest(
                 ChannelHandlerContext ctx,
                 RemotingCommand request) throws Exception;
 boolean rejectRequest();
}

對於通訊框架來講,我並不需要關注通訊細節,只需要實現處理器介面即可。

以觸發任務處理器TriggerTaskProcessor舉例:

搞定網路通訊後,排程器如何設計 ?最終我還是選擇了Quartz 叢集模式。主要是基於以下幾點原因:

  1. 排程量不大的情況下 ,Quartz 叢集模式足夠穩定,而且可以相容原來的XXL-JOB任務;
  2. 使用時間輪的話,本身沒有足夠的實踐經驗,擔心出問題。 另外,如何讓任務通過不同的排程服務(schedule-server)觸發, 需要有一個協調器。於是想到Zookeeper。但這樣的話,又引入了新的元件。
  3. 研發週期不能太長,想快點出成果。

自研版的排程服務花費一個半月上線了。系統執行非常穩定,研發團隊接入也很順暢。 排程量也不大 ,四個月總共接近4000萬到5000萬之間的排程量。

坦率的講,自研版的瓶頸,我的腦海裡經常能看到。 資料量大,我可以搞定分庫分表,但 Quartz 叢集基於行級鎖的模式 ,註定上限不會太高。

為了解除心中的困惑,我寫一個輪子DEMO看看可否work:

  1. 去掉外接的註冊中心,排程服務(schedule-server)管理會話;
  2. 引入zookeeper,通過zk協調排程服務。但是HA機制很粗糙,相當於一個任務排程服務執行,另一個服務standby;
  3. Quartz 替換成時間輪 (參考Dubbo裡的時間輪原始碼)。

這個Demo版本在開發環境可以執行,但有很多細節需要優化,僅僅是個玩具,並沒有機會執行到生產環境。

最近讀阿里雲的一篇文章《如何通過任務排程實現百萬規則報警》,SchedulerX2.0 高可用架構見下圖:

文章提到:

每個應用都會做三備份,通過 zk 搶鎖,一主兩備,如果某臺 Server 掛了,會進行 failover,由其他 Server 接管排程任務。

這次自研任務排程系統從架構來講,並不複雜,實現了XXL-JOB的核心功能,也相容了技術團隊的RPC框架,但並沒有實現工作流以及mapreduce分片。

SchedulerX 在升級到2.0之後基於全新的Akka 架構,這種架構號稱實現高效能工作流引擎,實現程序間通訊,減少網路通訊程式碼。

在我調研的開源任務排程系統中,PowerJob也是基於Akka 架構,同時也實現了工作流和MapReduce執行模式。

我對PowerJob非常感興趣,也會在學習實踐後輸出相關文章,敬請期待。

6 技術選型

首先我們將任務排程開源產品和商業產品 SchedulerX 放在一起,生成一張對照表:

Quartz 和 ElasticJob從本質上還是屬於框架的層面。

中心化產品從架構上來講更加清晰,排程層面更靈活,可以支援更復雜的排程(mapreduce動態分片,工作流)。

XXL-JOB 從產品層面已經做到極簡,開箱即用,排程模式可以滿足大部分研發團隊的需求。簡單易用 + 能打,所以非常受大家歡迎。

其實每個技術團隊的技術儲備不盡相同,面對的場景也不一樣,所以技術選型並不能一概而論。

不管是使用哪種技術,在編寫任務業務程式碼時,還是需要注意兩點:

  • 冪等。當任務被重複執行的時候,或者分散式鎖失效的時候,程式依然可以輸出正確的結果;
  • 任務不跑了,千萬別驚慌。檢視排程日誌,JVM層面使用Jstack命令檢視堆疊,網路通訊要新增超時時間 ,一般能解決大部分問題。

7 寫到最後

2015年其實是非常有趣的一年。ElasticJob 和 XXL-JOB 這兩種不同流派的任務排程專案都開源了。

在 XXL-JOB 原始碼裡,至今還保留著許雪裡老師在開源中國的一條動態截圖:

剛寫的任務排程框架 ,Web動態管理任務,實時生效,熱乎的。沒有意外的話,明天中午推送到git.osc上去。哈哈,下樓炒個面加個荷包蛋慶祝下。

看到這個截圖,內心深處竟然會有一種共情,嘴角不自禁的上揚。

我又想起:2016年,ElasticJob的作者張亮老師開源了sharding-jdbc 。我在github上建立了一個私有專案,參考sharding-jdbc的原始碼,自己實現分庫分表的功能。第一個類名叫:ShardingDataSource,時間定格在 2016/3/29。

我不知道如何定義“有創造力的軟體工程師”,但我相信:一個有好奇心,努力學習,樂於分享,願意去幫助別人的工程師,運氣肯定不會太差。


覺得對您有幫助的話,請給作者一個「點贊」和「收藏」,我們下期見。