極客星球 | 資料質量監控的設計與實現

語言: CN / TW / HK

前言

講資料質量監控重要性和理論的文章,去網上可以搜到很多,這篇文章主要講怎麼設計和開發資料質量監控平臺。

第一部分:介紹

資料質量監控平臺(DQC)是支援多資料來源的根據使用者配置的質量監控規則及時發現問題,並通過郵件通知告警的一站式平臺。

目前,資料質量監控功能為使用者提供10餘種預設的資料質量檢測模板,支援:PSI計算、缺失分割槽和及時性檢查、磁碟波動率檢查、錶行數波動率檢查、飽和度檢查、列異常資料檢查、主鍵重複值檢查和計算統計指標等功能。

想了解我們公司的資料處理流程和資料監控內容,可以參考墨竹的文章,但請注意截圖為老版本DQC,新版本UI發生了較多的變化。平臺新版本的截圖如下。極客星球| 資料質量保障之路的探索與實踐

DQC,在架構上分為2層,java開發負責的業務層,大資料開發負責的資料處理層。

業務層是一個web服務,實現了對使用者和檢測物件的管理,對檢測物件的計算排程,對返回監控結果的展示。

資料處理層完成了具體的資料質量監控任務。DQC的架構圖如下:

因為本人是大資料平臺開發,所以設計和實現主要集中在資料質量監控的資料處理層。業務層的檢測物件配置和排程因為和資料處理層聯絡比較密切,所以也參與了設計。

第二部分:設計

資料質量的核心功能實現,大致就是根據需要預先定義的監控指標設計出計算模版(sql和api)。

然後根據具體需要監控的物件填充對應的模版就可以進行監控。

下面將展示我設計和實現DQC的過程。

一、基本思路

規則的顯示大致可以分為2類:

1.sql可以解決的,例如錶行數波動率,資料異常率等。

2.其他型別,通過指令碼或者介面來實現,例如缺失分割槽檢查,磁碟容量波動率等。

先給一個波動率的公式:

​1.1 SQL模版具體的例子(錶行數波動率):

1 CREATE TEMPORARY VIEW a as

2 SELECT COUNT(*)

3 FROM ${database}.${table}

4 WHERE day = 'today - period';

5

6 CREATE TEMPORARY VIEW b as

7 SELECT COUNT(*)

8 FROM ${database}.${table}

9 WHERE day = 'today';

10

11 SELECT (b.cnt - a.cnt) / b.cnt as rate

12 FROM a JOIN b

在這個錶行數波動率sql模版中需要填充的變數是庫表名和時間週期(日,2周,1月等)。

在這個sql模版在實際中會有很多不足,例如日期欄位不是day而是date,使用者還需要額外的可配置過濾條件等等。這些都需要在結合生產進行進一步的設計和實現。

1.2 指令碼的例子(磁碟容量波動率):

以下的程式碼並不能執行,需要再附加一些額外處理。

rate = (b - a) / b

從這2個例子中,可以看出這裡一共由2個子步驟組成。首先是就是獲取規則中所需要的統計數量(錶行數,磁碟容量等);然後是根據具體的規則指標(波動率)進行計算。

通過計算得到規則實際結果,需要跟預期值進行比較。例如希望錶行數波動在[-0.2, 0.2]之間,如果是0.5就告警,通知對應負責人進行處理。

二、模組化設計

根據基本思路中三步驟,來詳細展開。我在設計系統的時候,參考了ebay開源的Apache Griffin。

2.1 統計數量的計算獲取

統計數量的計算獲取可以分為2個部分,一個是資料來源,一個統計數量。

資料來源可以是hive,hbase,mysql,mongodb等等。

統計數量也就是計算或者是獲取到對應的資料,在基本思路中可以看到,這裡的計算和獲取動作是會重複的。所以在這一層是可以做一些抽象的。可以細化出一些小的方法,然後進行組合,來得到實際的效果。

這一層抽象出了資料來源和資料計運算元步驟。

2.2 規則指標的計算

規則指標的計算就是最重要的一部分,這部分會影響到上面的統計數量具體的實現。

上面是我們在實際生產經驗中提取出來需要監控的規則指標。

一共14個,除了第一個缺失分割槽檢查可以直接得到結果。後面都含有計算邏輯在其中。統計指標我直接使用spark dataframe的summary運算元得到的,這裡也可以使用udf來實現。其他的12個大致由以下操作(操作包含了2.1中統計數量)得到。

1.檢查資料是否匹配 Accuracy

2.檢查資料完整性 Completeness

3.資料去重計數和區間計數 Distinctness

4.環比 Ratio

5.PSI

所以在這裡,我根據需要計算的指標模版進行了詳細的操作拆分,形成4個計算步驟組合,其子步驟部分,不同步驟組之間可以共享,也可以單獨實現子步驟組。觀察上面可以看到有sample這個步驟,做資料質量檢查的時候,全量有時候太慢,就失去了及時性,同時也會對叢集資源造成壓力。所以需要提供資料取樣功能。

這裡附一張我們系統實現的步驟組。多出來的2個,MultiRatio是做分組波動計算的,算是ratio的擴充套件。profiling可以算是子步驟的一種,使用者實際處理資料中,會有更高的要求,profiling這個步驟組可以完成spark sql計算的所有功能(使用者並不會傳完整的sql,所以需要這樣一個步驟組來處理)。本人對特殊指標,例如缺失分割槽檢查,就單獨實現了,並不會用到步驟組,後面實現部分可以看到另一層抽象。

所以在這個部分,我們完成對整個資料計算的步驟抽象。

2.3 結果的檢查與輸出

經過以上2步,我們就到了指標結果。我們需要將指標結果落庫,同時對指標結果進行檢查,不符合使用者設定的閾值就傳送郵件告警,讓使用者來處理。

結果輸出部分,結果是以約定的json形式儲存,採用json儲存,看中其可伸縮性。原因如下:在計算指標的時候,一些中間結果會在查錯提供很大幫助。例如:是我們計算異常佔比的時候,同時會計算出異常值的數量和總量,當結果不符合預期的時候,這些內容是很重要的查錯資訊。我們可以選擇mongodb,mysql等儲存最終結果。

結果檢查部分,我用了scala的解析器組合子庫來實現,具體見實現。在結果輸出部分提到了,結果可能輸出多個,那同樣的,使用者也就可以同時檢查多個結果值,提供了dsl來完成這項工作。

在這一層,我們完成了結果輸出的抽象和結果檢查的功能實現。

三、實現

整個資料處理層會使用spark來作為計算引擎。根據業務層傳來的Json引數來進行具體的任務生成。下圖是任務引數框架。

1. 資料來源

因為採用spark作為計算引擎,所以從資料來源中獲取對應的資料用spark的運算元即可。spark本身實現對很多資料來源的資料提取,所以整個資料來源部分的實現相對簡單,可以根據自己實際的需求來編寫資料來源類。對每種資料來源會有自己需要定製化的內容,我們在config引數中進行即可。

我會拿Hive資料來源講解一下,實現了哪些內容,資料庫類的資料來源相對容易實現。針對Hive資料來源,首先是需要一些過濾條件,這部分以key value對的方式來配置。

在資料質量檢查中最重要的一個變數就是時間。關於時間,主要有2個需要處理的點。

1.因為具體的時間格式不同,有的是yyyyMM, yyyy-MM-dd,或者具體到時間。我們採用的方案是業務層統一傳yyyyMMdd,其為變數$day,可以在分割槽欄位中使用即可。還提供了${latest_partition}變數來獲取最新分割槽。以及對這2個變數可以使用spark udf來獲取指定的時間格式。

2.另外一些指標,例如波動率,是同一個資料2個時間點的比較。所以還會帶入時間週期的概念。例如每天,每週二,每月16日。

2. 構建計算步驟

這裡許多概念和Apache Griffin一致,但具體實現有很大差異。

業務層首先會根據觸發條件和庫表聚合排程,使得同一個資料來源同一日期的檢測物件一起排程。

檢測物件下文都稱為rule。

第一步 rule的處理

構建的第一步就是對rule進行處理,例如psi這類的rule,可以先拆分成多個rule,例如分佈區間的psi,可以拆分為分佈區間和psi 2個rule,這裡就存在了依賴關係。構建了一個rule的DAG。

第二步 rule轉換成執行計劃

一個rule轉換成一個執行計劃。執行計劃的功能就是生成具體的任務步驟。

第三步 生成計算步驟

這一步有很多的優化空間,詳細內容見四、優化

計算步驟分為4大類:

1.不需要走spark任務計算,例如磁碟容量,直接用hadoop api查詢即可。

2.spark sql步驟組就是2.2 規則指標的計算中提到的,由4組成。

3.spark 運算元,例如查詢hbase和資料庫,summary運算元之類的操作。

4.spark sql計算步驟,寫好的spark sql,預留了變數填充。

每一個執行計劃會對應一個或者多個計算步驟。如下圖,展示了錶行數波動率和異常資料佔比率的轉換過程。

3 .計算步驟執行

經過上述的轉換,我們可以得到計算步驟的DAG,之後對這個DAG進行併發執行就可以了。

這樣的併發執行可以更充分的使用sprak executor的資源。比如序列執行,200cores的資源,一個job因為資料傾斜等原因,最後20個task執行較慢。這時候後面的job並不會啟動,而且可能因為等待時間過長,部分executor被釋放了。但如果是併發執行,空閒的180cores就會執行其他未完成的job。

4. 結果告警

結果檢查部分,我用了scala的解析器組合子庫來實現,BNF和規則如下圖。得到結果後就會發給郵件指定的負責人,這裡也可以支援簡訊和電話。

5. 結果輸出

這裡是結果落庫的步驟,根據不同庫的寫入方式落庫即可。

四、優化

這裡的優化都是針對構建計算步驟的。整體的優化方向就是合併DAG和複用已經存在的結果。

1. 過程複用

計算步驟DAG可以根據節點是否計算邏輯相同進行合併重組,形成優化的計算步驟DAG。

如圖所示,16個計算步驟被優化成11個計算步驟。被複用的節點會用 df.cache() 運算元來進行快取,加速計算。

這裡主要實現有節點的身份資訊(其計算邏輯和資料資訊),依據節點的身份資訊來進行比較合併。

同時需要依據更新的節點資訊來合併多個子DAG。

2 .計算步驟DAG節點轉換

常量替換

計算異常資料波動率,公式為異常資料量/總資料量。如果使用者使用該規則,取樣了10w資料,那總資料量其實並不需要觸發計算,可以直接轉換成常量。公式就會轉換成異常資料量 / 10w。

過往結果複用

錶行數波動率(每日),公式為(今⽇資料量—昨⽇資料量)/ 昨⽇資料量。

7月2日的時候,公式為

7月3日的時候,公式為

可以從中看到7月3日計算錶行數波動率時,7月2日的Cnt可以讀取前一天的結果,從而避免計算。但需要注意的是,複用這個結果之前,需要檢查前一天結果計算完畢之後,分割槽資料和檢測物件配置有沒有發生改變。沒有發生改變才可以複用,否則,必須重新計算。

這種將計算步驟轉換成常量讀取,來規避觸發計算,也能帶來顯著的效能提升。