深入思考 Schema 管理的幾個基本問題

語言: CN / TW / HK

前言

我發現理解某一個具體「事物」最好的方式是先去理解其背後所遵循的「正規化」。正規化是一個領域中某個行事套路,某種方法論,或者是某些踩坑經驗總結。一個領域中可以有多種正規化,在不同場景下,正規化之間有優劣之分,一個正規化能流行起來,最終成為一種「行事標準」,通常意味著它在當前「時代背景」下被大量驗證過,當然,隨著時間的推移(也許因為一些基礎設施的發展),原來流行的正規化也會過時(從早期的 ETL 到現在的 ELT 就是一個典型的例子)。

對於程式語言來說,逃不開的幾個 基本問題 是:

  • 型別模型

  • 程式設計正規化(過程式,函式式,面向物件等等)

  • 怎樣和語言互動

  • 語言的判斷結構和核心資料結構

  • 與眾不同的核心特性

熟悉這些主題,從某種意義上來說,也就抓住了程式語言的本質,如果你現在要主導設計一門新的語言,你至少知道要考慮哪些基本問題,在此基礎上,你可以針對目標場景給出更有競爭力的實現。

作為半路出家 BI 領域的我來說,日常工作中經常有「只見樹木,不見森林」的感覺,比如當我想針對「某一類經常出現的問題」進行優化時,我發現自己的思考是「缺乏體系」的。其中讓我感覺困惑的點在於:

  • 一開始我都不知道自己經常遇到的問題其實是一個特定領域問題,而且它並不簡單

  • 「某類問題」對應的領域是什麼?整個團隊都有相同的認知嗎?

  • 該領域中需要考慮的 基本問題 是什麼?

這些問題最好是能有一個經驗豐富且善於敘述的“前輩”指點一二,否則日常工作容易不得要領。前段時間周圍同事紛紛推薦《 Designing Clould Data Platforms 》,我跟著讀了幾章後感覺找到了入門的臺階。

在我看來,這本書描述了雲原生架構的設計正規化,閱讀過程中對於提到的概念並不陌生,但同時也把我日常遇到的問題全部串了起來。比如,作者告訴我們設計一個雲原生的資料分析平臺,需要考慮的 基本問題 是什麼?

  • ingesting data from RDBM

而當上下游系統之間存在「重複的資料遷移」關係,且下游系統對 schema 敏感時,就出現了 schema 管理問題。什麼意思?我們從一個「Data Warehouse(以下簡稱 DW)載入檔案資料」的例子開始講起。

DW 在載入資料過程中,資料總是先被 load 到 DW 的 landing table。

landing table 在 DW 中的作用是用來存放從資料來源抽取的新資料,它的 schema 資訊會直接仿照資料來源的 schema 資訊。

當 DW 完成第一次載入時,兩邊的 schema 資訊將保持一致。此時如果修改資料來源 transaction_amount 欄位為 transaction_total,就像這樣:

那麼資料載入就會失敗,資料工程師此時就要開始介入並維護 landing table 了。這種工作模式看起來很低效對不對?

後來,隨著 Hadoop 興起,開始出現“schema on read”的概念。相對與 DW 的“schema on write”模式,Hadoop 所基於的檔案系統 HDFS 在資料寫入階段並不關心其 schema 資訊。

schema-on-write:需要先明確 schema 資訊,建立表,才能開始寫入資料。典型代表 Mysql,DW 等

schema-on-read:資料寫入階段無需關注 schema 資訊,它就是資料拷貝的過程,只有在讀取資料的時候才會開始關注 schema 資訊。典型代表 HDFS

現在假設我們用 HDFS 來替換 DW,看看情況有沒有變好?

這次,我們把下游的 ETL 邏輯也加進來。ETL pipeline 在執行時本質上是生成一段 sql(本書說描述的 clould data platform 底層基於 spark,所以生成的是 sparkSQL),sql 會引用具體的欄位名稱。

同樣的,我們去修改資料來源的欄位名稱,你會發現,HDFS 這一層在載入新資料時並不會出錯,但是最終 ETL 執行出錯了,原因是 transaction_amount 欄位不存在。

現在看來,具備“schema on read”機制的儲存的確可以減輕資料工程師的部分工作(至少不用維護 data landing 的過程),但並沒有真正解決 schema 變更所導致的問題,只不過把問題往後推了一步。

事實上,在實際應用中不同公司面對這種情況處理的方式不一樣。有些大公司會在 schema 改變發生的當下主動提交“change request”,目的是儘可能避免或者減輕下游系統的錯誤,整個過程會謹慎規劃,花幾個星期甚至幾個月的時間來完成這件事情一點也不奇怪。而在一些體量比較小的公司,他們有另外一套策略,那就是啥也不幹,直到下游 ETL 出錯,然後讓資料工程師自行修改,這當然會導致很不好的使用者體驗。

不管怎麼樣,我們需要意識到,schema 的變更管理在資料分析領域是不可忽視的問題。並且以上所述都是一種「手動管理」的方式,我們接下來要開始探索更聰明的做法。

Schema 的管理思路

我的理解,schema 的管理思路可以簡單概括為「共享」和「拷貝」兩種。

「共享」這種方式,也就是作者所說的 schema as a contract,是一種中心化管理思路。

我們想象有這麼一個 Schema Registry 倉庫(裡面儲存了所有資料來源的 schema 資訊),上游資料來源在每次 schema 變更時,都主動推送到 Schema Registry,而下游資料消費者每次需要的時候來 Schema Registry 引用最新版本的 schema。

這種做法有一些好處,比如:

  • 上下游職責邊界清晰

  • 比較容易擴充套件新的資料消費者

  • 欄位只需要維護一份就好(但這隻能向後相容資料,關於相容問題下文會繼續說明)

但想實現這種思路,有一個前提,就是資料來源和資料消費者,兩撥研發團隊需要高度協同,簡直就要像一個團隊一樣開發。這是一個幾乎無法實現的方案,除非涉及的所有資料來源都是公司內部自研系統。

剩下就是基於「拷貝」的方式了,即資料在上下游系統轉移的過程中,schema 資訊是不斷被複制的,比如資料從資料來源到 DW 過程中,schema 資訊就在 DW 的 landing table 中複製了一份。

和「共享」方式相比,最大的區別在於「管理 schema 的職責」完全轉移到了資料分析平臺,而資料生產者,也就是使用資料分析平臺的使用者,不需要去關心這些細節。這也是接下來 Schema-management 實現思路的基調。

實現 Schema-management module

概要

在這一小節中,我將和大家分享資料分析平臺需要關注的幾個 基本問題

  • 資料進入平臺時如何獲取資料來源的 schema 資訊?(主要包括欄位名稱,欄位型別)

欄位名稱容易獲取,但對於像 CSV,JSON 這種格式的資料,我們怎麼拿到資料型別?

  • 基於「拷貝」的 Schema Registry 的設計問題

Schema Registry 是 schema 的倉庫,需要存哪些資訊?大致需要哪些介面?

  • 資料來源 schema 變更時,如何保證平臺 common transformation 過程中的相容問題

關於 common transformation 下文會討論

  • 資料來源 schema 變更時,如何自動管理下游數以千計的 custom transformation

關於 custom transformation 下文會討論

  • 資料來源 schema 變更時,如何自動級聯變更下游其他儲存的 schema

資料來源的資料經過 ETL 的處理之後,最終又被存到 DW 供資料消費者分析,而 DW 的 schema 如何級聯變更呢?

  • 資料來源 schema 變更時,有哪些問題是必須要使用者參與手動維護的

程式不是銀彈,我們需要理解哪些情況是無法被自動化的,然後思考方案如何用最優雅的方式讓客戶參與維護?

一個現代的雲原生資料分析平臺,肯定不會如此簡單。

給大家展示一個被簡化了的雲原生資料分析平臺架構,一起看下它的大致流程是怎樣的?

當資料進入平臺時大體上會經過三個步驟的處理:

  • 第一步,資料抽取以及 data landing 的過程

  • 第二步,common data transformation

  • 第三步,custom data transformation

custom data transformation 指的是諸如 ETL,reports 等等 pipeline

我們進一步細化上述第二步,什麼是 common data transformation?它大概負責哪些工作?

資料來源資料在平臺 landing 之後,需要做通用的轉換處理:

  • Data format conversion module

資料來源的格式各種各樣,比如 CSV,JSON,XML,甚至還有二進位制資料,一個很直接的問題是後續的 analytics pipelines 該怎麼基於這些格式構建呢?這中間需要做一層抽象和解耦,該模組的工作就是統一資料格式。而在實際應用中,我們會結合使用avro和parquet兩種格式。

  • Deduplication module

這是一個比較大的話題,本書主要指重複資料清理,感興趣的還可以瞭解一下 MDM tools

  • Data quality checks module

按照使用者的規則對資料來源資料質量做檢查,保證拿到的是“乾淨”的資料

現在,我們要新加入一個環節:Schema-management module,它需要做的事情是,檢查資料來源的 schema 資訊是否已經存在 Schema Registry 中:

  • 如果不存在:

  • 推測新資料的 schema 資訊

  • 將該 schema 資訊註冊到 Schema Registry 中,並將版本號設定為 1

  • 如果存在:

  • 獲取 Registry 中的 schema 資訊

  • 推測新資料的 schema 資訊

  • 對比上述兩個 schema 資訊,並已「向後相容」的方式做 combine 操作(關於相容問題下文會討論)

  • 將最終結果以一個新的版本註冊到 Schema Registry 中

值得注意的是,第二步和第三步都會和 Schema Registry 有互動,這也就意味著 schema 變更會影響到這兩個步驟,在後面會逐步展開討論。

Schema 資訊獲取——欄位推測

在 Schema-management module 中第一個 基本問題 是「需要有方案知道資料來源的 schema 資訊(主要包括名稱和型別)」。對於 RDBMS 型別的資料來源,schema 資訊是很容易獲取的。但對於像 CSV 或者 JSON 這樣的資料來源,則需要通過「欄位推測」的方式來獲取。

怎麼做欄位推測呢?幸運的是,咱們的平臺底層使用 Spark 作為計算框架來處理各種資料轉換,spark 自帶一個強大的功能叫 schema inference。它的大概原理是讀取資料的前 1000 行,然後自動解析出欄位的型別資訊,這在解析 CSV 檔案或者高度巢狀的 JSON 有非常好的表現。我們通過一個JSON生成工具得到以下資料:

使用 spark shell 可以快速驗證「欄位推測」功能。

值得注意的是,以上 spark 的推測結果,其實使用的是 spark 內部自帶的型別系統,我們當然可以把這當成最終結果,但考慮到我們設計的是一個能廣泛相容的資料平臺,所以我們會考慮將 spark 推測出來的 schema 資訊轉換成 Avro Schema 再存入我們的 Registry 當中。

上文提到在 Common data transformation 中有一個環節是 Data format conversion,其目的是要統一資料來源的資料格式,這可以給下游的 Custom data transformation 提供一個統一的抽象層,使得程式碼耦合度大幅降低。同樣的,我們也希望在「資料型別」這件事情上能做到統一,廣泛相容。而 Avro Schema 是非常合適的選擇,它支援非常多通用的原生資料型別:strings,integers, float,null 等等,同時也支援複雜型別,比如 records,arrays,enums 等等。

下面簡單展示 Spark schema 和 Avro schema 的轉換方式。

Schema Registry 的設計

拿到了 schema 資訊之後,我們需要考慮的第二個 基本問題 是 Schema Registry 應該怎麼設計?包括需要存哪些資訊?需要提供什麼介面?相信這個難不倒大家。

基本結構就是 DB+API layer,我們先看下 Schema Registry 和其他模組的互動大概是怎樣的?

  • 在資料 landing 的過程中,Ingestion pipelines 會往 Registry 中增加或者更新資料

  • 而下游的 transformation pipelines(如 ETL)在構建過程中首先需要讀取 schema 資訊,其次,它最終的輸出也是一個新的資料來源,自然也會往 Registry 中增加資料

  • 監控工具也會週期性的檢查 schema 的 version,並給使用者以提醒

此處監控 schema 變更的目的是什麼?後面會詳細討論。

梳理清楚需求之後,大致也知道需要哪些 API:

  • 根據資料來源獲取當前的 schema 版本

  • 增加新版本的 schema 資料

  • 更新 schema 基本資訊

而表字段的設計可以像這樣(幫助大家理解,並不一定是最終實現):

  • ID

  • Version

  • Schema

  • Created Timestamp

  • Last Updated Timestamp

值得一提的是,為什麼我們要為 schema 記錄歷史版本呢?有一個直接好處是我們知道一個數據源的 schema 資訊的歷史變化情況,這對 debug 以及 troubleshooting 是有非常大的好處的。而在我們即將要討論的相容問題中,你會發現版本資訊的另一個好處。

Schema 變更場景

我們已經知道怎麼獲取 schema 資訊,也知道怎麼儲存這些資訊。是時候開始討論 schema 的變更場景了。

首先考慮一個簡單的問題,資料來源 schema 可能有哪些關鍵變化呢?

  • 增加一個欄位

  • 刪除一個欄位

  • 重新命名欄位

  • 修改欄位型別

其次回憶一下,資料來源 schema 的變更對哪些環節有影響?

我們拆解一下流程,重新理解各個環節所做的工作以及和 schema 的基本關係。

第一步,資料來源的資料經過 Ingestion layer 不斷寫入平臺,就像這樣:

資料來源 schema 的變化,對 Ingestion layer 並沒有什麼影響,它總是以最新的版本(也就是資料來源當前的 schema)寫入資料。所以隨著時間的推移,對於同一個資料來源,在平臺中可能存在部分老資料是用 schema V1 寫的,部分新資料是用 schema V2 寫的。

第二步,Common transformation pipelines,該環節需要做幾個工作(除 Schema management 以外):

  • Data format conversion module

  • Deduplication module

  • Data quality checks module

簡單理解,它需要對 Ingestion layer 寫入的資料進行二次處理。

第三步,Custom transformation pipelines,該環節使用者會自定義 ETL 資料處理邏輯,而 ETL 最終會輸出一個新的資料來源

有沒有發現,第二步和第三步都涉及到對已有資料的讀操作。既然如此,我們就不難想到會出現以下幾種情況:

  • 用新版本 Schema,讀取新資料(肯定不會有問題)

  • 用新版本 Schema,讀取老資料(會出問題嗎?)

  • 用老版本 Schema,讀取新資料(會出問題嗎?)

  • 用老版本 Schema,讀取老資料(肯定不會有問題)

對於第一種和第四種,肯定不會有問題,那麼對於中間兩種呢?這裡需要引入兩個概念: 向後相容與向前相容

當我們說某 schema 變更「 向後相容 」時,它指的是,data transformation pipelines(不管是 Common 還是 Custom)用最新版本的 schema 可以正常讀取老資料(用老版本的 schema 寫入的資料)。

當我們說某 schema 變更「 向前相容 」時,它指的是,data transformation pipelines 用老版本的 schema 可以正常讀取新資料(用新版本的 schema 寫入的資料)。

鋪墊的差不多了,最後,當我們考慮「schema 變更」所產生的影響時,一定要牢記一個藍圖,即在 schema 變更時,我們的終極目標不僅僅要保證 Common transformation 環節能正常讀取資料,還要保證下游成百上千的 ETL pipelines 以及 reports(儀表板),能跟著一起變更並且正常執行(下游的 Custom transformation 同樣會依賴資料來源的欄位),這樣可以極大的提高使用者的使用體驗以及效率。

Schema 變更對 Common Transformation Pipelines 的影響

我們從 Common transformation 開始談起,討論一下「用新版本 Schema,讀取老資料」以及「用老版本 Schema,讀取新資料」分別會發生什麼?

在下面的例子中,有一個單一資料來源已經完成了一輪資料抽取,使用 schema V1 往平臺寫入了資料。此時資料來源增加了一個欄位 column_3,並且通過 Ingestion layer 寫入了新的資料。

如果 Common transformation pipelines 用 schema V2 去讀取老資料會怎麼樣?

Avro 格式定義了幾種處理規則,使得 schema 變更可以向後相容。在這個例子中,Avro 使用 schema V2 讀取老資料時會自動為 column_3 欄位設定一個預設值,通常預設值是一個 empty 或者“null”值,當然也可以設定和欄位型別相匹配的預設值, 所以「增加一個欄位」對 Avro 來說是向後相容的

我們繼續,現在假設 Common transformation pipelines 因為某種原因,沒有立馬切換新版本,而是用 schema V1 去讀取新資料,會發生什麼?

Avro 會直接忽略新加的欄位,當前的 Common transformation piplines 不會有任何問題,piplines 可以在晚些時候再切換到新版本的 schema。 所以「刪除一個欄位」對 Avro 來說是向前相容的

雖然咱們的 pipleline 可以允許 schema 版本延遲切換,但我們並不建議這麼幹,因為使用者大概率是希望能儘快看到新的欄位。及時同步資料來源 schema 變更總是好的,這會讓使用者感覺到咱們的資料平臺是非常在意這件事情的。這也是後面我們要討論的「監控 Schema 變更」的原因之一。

我們已經討論了增加列和刪除列的相容性, 那麼重新命名相容性呢? 相信你也想到了,其實重新命名就等於刪除列+增加列,對 Avro 來說,如果該列有預設值,那麼重新命名操作是前後相容的,否則,就是前後都不相容。

最後一種操作是修改欄位型別。Avro 支援 promoting 欄位型別,保證資料不會丟失。比如 Avro 可以把 int 擴充套件成 long,float,和 double 型別。你可以在該文件中瞭解到更多 promote 資訊。

Schema 變更對 Custom Transformation Pipelines 的影響

custom transformation 和 common transformation 最大的區別在於前者開始加入了業務邏輯,而且數量上會變的很多(對於一箇中大型的客戶來說 ETL 或者 reports 數量往往是數以千記的),進而還會引出更多管理問題。

但在相容性問題上兩者沒有本質的區別。我們還是用類似的例子說明,資料來源刪除了 column_2,增加了 column_3(也可以說是 column_2 重新命名成了 column_3)。

同樣的,我們考慮幾種情況,custom transformation 使用老版本 schema 能讀取新資料嗎?這取決於當初建立 column_2 的時候有沒有設定預設值,如果有,那麼沒問題。使用新版本 schema 能讀取老資料嗎?這同樣取決於建立 column_3 時有設定預設值嗎?如果有,那麼也沒問題。

我們發現,使用 avro 的一個最佳實踐是「設定合適的預設值」,這樣會最大程度上保證資料的相容性。下面的表格詳細的說明了 schema 變更和相容性的關係。

下游儲存的 Schema 級聯變更

custom transformation pipeline 的輸出,比如 ETL,可能會作為一個新的資料來源,寫入到 DW,所以上游 schema 變更的時候,DW 的 schema 怎麼修改?

這個過程就需要我們自己寫一些程式碼了,基本的思路是基於 scehma management 模組,根據 schema 的歷史變更,生成對應的 Alter table 的 sql 語句。

比如我們刪除欄位 column_2,增加了欄位 column_3,我們最終會生成一個類似 Alter table some_table add column column_3 這樣的 sql 去 DW 中執行。那為什麼僅增加了 column_3,沒有刪除 column_2 的操作呢?因為 DW 包含很多有價值的歷史資料,通常我們不會做刪除操作。

當然還有一種更粗暴的方式,就是每次 schema 變更的時候,直接重建 DW 的表,即刪除原表,然後根據新的 schema 重新載入所有歷史資料,但這僅適用於資料量比較小的場景。

需要注意的是,很多 DW 在修改 schema 的過程中是無法查詢的,所以我們要權衡修改 schema 的時間,否則對基於 DW 的報表服務將產生很大影響。

監控 Schema 變更

終於,我們把 schema 變更對 common transformation,custom transformation 以及下游 DW 的影響和對應的解決方案都討論了一遍,我們盡全力降低了因為 schema 變更給使用者帶來的影響——保持各種 data transformation pipeline 正常執行狀態。但我們任然需要有一個通知機制去告訴使用者欄位的修改情況,這不僅僅是因為我們無法 100%規避因資料不相容導致的 pipeline 執行報錯(比如找不到某欄位),哪怕 pipeline 本身沒報錯,其最終計算的結果也可能是錯的。

還是這個熟悉的例子,對於這樣一個數據源,我們刪除了 daily_sales 欄位,增加了 total_day_sales 欄位。因為 daily_sales 的預設值是 null(設定預設值是一個很好的習慣),那麼當前的 pipeline 是向後相容的,它能正常執行,但結果呢?這顯然不是客戶想看到的資料。

對於這種問題沒有更好的自動化解決方案,我們需要思考的是,怎麼優雅的通知客戶哪些報表可能已經出錯,並讓客戶以最方便的方式去 review 和調整各種 pipeline 邏輯。

現有的 catalog 專案實現

到目前為止,我們算是對 Schema management 這一領域問題有了整體的瞭解,而在該領域有哪些現成的產品呢?

  • aws clue data catalog

  • azure data catalog

  • google clould data catalog

  • Confluent

  • DataHub in Linkedin

  • Amundsen in Lyft

  • Marquez in WeWork

  • Dataportal in Airbnb

  • Lexikon in Spotify

  • Metacat in Netflix

  • Databook in Uber

我給大家列了一些專案,作為開拓視野都很值得大家去了解,看看別人是怎麼做的,自己的產品又是怎麼做的。

寫在最後

假如這篇文章可以給大家帶來一些價值,我希望它能幫助大家意識到該領域問題的存在,並構建對它的整體認知。日後工作中遇到該領域的問題時,眼光不再侷限在一個個點狀的 jira task,而是能清晰的知道該問題發生在什麼環節,能在一個領域體系內思考問題的原因,以及優化方案,甚至還能觸類旁通,找到該領域內其他優秀產品擴充套件自己的思路。

在寫這篇文章的過程中,原書《 Designing Clould Data Platforms 》的 schema management 章節(包括相關聯的章節)已經被我反覆讀過很多遍。和原文相比,我幾乎重新組織編排了內容,用我所理解的“循序漸進”的方式重新表達,這並不是說原文“邏輯混亂”,恰恰相反,哪怕像我這樣的英文渣渣也毫無閱讀障礙,只不過這是“充分理解”過程中不得不做的事情。另外,為了讓大家在閱讀過程中避免不必要的認知負載,我適當的做了一些知識遮蔽,如果對於一些概念任然有疑惑,還是建議大家親自看看這本書。

最後,如果本文有任何錯誤的觀點都與原作者無關,請在評論區告訴我,大家一起成長。