如何做好分散式任務排程——Scheduler 的一些探索

語言: CN / TW / HK

作者:張宇軒,章逸,曾丹

初識 Scheduler

找準定位:分散式任務排程平臺

無論是網際網路應用或者企業級應用,都充斥著大量的任務。我們常常需要一些任務排程系統幫助我們解決問題。隨著微服務化架構的逐步演進,單體架構逐漸演變為分散式、微服務架構。在此的背景下,很多原先的單點式任務排程平臺已經不能滿足業務系統的需求。於是出現了一些基於分散式的任務排程平臺。

Scheduler 是飛書內的分散式任務排程平臺。分散式任務排程能力主要包括: - 分散式:平臺是分散式部署的,各個節點之間可以無狀態和無限的水平擴充套件(保證可擴充套件); - 任務排程:涉及到任務狀態管理、任務排程請求的傳送與接收、具體任務的分配、任務的具體執行;(叢集中哪些機器什麼時候執行什麼任務,所以又需要一個可以感知整個叢集執行狀態的配置中心); - 配置中心:可以感知整個叢集的狀態、任務資訊的註冊。

摸清脈絡:Scheduler 的結構和核心模組

名詞解釋:

  • Processor: 程式設計處理器, 擁有一定的程式設計規範, 使用者自定義實現。

  • Executor: 一個 SDK,執行 Processor 的進行容器,與 Scheduler 通訊的載體。

  • Job:使用者建立的任務,其中包含任務的排程規則、排程模型、執行器名稱等資訊。

  • Instance:執行態的Job,每當Job觸發後會生成一個Instance,記錄本次執行的排程資訊。

  • Task:最小執行單元,不同調度模型的任務產生的Task數量不同。

1.png

通過架構圖可以發現,Scheduler 主要有以下三個部分:

  1. 排程器 (Scheduler):任務排程中心,負責管理任務的生命週期。接受任務註冊,準時準確找出待觸發的任務,進行任務拆分下發。找出與之關聯的執行器並下發對應任務

  2. 執行器 (Executor):接收排程任務,並將自身狀態上報給排程器

  3. 控制檯 (Web 前端):負責配置執行器的資訊以及排程任務的配置、任務狀態、資訊展示

因此,我們可以用一句話解釋清楚 Scheduler 所做的事情,即: 在「指定時間」「通知執行器」以「指定方式」執行任務 這句話中包含了三個關鍵點,也分別代表著 Scheduler 的三個核心模組:

  1. 指定時間:任務的觸發規則,如:每天早上8點、每週二、每月15號等。觸發器模組(Launcher Cron)負責任務觸發。
  1. 指定方式:任務的執行形式,如:單播任務-指定一個機器執行;廣播任務-指定所有機器執行;分片任務-任務分階段分批的執行。分派器模組(Assignment Cron)負責任務的執行方式。
  1. 通知執行器:將任務傳送到指定執行器,執行任務。派遣器模組(Dispatcher Cron)負責任務的傳送,採用流式通訊,排程器以推送的方式將任務傳送給執行器。

在一個 Job 的排程週期中,各個模組各司其職,整個流程如下:

2.png

擁有這三個核心模組後,Scheduler 已具備了成熟的任務排程功能。另外,為了增加 Scheduler 的穩定性,有額外兩個模組為其保駕護航:

  1. 健康管理模組 (Service Health Cron): 負責管理 Job 的生命週期,檢測未正常派發執行的 Job、Instance 和 Task,並將結果上報給運維人員。
  1. 任務進度重新整理模組 (Task Cron): 非同步更新 Task 狀態,流量較高時進行削峰,保證依賴的 mysql 及 redis 不因為流量過高而出現問題。

本篇文章不對 Scheduler 所支援的定時任務能力作贅述,而是從三個方面(易用性多功能性穩定性)介紹 Scheduler 對於分散式任務排程的思考和探索:

  1. 「易用性」: 決定了使用者是否選擇使用該框架的意願,一個好的框架必須是易用且快速接入的;
  1. 「多功能性」: 接入方需求多種多樣,要站在使用者角度想問題,不能閉門造車;
  1. 「穩定性」: 對於分散式任務排程平臺來說,不僅僅侷限於自身的穩定性,接入方的穩定性也十分重要。

換位思考-快速接入

背景:效率至上,時間是金

以位元組跳動內部為例,當前團隊想要實現一個定時任務有多種方式:接入位元組雲的 cronjob 平臺、自己實現一套定時任務框架或者接入第三方定時任務框架。

對於第一種接入 cronjob 平臺,每一種定時任務都需要註冊各自的 psm 和執行時環境(映象),當任務需要訪問依賴資源如 redis/db 等時,需要各自新增授權。任務程式碼邏輯有變化時也需要各自升級,導致開發、管理起來較為複雜。

對於第二種自己實現一套定時任務框架,不僅整體開發時間較長,且需要大量時間進行測試迴歸來保證框架的穩定性。如果專案內使用到的定時任務較多,那麼自身研發一套框架用途也較廣泛;若專案中使用到的定時任務較少,則 ROI 較低,很多時候也只是為了造輪子而造輪子。

因此,大多數專案面對增加定時任務的需求時,都會尋求直接接入第三方成熟的定時任務框架。對於他們來說,是否易於接入、與現有程式碼聯絡是否緊密、除錯是否方便是很重要的選取指標。

基於這種背景,Scheduler 在設計時就站在了接入方的角度,思考了如何讓接入方能夠在最短時間內以最低成本接入 Scheduler,實現自己的定時任務。

分析:站在使用者角度想問題

站在接入方角度,對定時任務框架進行選型時最關注的幾個點無非是定時任務執行準確性、最高支援 qps、定時設定多樣性、接入成本這幾個。對於前兩個指標,Scheduler 目前接入業務方 50+,日均排程任務 20w+ 次,與公司內其他第三方定時任務框架相比也較有競爭性,同時對於後兩個關注點,Scheduler 也有自己的風格。

豐富的排程設定

一般的定時任務框架只支援 crontab 表示式,例如 0 1 * * * ,代表每天凌晨一點執行一次。cronTab 功能強大,但是若配置複雜的定時策略,有一定學習成本,且可讀性不高。因此,鑑於這種情況,Scheduler 在 crontab 之上設計了更易讀更強大的定時策略,做到所見即所得。

{ "startTime":1648029600000, "timeZone":"Asia/Shanghai", "repeatLevel":"month", "repeatInterval":2, "repeatDays":[3,5,23] }

  • startTime: 開始時間戳,在此之前定時任務不會執行,到達該時間後會執行第一次
  • timeZone: 時區設定,根據當前設定的時區準確派發
  • repeatLevel: 重複級別,目前可以設定按「小時」、「天」、「月」、「周」、「年」以及「工作日」進行重複
  • repeatInterval: 重複間隔,代表每隔 \$repertInterval \$repeatLevel 執行一次
  • repeatDays: 重複天數,重複級別是周或月時生效

因此,該設定所代表的定時間隔為 每兩月的3、5、23號觸發一次 觸發時間(北京時間) | | ------------------- | | 2022-03-23 18:00:00 | | 2022-05-03 18:00:00 | | 2022-05-05 18:00:00 | | 2022-05-23 18:00:00

為什麼要做工作日排程

可能有同學注意到,Scheduler 對於重複級別的支援十分豐富,不僅可以按照普通的年、月、日等級別進行設定,還可以按照工作日進行重複排程(例如每兩個工作日執行一次),這歸因在 Scheduler 孵化於位元組跳動內部企業服務系統,為諸如人事系統、許可權系統等 ToB 服務提供定時任務能力。往往 ToB 客戶的需求複雜多變,因此,需要提前具備更多能力,才能更好地服務好 ToB 客戶。

Scheduler 在調研接入方需求時,得到了有些客戶對於定時提醒這類任務的需求是儘量不要在「非工作日」打擾。於是,Scheduler 決定增加工作日排程選項來適配客戶潛在需求,也側面說明了 Scheduler 為了讓接入方更快更小成本接入做出的努力。

輕鬆的使用方式

相信「開箱即用」對於人們在採買諸如家電、數碼產品時,是十分重要的一個考核指標。而對於對外提供的服務 or 框架,亦是如此。Scheduler 的目標就是讓接入方能夠在短時間熟悉 Scheduler、編寫測試程式碼以及上線定時任務。

專注業務

如果想實現一個定時任務,接入方只需要三步:引入 Scheduler sdk,繫結相應 processor,在 process 介面中實現具體業務邏輯。同時,由於定時任務的實現位於原始碼中,啟動配置無需更改,本地測試也較為便捷。同時,在位元組跳動環境下,無需新增 psm、授權配置等,儘可能做到了「開箱即用」。 ```` import ( "context"

"code.byted.org/apaas/scheduler_sdk/executor" // 引入 sdk )

func main() { executorSvc, err := executor.NewExecutor(executor.NewDefaultExecutorConfig(), &HelloWorld{}) // 繫結 processor if err != nil { panic(err) } if err = executorSvc.Run(); err != nil { panic(err) } }

type HelloWorld struct { ProcessorApiName string }

func (h *HelloWorld) GetApiName() string { return h.ProcessorApiName }

// process 中實現具體業務邏輯 func (h HelloWorld) SimpleProcess(ctx context.Context, tc executor.TaskContext) (err error) { tc.LogInfo(ctx, "hello world") return } ````

快速運維

沒有程式設計師想主動寫出 bug,但問題總是會突然出現。如何在出現問題時快速運維、快速止損,是所有工程師都追求的目標。Scheduler 在這方面做了幾種嘗試。

  1. 報警更直觀

使用者可以在建立 job 時,可以選擇配置報警機器人,並把 Scheduler 機器人拉入對應報警群組。當檢測到對應 job 出現問題時,Scheduler 機器人會把相應報警推送到對應群組,做到實時響應。

  1. 狀態更清晰

目前 task 的相關狀態如下,當一個 task 長期沒有到終態時,根據狀態碼即可知 task 目前處於什麼狀態,從而推斷是哪一步驟出了問題。 狀態碼 | 狀態 | | --- | --------------------------- | | 100 | 等待觸發 | | 101 | Ready 就緒態,等待推送 | | 201 | 推送到 Executor,還未實際執行(任務太多排隊) | | 202 | 執行中 | | 203 | 執行超時,邏輯複雜導致 | | 301 | 執行成功 | | 302 | 執行失敗 | | 401 | Ready 超時,沒有 Executor 拉取 | | 402 | 推送到 Executor 後長期未執行 | | 403 | 執行超時,Executor 宕機導致 |

並且一些 Scheduler 常見的報錯也做了封裝,幫助快速定位問題,例如

錯誤碼 | 錯誤原因 | | -------------- | -------------------------------------- | | k_sc_ec_000004 | 找不到任務{{.jobApiName}} | | k_sc_ec_100004 | 找不到任務例項{{.instanceID}} | | k_sc_ec_300001 | Processor Name 未註冊{{.content}} | | k_sc_ec_300006 | processor({{.content}}) 找不到對應 executor | | k_sc_ec_400002 | 找不到Executor {{.content}} | | k_sc_ec_400005 | 無許可權操作|

並肩作戰-分片任務

背景:任務越多,挑戰越大

一個成熟的專案中避免不了大型批量任務,比如通過 Excel、csv 或其他資料來源批量建立或更新資料,批量任務一般資料量很大,如果按照單例項序列執行,那麼不能充分利用計算機資源且一次執行會消費大量時間,使用者體驗不友好。

以 Kunlun 舉例, 舊階段的批量任務依賴於訊息佇列、Redis 實現,總體分為三大部分:

  1. 解析並校驗 Excel,將資料解析成一條條資料,將每條訊息封裝成一條訊息傳送至訊息佇列。
  1. 消費訊息佇列,進行建立、更新等操作,並在 Redis 記錄總體進度並推送給使用者,如果任務失敗,會將行數和錯誤原因同時記錄進 redis。
  1. 待所有資料處理結束後,如果 redis 中沒有錯誤資料,則提示使用者成功,否則根據錯誤資訊生成 Excel 返回給使用者。

3.png

使用訊息佇列、redis 的定時任務可以提速和優化使用者體驗,但有以下不足:

  1. 維護起來不方便,例如當有新服務需要此類功能時,需要自己再實現一套差不多的框架,所以需要將分片功能託管到第三方服務,而業務方只用專注於具體業務。
  1. 依賴於訊息佇列和 Redis 兩個外部元件,對兩個元件的穩定性要求極高,當其中一個出現問題,都會帶來不小的麻煩。

基於這種背景,Scheduler 豐富了原本的任務排程能力,補充了分片能力,以滿足複雜繁瑣的任務分片處理的需求。

分析:舊問題,新解法

若打算做出一套貼合業務需求的分片任務框架,需要先了解現階段的分片任務的實現步驟。

現階段的分片任務大致可以抽象成3個步驟:

  1. 。獲取資料,可以從上傳的檔案解析資料、從 DB 查詢出大批資料或其他資料來源。
  1. 。處理資料,聚焦於具體業務,如:建立、查詢、更新。
  1. 。處理結果,將此次任務執行結果處理成結果報告返回給使用者,報告可以為 Excel、一條訊息、一封郵件等。

Scheduler 要做的事情則是替換其中分片、訊息佇列、Redis 的功能,做出以下抽象: type ShardingProcessor interface { PreProcess(ctx context.Context, tc *TaskContext) error ShardingProcess(ctx context.Context, tc *TaskContext) error Notify(ctx context.Context, tc *TaskContext)error PostProcess(ctx context.Context, tc *TaskContext) error } 1. PreProcess(總) :資料準備階段。可在此方法中對資料進行額外處理,如計算拓撲關係,定義資料優先順序。單機執行。

  1. ShardingProcess(分) :分片處理階段。實際處理函式,多機執行。分片處理函式,執行批量匯入、更新等處理。(入參:Scheduler 對 PreProcess 返回結果的分片子引數)
  1. Notify(階段式-總) :進度更新處理,每當進度變更的大小大於設定閾值,則生成一次 Notify 的 Task。Executor 向 Scheduler 彙報子任務進度,Scheduler 計算出總體進度,當總進度發生變更後生成 NotifyTask 通知 Executor 進行處理。
  1. PostProcess(總) :結果處理階段。任務執行完畢後可在此函式進行後續處理,單機執行。當所有分片子任務都執行完後,Scheduler 會將子任務的執行結果傳送到此函式處理。(入參:每一個子任務 ShardingProcess 後的結果陣列)

執行器需要實現ShardingProcessor介面以供排程器進行排程。排程過程如下:

4.png

Scheduler 支援分片任務重點在於豐富排程模型,提升排程器排程能力,完善執行器執行能力來達到支援分片任務的目的。

排程側能力

分批排程的能力

排程側需要根據任務進度依次生成 PreTask、ProcessTask、NotifyTask、PostTask 來排程執行器ShardingProcessor中的 PreProcess - Process - Notify - PostProcess 四個方法。單機排程PreProcess,PostProcess,Notify,並行排程 PostProcess,總體排程呈現總-分-總的形式。排程過程如下:

5.png

資料拆分的能力

資料拆分即任務分片,指的是將單一任務按照特定的邏輯切分為多個獨立的子任務,將其分派到不同的節點執行,以提高任務的執行效率。

而 Scheduler 要處理的任務內部可能存在依賴關係(比如 kunlun 業務中 metadata 批量建立的需求,由於存在 lookup 和 reference 欄位等,記錄建立之間存在拓撲關係),所以在執行時需要優先順序的概念,而不能被簡單拆分為獨立的子任務。

為了支援帶優先順序的任務分片,Scheduler 接收的分片任務的資料特點如下:

  • 是業務方自定義任務的二維陣列;
  • 第一個維度是任務執行的優先順序,位於同一優先順序下的任務併發執行,位於不同優先順序的任務按優先順序序列執行;
  • 第二個維度是同一優先順序下的自定義任務列表。關於自定義任務的結構,Scheduler 不感知。業務方可以選擇儲存任務詳情或是主鍵資訊,並自定義處理邏輯,而 Scheduler 只做分片和排程工作。

二維陣列可以是下面這樣: [ [ // 第一優先順序的任務1,可以是主鍵 101, // 第一優先順序的任務2,可以是SQL語句 "Insert into tablename xxx", // 第一優先順序的任務3,可以是結構體等等... { "ID": 999, "Name": "zhangsan" } ], [ // 第二優先順序的任務1、2、... 102, 103, ... ] ] 瞭解了待分片任務的結構,我們來討論如何對任務進行分片。比如,分片的數量由什麼決定,單個分片上的資訊是如何分配的,不同分片又是不同分派到不同的處理器上的...

  1. 分片數的確定

分片數的確定基於以下引數的值:資料量、任務建立時使用者指定的單片最大數量單片最小數量,以及實際可用的執行器數量

  1. 分片演算法

分片特徵值(sharding key)的選擇要遵循的原則應該是基於最常用的訪問方式

由於 Scheduler 分片時並不關心業務資料的結構,所以選用資料陣列的下標來作為分片特徵值。

由於分片數量確定後,不涉及到由於分片的增加或減少對資料進行 Rehash 的情況,所以無需考慮虛擬節點、一致性雜湊等方式進行分片。

這裡選用雜湊分片的分片演算法,原因是既可以均勻分佈資料,實現起來也很簡單。

  1. 分片的儲存和派發

分片完成後,需要給每個分片建立一個 Task,並把分片的資料儲存下來。

關於 Task 的派發,根據上面關於分片數的討論,可以得到分片數和 Executor 數的關係:

  • 在資料量合適的情況下(單片最大數量和單片最小數量設定合理時,是最普遍的情況),分片數和 Executor 的數目是一致的;
  • 當資料量很小時,會出現分片數小於 Executor 數;
  • 當資料量很大時,會出現分片數大於 Executor 數,甚至可能是後者的幾倍。

為了讓各個 Task / 各個分片 能夠均勻派發給各個 Executor,也為了避免某個 executor 掛掉時,其他 Executor 不能均勻分攤掛掉的節點原先承擔的分片,需要採用合理的分片策略。

在分片時,我們保證了各分片的資料是儘量均勻分佈的,所以從分片到 Executor 的分派方式可以儘可能地簡單,採用平均分配的策略即可。對於掛掉的節點所承擔的分片,也採用同樣的策略派發到存活的 Executors 上即可。

例如:

有 3 個 Executor,分成 9 片,則每個 Executor 分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]

如果 Executor 1 掛掉,則將 1 的分片平均分配到Executor 2、3: 2=[0,1,3,4,5], 3=[2,6,7,8]

平均分配* :對於不能均分的情況,為了避免靠前的 Executor 總是承擔更多的壓力,可以根據待分配分片數量的奇偶來決定是升序分派還是降序分派。

進度通知的能力

Scheduler 支援通知 Executor 任務執行的整體進度

  1. Executor 上報子任務進度至 Scheduler
  1. Scheduler 計算總任務進度
  1. 總任務進度發生變化,則生成 NotifyTask 傳送至 Executor

6.png

執行側分批執行的能力

執行側需要實現並註冊 SDK 提供的ShardingProcessor介面,來處理由排程側發來的多種型別的Task。

PreProcess

預處理方法,可以進行但不限於以下的操作:

  1. 啟動引數不符合 Scheduler 規定格式,可以通過 PreProcess 方法進行一次轉換。
  1. 資料存在匯入優先順序,可以在 PreProcess 中編寫計算拓撲關係的方法。

如果不需要預處理,可直接在方法內 return,分片時資料使用啟動時的 Data ```` func (s *ShardingProcessor) PreProcess(ctx context.Context, tc taskContext) error{ oldData := tc.GetData() // 使用者業務, 資料處理 newData := Transform(oldData)

// 返回帶拓撲排序的資料
tc.SetResult(newData)
return nil

} ```` ShardingProcess

分片處理函式,主要是進行資料更新、建立操作。ShardingProcess 的入參是切分後的陣列([]interface{})。Executor 需要對引數進行兩部分額外處理:

  1. []interface{}中的interface{}斷言成具體struct{}
  1. 處理後。計算當前子任務執行進度,進行上報;如果省略上報,服務端以分片粒度生成 NotifyTask通知執行器。 ```` func (s *ShardingProcessor) ShardingProcess(ctx context.Context, tc taskContext) error{ taskData := tc.GetData()

for _, data := range taskData{ // 使用者業務, 資料處理 }

tc.SetResult(...) return } ```` PostProcess

接受所有分片處理結果,進行後續處理,如生成錯誤檔案。 ```` func (s *ShardingProcessor) PostProcess(ctx context.Context, tc taskContext) error{ taskData := tc.GetData() for _, result := range taskData{ // do something }

// 使用者業務
tc.SetResult(...)

} ```` Notify

提供給子任務上報的能力,Scheduler 會根據所有子任務上報結果計算進度,通知 Executor,通知粒度為資料條數。如果接入方不主動上報子任務進度,Scheduler 會根據子任務完成度進行通知,通知粒度為分片粒度。

分片任務流程

7.png

削峰填谷-流量控制

背景:提供能力,而非施加壓力

在 Scheduler 設計初期時,更多的是把注意力放在瞭如何能夠快速、準確、低延遲的觸發任務,為此還多次優化了觸發器、分派器、派遣器三大模組的輪詢邏輯,但是忽略了任務量過大時下游能否抗住流量的問題。

如果 Scheduler 在排程時無法準確感知下游壓力,那麼很容易將下游打掛,如:在定時任務首次上線時,因為 kunlun 的裝包機制導致數千個應用下配置了同樣的定時任務,雖然一個包內的數十個定時任務觸發時間分散,但是應用包之間的同一個任務觸發時間相同,導致下游需要在同一時刻處理數千個任務,再加上任務的處理流程還會通過訊息中介軟體進行擴散,導致資料庫在任務執行階段一直處理低 IDLE 階段。

分析:流量追蹤,剝繭抽絲

目前大部分後端服務,通過分析任務的流量走向,可以大致確認每一條任務在執行過程中不論擴散還是非擴散流量都會走向 DB,流量圖大致如圖。

8.png

任務的流量最終打到了 DB,所以流量控制的目標就更加清晰:對 DB 的流量控制。

需要對 DB 進行流量控制,那麼就要設定合理的指標,理論上,只要指標採納的足夠合理,就能嚴格、準確的控制流量,指標則需要具備以下條件:

  1. 實時性。能夠準時反應資料庫健康狀況。
  1. 權威性。能夠準確反應資料庫健康狀況。

優先順序: 實時性 > 權威性。當一個指標的實時性不夠高,那麼它的權威性就不再有價值。

只需要實時監聽著 DB 的指標,來判斷任務是立刻執行,還是延遲執行就能有效的保護 DB。

指標選擇

  1. 消費 metrics 的監控點,關於資料庫的打點資訊非常全面,能夠非常輕易的獲取到資料庫宿主機的 CPU、記憶體或資料庫本身的連線數、查詢數等指標,這些指標的權威性毋庸置疑,但是 metrics 通過將指標收集到本地代理,代理每 30s 做一次聚合傳送至服務端,其時效性太差。
  1. 資料庫不可用因素為:大量任務觸發 -> DB 訪問流量增高 -> CPU idle 降低 -> 資料庫不可用。造成 CPU idle 降低的因素為 DB 流量增高,可以將 DB 的流量作為指標進行流量控制,缺點是需要自己採集指標。

指標收集

指標範圍

反映 DB 壓力較為直接的指標是 cpu idle,但考慮到服務部署往往多例項以及 cpu idle 採集難度大的情況,以近似指標來代替。另一方面,通過歷史資料分析,DB 流量與 cpu idle 有一定的關聯,因此以 DB 流量作為 DB 壓力指標。

圖.png

資料儲存

參考限流的實現方案,採用單獨的 Redis 儲存流量資料,以 1s 為時間視窗作為 Redis key,每個時間視窗的流量作為 Redis value,每次發生 DB 操作時更新流量資料。系統中存在多個 DB,每個 DB 單獨統計,在 Redis key 中加入 db 資訊。Redis key 設定10s過期時間,查詢時根據過去3個視窗的加權平均(80%/15%/5%)作為當前流量,以處理視窗交界處的突發流量。

收集方式

目前 DB 流量已有 metrics 監控資料,但由於 metrics 會在本地聚合 30s 資料後上報,至少會有 30s的延遲。而造成 DB 壓力大的定時任務多為短期集中觸發,使用 metrics 資料會有感知不及時的問題,因此需要額外收集資料。參考 DB metrics 資料採集的方式,通過 Gorm 的 callback 機制插入具體的採集邏輯,減少對業務程式碼的侵入。 ```` func SetMonitorCallBack(db *gorm.DB) { db.Callback().Create().Before("gorm:before_create").Register("metric:before_create", beforeCreateCallback) db.Callback().Delete().Before("gorm:before_delete").Register("metric:before_delete", beforeDeleteCallback) ... }

func beforeCreateCallback(scope *gorm.Scope) { beforeCallback(scope, "create") }

func beforeCallback(scope *gorm.Scope, method string) { ... } ````

在採集邏輯上,需要考慮以下幾個問題:

  • 效能:callback 中需要儘量減少延遲,優先使用非同步的方式上報資料。使用 channel 充當佇列,callback 中將資料寫入 channel,當 channel 容量滿時丟棄資料,防止阻塞。另有非同步協程從 channel 中取資料上報。兼顧時效性和網路開銷,在上報前預先在本地以 100ms 視窗做聚合。

```` type MetricType int8

const ( QueryCount MetricType = 1 )

type DBMetric struct { DBName string DBMethod string Type MetricType Timestamp int64 Value interface{} }

// callback中將metric資料寫入channel func beforeCallback(scope *gorm.Scope, method string) { dbName := getStringValueFromCtx(scope.DB().Ctx, CtxVariableDBName) curMs := time.Now().UnixNano()/int64(time.Millisecond) metric := DBMetric{dbName, method, QueryCount, curMs, 1} select { case ch <- metric: default: // channel is full, ignore this metric } }

// 非同步上報,在resource_sdk的Init()中根據配置判斷是否啟動此協程 func metricAgent() { windowSize := 100 // window time -> (metric key(dbName|dbMethod|type) -> metric) aggrMetrics := map[int64]map[string]DBMetric timer := time.NewTicker(windowSize) defer func() { timer.Stop() }() for { select { case msg := <-ch: curWindow := curMs/windowSize 更新aggrMetrics中curWindow對應的metric(對queryCount來說是加1) case <-timer.C: 將aggrMetrics中key+windowSize<=curTime的資料上報並清除 } } } ````

  • 運維成本:採集邏輯會執行在各個服務上,考慮到後續會收集更多的指標,直接上報 Redis 需要給各個服務開通讀寫許可權,運維管理成本較高。基於此,使用額外的服務來管理指標資料,接收上報的指標資料存入 Redis,並通過介面的方式提供查詢服務。指標存放在更加聚焦在 DB 資源的 resource 服務中,在 resource 服務中通過增加介面的方式實現指標資料的管理功能,同時,為了不影響 resource 原有業務的穩定性,使用單獨的叢集提供服務。

9.png

Scheduler 排程反饋

流量閾值限制

Scheduler 排程速率與 DB 負載之間的關係較為複雜,本期採用簡單的閾值反饋機制,設定 DB 流量閾值,當流量超出閾值時,停止 Scheduler 當前週期排程。根據歷史資料,設定閾值為 5K。

當流量未超出閾值時,不能預估任務對 DB 流量的影響,採用簡單策略對任務數進行限制:

任務數 = max((DB流量閾值 - DB當前流量)* 100 / DB 流量閾值, 0)

DB路由

目前 Kunlun 的 DB 資源根據租戶進行分配,不同租戶的資料和流量會落在不同的 DB 上。Scheduler 會記錄 Job 所處租戶,所以在排程時,需要根據租戶查詢真實的 DB 資源,通過 DB 指標的健康狀況來決定是否派遣任務:

  • 查詢 Job 所在租戶分配的 pg 資源標識
  • 根據 pg 資源標識去 Redis 查詢對應的流量資料

排程控制流程

10.png

- END -