如何輕鬆做數據治理?開源技術棧告訴你答案

語言: CN / TW / HK

data-lineage

搭建一套數據治理體系耗時耗力,但或許我們沒有必要從頭開始搞自己的數據血緣項目。本文分享如何用開源、現代的 DataOps、ETL、Dashboard、元數據、數據血緣管理系統構建大數據治理基礎設施。

元數據治理系統

元數據治理系統是一個提供了所有數據在哪、格式化方式、生成、轉換、依賴、呈現和所屬的一站式視圖

元數據治理系統是所有數據倉庫、數據庫、表、儀表板、ETL 作業等的目錄接口(catalog),有了它,我們就不用在羣裏喊“大家好,我可以更改這個表的 schema 嗎?”、 “請問誰知道我如何找到 table-view-foo-bar 的原始數據?”…一個成熟的數據治理方案中的元數據治理系統,對數據團隊來説非常必要。

數據血緣則是元數據治理系統眾多需要管理的元數據之一,例如,某些 Dashboard 是某一個 Table View 的下游,而這個 Table View 又是從另外兩個上游表 JOIN 而來。顯然,應該清晰地掌握、管理這些信息,去構建一個可信、可控的系統和數據質量控制體系。

數據治理的可行方案

數據治理方案設計

元數據和數據血緣本質上非常適合採用圖數據建模、圖數據庫。因為數據治理涉及的典型查詢便是面向圖關係的查詢,像“查找指定組件(即表)的所有 n 度(深度)的數據血緣”就是圖查詢語句 FIND ALL PATH 跑起來的事。從日常大家在論壇、微信羣裏討論的查詢和圖建模來看,NebulaGraph 社區很多人在從零開始搭建數據血緣系統,而這些工作看起來大多是在重複造輪子,而且還是不容易造的輪子。

既然如此,前人種樹後人乘涼,這裏我決定搭建一個完備、端到端(不只有元數據管理)的數據系統,供大家參考解決數據血緣、數據治理問題。這個套數據系統會採用市面上優秀的開源項目,而圖數據庫這塊還是採用大家的老朋友——NebulaGraph。希望對大家能有所啟發,在此基礎之上擁有一個相對完善的圖模型,以及設計精巧、開箱即用的元數據治理系統。

下面,來看看元數據治理系統的輪子都需要哪些功能組件:

  • 元數據抽取
    • 這部分需要從不同的數據棧拉/推數據,像是從數據庫、數倉、Dashboard,甚至是 ETL Pipeline 和應用、服務中搞數據。
  • 元數據存儲
    • 可以存在數據庫、圖數據庫裏,甚至存成超大的 JSON manifest 文件都行
  • 元數據目錄接口系統 Catalog
    • 提供 API / GUI 來讀寫元數據和數據血緣系統

下圖是整個方案的簡單示意圖:

其中,上面的虛線框是元數據的來源與導入、下面的虛線框是元數據的存儲與展示、發現。

diagram-of-ref-project

開源技術棧

下面,介紹下數據治理系統的每個部分。

數據庫和數倉

為了處理和使用原始和中間數據,這裏一定涉及至少一個數據庫或者數倉。它可以是 Hive、Apache Delta、TiDB、Cassandra、MySQL 或 Postgres。

在這個參考項目中,我們選一個簡單、流行的 Postgres。  

✓ 數據倉庫:Postgres

數據運維 DataOps

我們應該有某種 DataOps 的方案,讓 Pipeline 和環境具有可重複性、可測試性和版本控制性。

在這裏,我們使用了 GitLab 創建的 Meltano

Meltano 是一個 just-work 的 DataOps 平台,它可以用巧妙且優雅的方式將 Singer 作為 EL 和 dbt 作為 T 連接起來。此外,它還連接到其他一些 dataInfra 實用程序,例如 Apache Superset 和 Apache Airflow 等。

至此,我們又納入了一個成員:

✓ GitOps:Meltano https://gitlab.com/meltano/meltano

ETL 工具

上面我們提到過組合 Singer 與 Meltano 將來自許多不同數據源的數據 E(提取)和 L(加載)數據目標,並使用 dbt 作為 Transform 的平台。於是我們得到:

✓ EL:Singer ✓ T: dbt

數據可視化

在數據之上創建 Dashboard、圖表和表格得到數據的洞察是很符合直覺的,類似大數據之上的 Excel 圖標功能。

Apache Superset 是我很喜歡的開源數據可視化項目,我準備用它來作為被治理管理的目標之一。同時,還會利用它實現可視化功能來完成元數據洞察。於是,

✓ Dashboard:Apache Superset

任務編排(DAG Job Orchestration)

在大多數情況下,我們的 DataOps 作業、任務會演變成需要編排系統的規模,我們可以用 Apache Airflow 來負責這一塊。

✓ DAG:Apache Airflow https://airflow.apache.org/

元數據治理

隨着越來越多的組件和數據被引入數據基礎設施,在數據庫、表、數據建模(schema)、Dashboard、DAG(編排系統中的有向無環圖)、應用與服務的各個生命週期階段中都將存着海量的元數據,需要對它們的管理員和團隊進行協同管理、連接和發現。

Linux Foundation Amundsen 是解決該問題的最佳項目之一。 Amundsen 用圖數據庫為事實源(single source of truth)以加速多跳查詢,Elasticsearch 為全文搜索引擎。它在順滑地處理所有元數據及其血緣之餘,還提供了優雅的 UI 和 API。 Amundsen 支持多種圖數據庫為後端,這裏咱們用 NebulaGraph

現在的技術棧:

✓ 數據發現:Linux Foundation Amundsen ✓ 全文搜索:Elasticsearch ✓ 圖數據庫:NebulaGraph

好的,所有組件都齊正了,開始組裝它們吧。

環境搭建與各組件初識

本次實踐的項目方案已開源,你可以訪問 https://github.com/wey-gu/data-lineage-ref-solution 來獲得對應的代碼。

整個實踐過程,我遵循了儘量乾淨、鼓勵共建的原則。項目預設在一個 unix-like 系統上運行,且聯網和裝有 Docker-Compose。

這裏,我將在 Ubuntu 20.04 LTS X86_64 上運行它,當然在其他發行版或 Linux 版本上應該也沒有問題。

運行一個數倉、數據庫

首先,安裝 Postgres 作為我們的數倉。

這個單行命令會創建一個使用 Docker 在後台運行的 Postgres,進程關閉之後容器不會殘留而是被清理掉(因為參數--rm)。

docker run --rm --name postgres \
    -e POSTGRES_PASSWORD=lineage_ref \
    -e POSTGRES_USER=lineage_ref \
    -e POSTGRES_DB=warehouse -d \
    -p 5432:5432 postgres

我們可以用 Postgres CLI 或 GUI 客户端來驗證命令是否執行成功。

DataOps 工具鏈部署

接下來,安裝有機結合了 Singer 和 dbt 的 Meltano。

Meltano 幫助我們管理 ETL 工具(作為插件)及其所有配置和 pipeline。這些元信息位於 Meltano 配置及其系統數據庫中,其中配置是基於文件的(可以使用 GitOps 管理),它的默認系統數據庫是 SQLite。

安裝 Meltano

使用 Meltano 的工作流是啟動一個“meltano 項目”並開始將 E、L 和 T 添加到配置文件中。Meltano 項目的啟動只需要一個 CLI 命令 meltano init yourprojectname。不過,在那之前,先用 Python 的包管理器 pip 或者 Docker 鏡像安裝 Meltano,像我示範的這樣:

在 Python 虛擬環境中使用 pip 安裝 Meltano:

mkdir .venv
# example in a debian flavor Linux distro
sudo apt-get install python3-dev python3-pip python3-venv python3-wheel -y
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano

# init a project
mkdir meltano_projects && cd meltano_projects
# replace <yourprojectname> with your own one
touch .env
meltano init <yourprojectname>

或者,用 Docker 容器安裝 Meltano:

docker pull meltano/meltano:latest
docker run --rm meltano/meltano --version

# init a project
mkdir meltano_projects && cd meltano_projects

# replace <yourprojectname> with your own one
touch .env
docker run --rm -v "$(pwd)":/projects \
             -w /projects --env-file .env \
             meltano/meltano init <yourprojectname>

除了知曉 meltano init 之外,最好掌握 Meltano 部分命令,例如 meltano etl 表示 ETL 的執行,meltano invoke <plugin> 來調用插件命令。詳細可以參考它的速查表 https://docs.meltano.com/reference/command-line-interface

Meltano GUI 界面

Meltano 自帶一個基於 Web 的 UI,執行 ui 子命令就能啟動它:

meltano ui

它默認會跑在 http://localhost:5000 上。

針對 Docker 的運行環境,在暴露 5000 端口的情況下運行容器即可。由於容器的默認命令已經是 meltano ui,所以 run 的命令只需:

docker run -v "$(pwd)":/project \
             -w /project \
             -p 5000:5000 \
             meltano/meltano

Meltano 項目示例

GitHub 用户 Pat Nadolny 在開源項目 singer_dbt_jaffle 中做了很好的示例。他採用 dbt 的 Meltano 示例數據集,利用 Airflow 編排 ETL 任務

不只這樣,他還有利用 Superset 的例子,見 jaffle_superset

前人種樹我們來吃果,按照 Pat Nadolny 的實踐,我們可以這樣地運行數據管道(pipeline):

  • tap-CSV(Singer)從 CSV 文件中提取數據
  • target-postgres(Singer) 將數據加載到 Postgres
  • dbt 將數據轉換為聚合表或視圖

注意,上面我們已經啟動了 Postgres,可以跳過容器啟動 Postgres 這步。

操作過程是:

git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/singer_dbt_jaffle/

meltano install
touch .env
echo PG_PASSWORD="lineage_ref" >> .env
echo PG_USERNAME="lineage_ref" >> .env

# Extract and Load(with Singer)
meltano run tap-csv target-postgres

# Trasnform(with dbt)
meltano run dbt:run

# Generate dbt docs
meltano invoke dbt docs generate

# Serve generated dbt docs
meltano invoke dbt docs to serve

# Then visit http://localhost:8080

現在,我們可以連接到 Postgres 來查看加載和轉換後的數據預覽。如下所示,截圖來自 VS Code 的 SQLTool。

payments 表裏長這樣子:

搭一個 BI Dashboard 系統

現在,我們的數據倉庫有數據了。接下來,可以試着用下這些數據。

像儀表盤 Dashbaord 這樣的 BI 工具能幫我們從數據中獲得有用的洞察。使用可視化工具 Apache Superset 可以很容易地創建和管理這些基於數據源的 Dashboard 和各式各樣的圖表。

本章的重點不在於 Apache Superset 本身,所以,咱們還是複用 Pat Nadolny 的 jaffle_superset 例子。

Bootstrap Meltano 和 Superset

創建一個安裝了 Meltano 的 Python VENV:

mkdir .venv
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano

參考 Pat 的小抄,做一些細微的調整:

克隆 repo,進入 jaffle_superset 項目:

git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/jaffle_superset/

修改 meltano 配置文件,讓 Superset 連接到我們創建的 Postgres:

vim meltano_projects/jaffle_superset/meltano.yml

這裏,我將主機名更改為“10.1.1.111”,這是我當前主機的 IP。如果你是 Windows 或者 macOS 上的 Docker Desktop,這裏不要修改主機名,否則就要和我一樣手動改成實際地址:

--- a/meltano_projects/jaffle_superset/meltano.yml
+++ b/meltano_projects/jaffle_superset/meltano.yml
@@ -71,7 +71,7 @@ plugins:
               A list of database driver dependencies can be found here https://superset.apache.org/docs/databases/installing-database-drivers
     config:
       database_name: my_postgres
-      sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@host.docker.internal:${PG_PORT}/${PG_DATABASE}
+      sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@10.1.1.168:${PG_PORT}/${PG_DATABASE}
       tables:
       - model.my_meltano_project.customers
       - model.my_meltano_project.orders

添加 Postgres 登錄的信息到 .env 文件:

echo PG_USERNAME=lineage_ref >> .env
echo PG_PASSWORD=lineage_ref >> .env

安裝 Meltano 項目,運行 ETL 任務:

meltano install
meltano run tap-csv target-postgres dbt:run

調用、啟動 Superset,這裏注意 ui 不是 meltano 的內部命令,而是一個配置進去的自定義行為(user-defined action):

meltano invoke superset:ui

在另一個命令行終端執行自定義的命令 load_datasources

meltano invoke superset:load_datasources

通過瀏覽器訪問 http://localhost:8088/ 就是 Superset 的圖形界面了:

創建一個 Dashboard

現在,我們站在 Meltano、Postgres 的肩膀上,用 ETL 數據建一個 Dashboard 吧:

點擊 + DASHBOARD,填寫儀表盤名稱,再先後點擊 SAVE+ CREATE A NEW CHART

在新圖表(Create a new chart)視圖中,選擇圖表類型和數據集。在這裏,我選擇了 orders 表作為數據源和 Pie Chart 圖表類型:

點擊 CREATE NEW CHART 後,在圖表定義視圖中選擇 “status” 的 “Query” 為 “DIMENSIONS”,“COUNT(amount)” 為 “METRIC”。至此,咱們就可以看到每個訂單狀態分佈的餅圖了。

點擊 SAVE,系統會詢問應該將此圖表添加到哪個 Dashboard。選擇後,單擊 SAVE & GO TO DASHBOARD

在 Dashboard 中,我們可以看到所有的圖表。這不,你可以看到我額外添加的、用來顯示客户訂單數量分佈的圖表:

··· 能看到刷新率設置、下載渲染圖等其他的功能。

現在,我們有一個簡單但典型的 HomeLAB 數據技術棧了,並且所有東西都是開源的!

想象一下,我們在 CSV 中有 100 個數據集,在數據倉庫中有 200 個表,並且有幾個數據工程師在運行不同的項目,這些項目使用、生成不同的應用與服務、Dashbaord 和數據庫。當有人想要查找、發現或者修改其中的一些表、數據集、Dashbaord 和管道,在溝通和工程方面可能都是非常不好管理的。

上面我們提到,這個示例項目的主要功能是元數據發現系統

元數據發現系統

現在,需要我們部署一個帶有 NebulaGraph 和 Elasticsearch 的 Amundsen。有了 Amundsen,我們可以在一個地方發現和管理整個數據棧中的所有元數據。

Amundsen 主要有兩個部分組成:

它的工作原理是:利用 Databuilder 提取不同數據源的元數據,並將元數據持久化到 Metadata ServiceSearch Service 中,用户從 Frontend ServiceMetadata Service 的 API 獲取數據。

部署 Amundsen

元數據服務 Metadata Service

我們用 docker-compose 部署一個 Amundsen 集羣。由於 Amundsen 對 NebulaGraph 後端的支持 pr#1817 尚未合併,還不能用官方的代碼。這裏,先用我的 fork 版本。

先克隆包含所有子模塊的 repo:

git clone -b amundsen_nebula_graph --recursive [email protected]:wey-gu/amundsen.git
cd amundsen

啟動所有目錄服務 catalog services 及其後端存儲:

docker-compose -f docker-amundsen-nebula.yml up

由於這個 docker-compose 文件是供開發人員試玩、調試 Amundsen 用的,而不是給生產部署準備的,它在啟動的時候會從代碼庫構建鏡像,第一次跑的時候啟動會慢一些。

部署好了之後,我們使用 Databuilder 將一些示例、虛構的數據加載存儲裏。

抓取元數據 Databuilder

Amundsen Databuilder 就像 Meltano 系統一樣,只不過是用在元數據的上的 ETL ,它把元數據加載到 Metadata ServiceSearch Service 的後端存儲:NebulaGraph 和 Elasticsearch 裏。這裏的 Databuilder 只是一個 Python 模塊,所有的元數據 ETL 作業可以作為腳本運行,也可以用 Apache Airflow 等 DAG 平台進行編排。

安裝 Amundsen Databuilder

cd databuilder
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install wheel
python3 -m pip install -r requirements.txt
python3 setup.py install

調用這個示例數據構建器 ETL 腳本來把示例的虛擬數據導進去。

python3 example/scripts/sample_data_loader_nebula.py
驗證一下 Amundsen

在訪問 Amundsen 之前,我們需要創建一個測試用户:

# run a container with curl attached to amundsenfrontend
docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot

# Create a user with id test_user_id
curl -X PUT -v http://amundsenmetadata:5002/user \
    -H "Content-Type: application/json" \
    --data \
    '{"user_id":"test_user_id","first_name":"test","last_name":"user", "email":"[email protected]"}'

exit

然後我們可以在 http://localhost:5000 查看 UI 並嘗試搜索 test,它應該會返回一些結果。

然後,可以單擊並瀏覽在 sample_data_loader_nebula.py 期間加載到 Amundsen 的那些示例元數據。

此外,我們還可以通過 NebulaGraph Studio 的地址 http://localhost:7001 訪問 NebulaGraph 裏的這些數據。

下圖顯示了有關 Amundsen 組件的更多詳細信息:

       ┌────────────────────────┐ ┌────────────────────────────────────────┐
       │ Frontend:5000          │ │ Metadata Sources                       │
       ├────────────────────────┤ │ ┌────────┐ ┌─────────┐ ┌─────────────┐ │
       │ Metaservice:5001       │ │ │        │ │         │ │             │ │
       │ ┌──────────────┐       │ │ │ Foo DB │ │ Bar App │ │ X Dashboard │ │
  ┌────┼─┤ Nebula Proxy │       │ │ │        │ │         │ │             │ │
  │    │ └──────────────┘       │ │ │        │ │         │ │             │ │
  │    ├────────────────────────┤ │ └────────┘ └─────┬───┘ └─────────────┘ │
┌─┼────┤ Search searvice:5002   │ │                  │                     │
│ │    └────────────────────────┘ └──────────────────┼─────────────────────┘
│ │    ┌─────────────────────────────────────────────┼───────────────────────┐
│ │    │                                             │                       │
│ │    │ Databuilder     ┌───────────────────────────┘                       │
│ │    │                 │                                                   │
│ │    │ ┌───────────────▼────────────────┐ ┌──────────────────────────────┐ │
│ │ ┌──┼─► Extractor of Sources           ├─► nebula_search_data_extractor │ │
│ │ │  │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │  │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │  │ │ Loader filesystem_csv_nebula   │ │ Loader Elastic FS loader     │ │
│ │ │  │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │  │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │  │ │ Publisher nebula_csv_publisher │ │ Publisher Elasticsearch      │ │
│ │ │  │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │  └─────────────────┼─────────────────────────────────┼─────────────────┘
│ │ └────────────────┐   │                                 │
│ │    ┌─────────────┼───►─────────────────────────┐ ┌─────▼─────┐
│ │    │ NebulaGraph │   │                         │ │           │
│ └────┼─────┬───────┴───┼───────────┐     ┌─────┐ │ │           │
│      │     │           │           │     │MetaD│ │ │           │
│      │ ┌───▼──┐    ┌───▼──┐    ┌───▼──┐  └─────┘ │ │           │
│ ┌────┼─►GraphD│    │GraphD│    │GraphD│          │ │           │
│ │    │ └──────┘    └──────┘    └──────┘  ┌─────┐ │ │           │
│ │    │ :9669                             │MetaD│ │ │  Elastic  │
│ │    │ ┌────────┐ ┌────────┐ ┌────────┐  └─────┘ │ │  Search   │
│ │    │ │        │ │        │ │        │          │ │  Cluster  │
│ │    │ │StorageD│ │StorageD│ │StorageD│  ┌─────┐ │ │  :9200    │
│ │    │ │        │ │        │ │        │  │MetaD│ │ │           │
│ │    │ └────────┘ └────────┘ └────────┘  └─────┘ │ │           │
│ │    ├───────────────────────────────────────────┤ │           │
│ └────┤ Nebula Studio:7001                        │ │           │
│      └───────────────────────────────────────────┘ └─────▲─────┘
└──────────────────────────────────────────────────────────┘

穿針引線:元數據發現

設置好基本環境後,讓我們把所有東西穿起來。還記得我們有 ELT 一些數據到 PostgreSQL 嗎?

那麼,我們如何讓 Amundsen 發現這些數據和 ETL 的元數據呢?

提取 Postgres 元數據

我們從數據源開始:首先是 Postgres。

我們為 Python3 安裝 Postgres 客户端:

sudo apt-get install libpq-dev
pip3 install Psycopg2

執行 Postgres 元數據 ETL

運行一個腳本來解析 Postgres 元數據:

export CREDENTIALS_POSTGRES_USER=lineage_ref
export CREDENTIALS_POSTGRES_PASSWORD=lineage_ref
export CREDENTIALS_POSTGRES_DATABASE=warehouse

python3 example/scripts/sample_postgres_loader_nebula.py

我們看看把 Postgres 元數據加載到 NebulaGraph 的示例腳本的代碼,非常簡單直接:

# part 1: PostgresMetadata --> CSV --> NebulaGraph
job = DefaultJob(
      conf=job_config,
      task=DefaultTask(
          extractor=PostgresMetadataExtractor(),
          loader=FsNebulaCSVLoader()),
      publisher=NebulaCsvPublisher())

...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)

job = DefaultJob(conf=job_config, task=task)

第一個工作路徑是:PostgresMetadata --> CSV --> NebulaGraph

第二個工作路徑是:Metadata stored in NebulaGraph --> Elasticsearch

  • NebulaSearchDataExtractor 用於獲取存儲在 NebulaGraph 中的元數據
  • SearchMetadatatoElasticasearchTask 用於使 Elasticsearch 對元數據進行索引。

請注意,在生產環境中,我們可以在腳本中或使用 Apache Airflow 等編排平台觸發這些作業。

驗證 Postgres 中元數據的獲取

搜索 payments 或者直接訪問 http://localhost:5000/table_detail/warehouse/postgres/public/payments,你可以看到我們 Postgres 的元數據,比如:

像上面的屏幕截圖一樣,我們可以輕鬆完成元數據管理操作,如:添加標籤、所有者和描述。

提取 dbt 元數據

其實,我們也可以從 dbt 本身提取元數據。

Amundsen DbtExtractor 會解析 catalog.jsonmanifest.json 文件並將元數據加載到 Amundsen 存儲,這裏當然指的是 NebulaGraph 和 Elasticsearch。

在上面的 Meltano 章節中,我們已經使用 meltano invoke dbt docs generate 生成了這個文件:

14:23:15  Done.
14:23:15  Building catalog
14:23:15  Catalog written to /home/ubuntu/ref-data-lineage/meltano_example_implementations/meltano_projects/singer_dbt_jaffle/.meltano/transformers/dbt/target/catalog.json

dbt 元數據 ETL 的執行

我們試着解析示例 dbt 文件中的元數據吧:

$ ls -l example/sample_data/dbt/
total 184
-rw-rw-r-- 1 w w   5320 May 15 07:17 catalog.json
-rw-rw-r-- 1 w w 177163 May 15 07:17 manifest.json

我寫的這個示例的加載例子如下:

python3 example/scripts/sample_dbt_loader_nebula.py

其中主要的代碼如下:

# part 1: dbt manifest --> CSV --> NebulaGraph
job = DefaultJob(
      conf=job_config,
      task=DefaultTask(
          extractor=DbtExtractor(),
          loader=FsNebulaCSVLoader()),
      publisher=NebulaCsvPublisher())

...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)

job = DefaultJob(conf=job_config, task=task)

它和 Postgres 元數據 ETL 的唯一區別是 extractor=DbtExtractor(),它帶有以下配置以獲取有關 dbt 項目的以下信息:

  • 數據庫名稱
  • 目錄_json
  • manifest_json
job_config = ConfigFactory.from_dict({
  'extractor.dbt.database_name': database_name,
  'extractor.dbt.catalog_json': catalog_file_loc,  # File
  'extractor.dbt.manifest_json': json.dumps(manifest_data),  # JSON Dumped objecy
  'extractor.dbt.source_url': source_url})

驗證 dbt 抓取結果

搜索 dbt_demo 或者直接訪問 http://localhost:5000/table_detail/dbt_demo/snowflake/public/raw_inventory_value,可以看到

這裏給一個小提示,其實,我們可以選擇啟用 DEBUG log 級別去看已發送到 Elasticsearch 和 NebulaGraph 的內容。

- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.DEBUG)

或者,在 NebulaGraph Studio 中探索導入的數據:

先點擊 Start with Vertices,並填寫頂點 vid:snowflake://dbt_demo.public/fact_warehouse_inventory

我們可以看到頂點顯示為粉紅色的點。再讓我們修改下 Expand / ”拓展“選項:

  • 方向:雙向
  • 步數:單向、三步

並雙擊頂點(點),它將雙向拓展 3 步:

像截圖展示的那般,在可視化之後的圖數據庫中,這些元數據可以很容易被查看、分析,並從中獲得洞察。

而且,我們在 NebulaGraph Studio 中看到的同 Amundsen 元數據服務的數據模型相呼應:

最後,請記住我們曾利用 dbt 來轉換 Meltano 中的一些數據,並且清單文件路徑是 .meltano/transformers/dbt/target/catalog.json,你可以嘗試創建一個數據構建器作業來導入它。

提取 Superset 中的元數據

Amundsen 的 Superset Extractor 可以獲取

來,現在試試提取之前創建的 Superset Dashboard 的元數據。

Superset 元數據 ETL 的執行

下邊執行的示例 Superset 提取腳本可以獲取數據並將元數據加載到 NebulaGraph 和 Elasticsearch 中。

python3 sample_superset_data_loader_nebula.py

如果我們將日誌記錄級別設置為 DEBUG,我們實際上可以看到這些中間的過程日誌:

# fetching metadata from superset
DEBUG:urllib3.connectionpool:http://localhost:8088 "POST /api/v1/security/login HTTP/1.1" 200 280
INFO:databuilder.task.task:Running a task
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 308 374
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard/?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 200 1058
...

# insert Dashboard

DEBUG:databuilder.publisher.nebula_csv_publisher:Query: INSERT VERTEX `Dashboard` (`dashboard_url`, `name`, published_tag, publisher_last_updated_epoch_ms) VALUES  "superset_dashboard://my_cluster.1/3":("http://localhost:8088/superset/dashboard/3/","my_dashboard","unique_tag",timestamp());
...

# insert a DASHBOARD_WITH_TABLE relationship/edge

INFO:databuilder.publisher.nebula_csv_publisher:Importing data in edge files: ['/tmp/amundsen/dashboard/relationships/Dashboard_Table_DASHBOARD_WITH_TABLE.csv']
DEBUG:databuilder.publisher.nebula_csv_publisher:Query:
INSERT edge `DASHBOARD_WITH_TABLE` (`END_LABEL`, `START_LABEL`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/orders":("Table","Dashboard","unique_tag", timestamp()), "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/customers":("Table","Dashboard","unique_tag", timestamp());

驗證 Superset Dashboard 元數據

通過在 Amundsen 中搜索它,我們現在可以獲得 Dashboard 信息。

我們也可以從 NebulaGraph Studio 進行驗證。

注:可以參閲 Dashboard 抓取指南中的 Amundsen Dashboard 圖建模:

用 Superset 預覽數據

Superset 可以用來預覽表格數據,文檔可以參考 https://www.amundsen.io/amundsen/frontend/docs/configuration/#preview-client,其中 /superset/sql_json/ 的 API 被 Amundsen Frontend Service 調用,取得預覽信息。

開啟數據血緣信息

默認情況下,數據血緣是關閉的,我們可以通過以下方式啟用它:

第一步,cd 到 Amundsen 代碼倉庫下,這也是我們運行 docker-compose -f docker-amundsen-nebula.yml up 命令的地方:

cd amundsen

第二步,修改 frontend 下的 TypeScript 配置

--- a/frontend/amundsen_application/static/js/config/config-default.ts
+++ b/frontend/amundsen_application/static/js/config/config-default.ts
   tableLineage: {
-    inAppListEnabled: false,
-    inAppPageEnabled: false,
+    inAppListEnabled: true,
+    inAppPageEnabled: true,
     externalEnabled: false,
     iconPath: 'PATH_TO_ICON',
     isBeta: false,

第三步,重新構建 Docker 鏡像,其中將重建前端圖像。

docker-compose -f docker-amundsen-nebula.yml build

第四步,重新運行 up -d 以確保前端用新的配置:

docker-compose -f docker-amundsen-nebula.yml up -d

結果大概長這樣子:

$ docker-compose -f docker-amundsen-nebula.yml up -d
...
Recreating amundsenfrontend           ... done

之後,我們可以訪問 http://localhost:5000/lineage/table/gold/hive/test_schema/test_table1 看到 Lineage (beta) 血緣按鈕已經顯示出來了:

我們可以點擊 Downstream 查看該表的下游資源:

或者點擊血緣按鈕查看血緣的圖表式:

也有用於血緣查詢的 API。

下面這個例子中,我們用 cURL 調用下這個 API:

docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot

curl "http://amundsenmetadata:5002/table/snowflake://dbt_demo.public/raw_inventory_value/lineage?depth=3&direction=both"

上面的 API 調用是查詢上游和下游方向的 linage,表 snowflake://dbt_demo.public/raw_inventory_value 的深度為 3。

結果應該是這樣的:

{
    "depth": 3,
    "downstream_entities": [
        {
            "level": 2,
            "usage": 0,
            "key": "snowflake://dbt_demo.public/fact_daily_expenses",
            "parent": "snowflake://dbt_demo.public/fact_warehouse_inventory",
            "badges": [],
            "source": "snowflake"
        },
        {
            "level": 1,
            "usage": 0,
            "key": "snowflake://dbt_demo.public/fact_warehouse_inventory",
            "parent": "snowflake://dbt_demo.public/raw_inventory_value",
            "badges": [],
            "source": "snowflake"
        }
    ],
    "key": "snowflake://dbt_demo.public/raw_inventory_value",
    "direction": "both",
    "upstream_entities": []
}

實際上,這個血緣數據就是在我們的 dbtExtractor 執行期間提取和加載的,其中 extractor .dbt.{DbtExtractor.EXTRACT_LINEAGE} 默認為 true,因此,創建了血緣元數據並將其加載到了 Amundsen。

在 NebulaGraph 中洞察血緣

使用圖數據庫作為元數據存儲的兩個優點是:

圖查詢本身是一個靈活的 DSL for lineage API,例如,這個查詢幫助我們執行 Amundsen 元數據 API 的等價的查詢:

MATCH p=(t:`Table`) -[:`HAS_UPSTREAM`|:`HAS_DOWNSTREAM` *1..3]->(x)
WHERE id(t) == "snowflake://dbt_demo.public/raw_inventory_value" RETURN p

來,在 NebulaGraph Studio 或者 Explorer 的控制枱中查詢下:

​渲染下這個結果:

提取數據血緣

這些血緣信息是需要我們明確指定、獲取的,獲取的方式可以是自己寫 Extractor,也可以是用已有的方式。比如:dbt 的 Extractor 和 Open Lineage 項目的 Amundsen Extractor。

通過 dbt

這個在剛才已經展示過了,dbt 的 Extractor 會從表級別獲取血緣同其他 dbt 中產生的元數據信息一起被拿到。

通過 Open Lineage

Amundsen 中的另一個開箱即用的血緣 Extractor 是 OpenLineageTableLineageExtractor

Open Lineage 是一個開放的框架,可以將不同來源的血統數據收集到一個地方,它可以將血統信息輸出為 JSON 文件,參見文檔 https://www.amundsen.io/amundsen/databuilder/#openlineagetablelineageextractor

下邊是它的 Amundsen Databuilder 例子:

dict_config = {
    # ...
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.CLUSTER_NAME}': 'datalab',
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.OL_DATASET_NAMESPACE_OVERRIDE}': 'hive_table',
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.TABLE_LINEAGE_FILE_LOCATION}': 'input_dir/openlineage_nd.json',
}
...

task = DefaultTask(
    extractor=OpenLineageTableLineageExtractor(),
    loader=FsNebulaCSVLoader())

回顧

整套元數據治理/發現的方案思路如下:

  • 將整個數據技術棧中的組件作為元數據源(從任何數據庫、數倉,到 dbt、Airflow、Openlineage、Superset 等各級項目)
  • 使用 Databuilder(作為腳本或 DAG)運行元數據 ETL,以使用 NebulaGraph 和 Elasticsearch 存儲和索引
  • 從前端 UI(使用 Superset 預覽)或 API 去使用、消費、管理和發現元數據
  • 通過查詢和 UI 對 NebulaGraph,我們可以獲得更多的可能性、靈活性和數據、血緣的洞察

涉及到的開源

此參考項目中使用的所有項目都按字典順序在下面列出。

  • Amundsen
  • Apache Airflow
  • Apache Superset
  • dbt
  • Elasticsearch
  • meltano
  • NebulaGraph
  • Open Lineage
  • Singer

謝謝你讀完本文 (///▽///)

要來近距離體驗一把圖數據庫嗎?現在可以用用 NebulaGraph Cloud 來搭建自己的圖數據系統喲,快來節省大量的部署安裝時間來搞定業務吧~ NebulaGraph 阿里雲計算巢現 30 天免費使用中,點擊鏈接來用用圖數據庫吧~

想看源碼的小夥伴可以前往 GitHub 閲讀、使用、(^з^)-☆ star 它 -> GitHub;和其他的 NebulaGraph 用户一起交流圖數據庫技術和應用技能,留下「你的名片」一起玩耍呢~