如何輕鬆做數據治理?開源技術棧告訴你答案
搭建一套數據治理體系耗時耗力,但或許我們沒有必要從頭開始搞自己的數據血緣項目。本文分享如何用開源、現代的 DataOps、ETL、Dashboard、元數據、數據血緣管理系統構建大數據治理基礎設施。
元數據治理系統
元數據治理系統是一個提供了所有數據在哪、格式化方式、生成、轉換、依賴、呈現和所屬的一站式視圖。
元數據治理系統是所有數據倉庫、數據庫、表、儀表板、ETL 作業等的目錄接口(catalog),有了它,我們就不用在羣裏喊“大家好,我可以更改這個表的 schema 嗎?”、 “請問誰知道我如何找到 table-view-foo-bar 的原始數據?”…一個成熟的數據治理方案中的元數據治理系統,對數據團隊來説非常必要。
而數據血緣則是元數據治理系統眾多需要管理的元數據之一,例如,某些 Dashboard 是某一個 Table View 的下游,而這個 Table View 又是從另外兩個上游表 JOIN 而來。顯然,應該清晰地掌握、管理這些信息,去構建一個可信、可控的系統和數據質量控制體系。
數據治理的可行方案
數據治理方案設計
元數據和數據血緣本質上非常適合採用圖數據建模、圖數據庫。因為數據治理涉及的典型查詢便是面向圖關係的查詢,像“查找指定組件(即表)的所有 n 度(深度)的數據血緣”就是圖查詢語句 FIND ALL PATH
跑起來的事。從日常大家在論壇、微信羣裏討論的查詢和圖建模來看,NebulaGraph 社區很多人在從零開始搭建數據血緣系統,而這些工作看起來大多是在重複造輪子,而且還是不容易造的輪子。
既然如此,前人種樹後人乘涼,這裏我決定搭建一個完備、端到端(不只有元數據管理)的數據系統,供大家參考解決數據血緣、數據治理問題。這個套數據系統會採用市面上優秀的開源項目,而圖數據庫這塊還是採用大家的老朋友——NebulaGraph。希望對大家能有所啟發,在此基礎之上擁有一個相對完善的圖模型,以及設計精巧、開箱即用的元數據治理系統。
下面,來看看元數據治理系統的輪子都需要哪些功能組件:
- 元數據抽取
- 這部分需要從不同的數據棧拉/推數據,像是從數據庫、數倉、Dashboard,甚至是 ETL Pipeline 和應用、服務中搞數據。
- 元數據存儲
- 可以存在數據庫、圖數據庫裏,甚至存成超大的 JSON manifest 文件都行
- 元數據目錄接口系統 Catalog
- 提供 API / GUI 來讀寫元數據和數據血緣系統
下圖是整個方案的簡單示意圖:
其中,上面的虛線框是元數據的來源與導入、下面的虛線框是元數據的存儲與展示、發現。
開源技術棧
下面,介紹下數據治理系統的每個部分。
數據庫和數倉
為了處理和使用原始和中間數據,這裏一定涉及至少一個數據庫或者數倉。它可以是 Hive、Apache Delta、TiDB、Cassandra、MySQL 或 Postgres。
在這個參考項目中,我們選一個簡單、流行的 Postgres。
✓ 數據倉庫:Postgres
數據運維 DataOps
我們應該有某種 DataOps 的方案,讓 Pipeline 和環境具有可重複性、可測試性和版本控制性。
在這裏,我們使用了 GitLab 創建的 Meltano。
Meltano 是一個 just-work 的 DataOps 平台,它可以用巧妙且優雅的方式將 Singer 作為 EL 和 dbt 作為 T 連接起來。此外,它還連接到其他一些 dataInfra 實用程序,例如 Apache Superset 和 Apache Airflow 等。
至此,我們又納入了一個成員:
✓ GitOps:Meltano https://gitlab.com/meltano/meltano
ETL 工具
上面我們提到過組合 Singer 與 Meltano 將來自許多不同數據源的數據 E(提取)和 L(加載)數據目標,並使用 dbt 作為 Transform 的平台。於是我們得到:
✓ EL:Singer ✓ T: dbt
數據可視化
在數據之上創建 Dashboard、圖表和表格得到數據的洞察是很符合直覺的,類似大數據之上的 Excel 圖標功能。
Apache Superset 是我很喜歡的開源數據可視化項目,我準備用它來作為被治理管理的目標之一。同時,還會利用它實現可視化功能來完成元數據洞察。於是,
✓ Dashboard:Apache Superset
任務編排(DAG Job Orchestration)
在大多數情況下,我們的 DataOps 作業、任務會演變成需要編排系統的規模,我們可以用 Apache Airflow 來負責這一塊。
✓ DAG:Apache Airflow https://airflow.apache.org/
元數據治理
隨着越來越多的組件和數據被引入數據基礎設施,在數據庫、表、數據建模(schema)、Dashboard、DAG(編排系統中的有向無環圖)、應用與服務的各個生命週期階段中都將存着海量的元數據,需要對它們的管理員和團隊進行協同管理、連接和發現。
Linux Foundation Amundsen 是解決該問題的最佳項目之一。 Amundsen 用圖數據庫為事實源(single source of truth)以加速多跳查詢,Elasticsearch 為全文搜索引擎。它在順滑地處理所有元數據及其血緣之餘,還提供了優雅的 UI 和 API。 Amundsen 支持多種圖數據庫為後端,這裏咱們用 NebulaGraph。
現在的技術棧:
✓ 數據發現:Linux Foundation Amundsen ✓ 全文搜索:Elasticsearch ✓ 圖數據庫:NebulaGraph
好的,所有組件都齊正了,開始組裝它們吧。
環境搭建與各組件初識
本次實踐的項目方案已開源,你可以訪問 https://github.com/wey-gu/data-lineage-ref-solution 來獲得對應的代碼。
整個實踐過程,我遵循了儘量乾淨、鼓勵共建的原則。項目預設在一個 unix-like 系統上運行,且聯網和裝有 Docker-Compose。
這裏,我將在 Ubuntu 20.04 LTS X86_64 上運行它,當然在其他發行版或 Linux 版本上應該也沒有問題。
運行一個數倉、數據庫
首先,安裝 Postgres 作為我們的數倉。
這個單行命令會創建一個使用 Docker 在後台運行的 Postgres,進程關閉之後容器不會殘留而是被清理掉(因為參數--rm
)。
docker run --rm --name postgres \
-e POSTGRES_PASSWORD=lineage_ref \
-e POSTGRES_USER=lineage_ref \
-e POSTGRES_DB=warehouse -d \
-p 5432:5432 postgres
我們可以用 Postgres CLI 或 GUI 客户端來驗證命令是否執行成功。
DataOps 工具鏈部署
接下來,安裝有機結合了 Singer 和 dbt 的 Meltano。
Meltano 幫助我們管理 ETL 工具(作為插件)及其所有配置和 pipeline。這些元信息位於 Meltano 配置及其系統數據庫中,其中配置是基於文件的(可以使用 GitOps 管理),它的默認系統數據庫是 SQLite。
安裝 Meltano
使用 Meltano 的工作流是啟動一個“meltano 項目”並開始將 E、L 和 T 添加到配置文件中。Meltano 項目的啟動只需要一個 CLI 命令 meltano init yourprojectname
。不過,在那之前,先用 Python 的包管理器 pip 或者 Docker 鏡像安裝 Meltano,像我示範的這樣:
在 Python 虛擬環境中使用 pip 安裝 Meltano:
mkdir .venv
# example in a debian flavor Linux distro
sudo apt-get install python3-dev python3-pip python3-venv python3-wheel -y
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano
# init a project
mkdir meltano_projects && cd meltano_projects
# replace <yourprojectname> with your own one
touch .env
meltano init <yourprojectname>
或者,用 Docker 容器安裝 Meltano:
docker pull meltano/meltano:latest
docker run --rm meltano/meltano --version
# init a project
mkdir meltano_projects && cd meltano_projects
# replace <yourprojectname> with your own one
touch .env
docker run --rm -v "$(pwd)":/projects \
-w /projects --env-file .env \
meltano/meltano init <yourprojectname>
除了知曉 meltano init
之外,最好掌握 Meltano 部分命令,例如 meltano etl
表示 ETL 的執行,meltano invoke <plugin>
來調用插件命令。詳細可以參考它的速查表 https://docs.meltano.com/reference/command-line-interface。
Meltano GUI 界面
Meltano 自帶一個基於 Web 的 UI,執行 ui
子命令就能啟動它:
meltano ui
它默認會跑在 http://localhost:5000
上。
針對 Docker 的運行環境,在暴露 5000 端口的情況下運行容器即可。由於容器的默認命令已經是 meltano ui
,所以 run
的命令只需:
docker run -v "$(pwd)":/project \
-w /project \
-p 5000:5000 \
meltano/meltano
Meltano 項目示例
GitHub 用户 Pat Nadolny 在開源項目 singer_dbt_jaffle 中做了很好的示例。他採用 dbt 的 Meltano 示例數據集,利用 Airflow 編排 ETL 任務。
不只這樣,他還有利用 Superset 的例子,見 jaffle_superset。
前人種樹我們來吃果,按照 Pat Nadolny 的實踐,我們可以這樣地運行數據管道(pipeline):
- tap-CSV(Singer)從 CSV 文件中提取數據
- target-postgres(Singer) 將數據加載到 Postgres
- dbt 將數據轉換為聚合表或視圖
注意,上面我們已經啟動了 Postgres,可以跳過容器啟動 Postgres 這步。
操作過程是:
git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/singer_dbt_jaffle/
meltano install
touch .env
echo PG_PASSWORD="lineage_ref" >> .env
echo PG_USERNAME="lineage_ref" >> .env
# Extract and Load(with Singer)
meltano run tap-csv target-postgres
# Trasnform(with dbt)
meltano run dbt:run
# Generate dbt docs
meltano invoke dbt docs generate
# Serve generated dbt docs
meltano invoke dbt docs to serve
# Then visit http://localhost:8080
現在,我們可以連接到 Postgres 來查看加載和轉換後的數據預覽。如下所示,截圖來自 VS Code 的 SQLTool。
payments 表裏長這樣子:
搭一個 BI Dashboard 系統
現在,我們的數據倉庫有數據了。接下來,可以試着用下這些數據。
像儀表盤 Dashbaord 這樣的 BI 工具能幫我們從數據中獲得有用的洞察。使用可視化工具 Apache Superset 可以很容易地創建和管理這些基於數據源的 Dashboard 和各式各樣的圖表。
本章的重點不在於 Apache Superset 本身,所以,咱們還是複用 Pat Nadolny 的 jaffle_superset 例子。
Bootstrap Meltano 和 Superset
創建一個安裝了 Meltano 的 Python VENV:
mkdir .venv
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano
參考 Pat 的小抄,做一些細微的調整:
克隆 repo,進入 jaffle_superset
項目:
git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/jaffle_superset/
修改 meltano 配置文件,讓 Superset 連接到我們創建的 Postgres:
vim meltano_projects/jaffle_superset/meltano.yml
這裏,我將主機名更改為“10.1.1.111”,這是我當前主機的 IP。如果你是 Windows 或者 macOS 上的 Docker Desktop,這裏不要修改主機名,否則就要和我一樣手動改成實際地址:
--- a/meltano_projects/jaffle_superset/meltano.yml
+++ b/meltano_projects/jaffle_superset/meltano.yml
@@ -71,7 +71,7 @@ plugins:
A list of database driver dependencies can be found here https://superset.apache.org/docs/databases/installing-database-drivers
config:
database_name: my_postgres
- sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@host.docker.internal:${PG_PORT}/${PG_DATABASE}
+ sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@10.1.1.168:${PG_PORT}/${PG_DATABASE}
tables:
- model.my_meltano_project.customers
- model.my_meltano_project.orders
添加 Postgres 登錄的信息到 .env
文件:
echo PG_USERNAME=lineage_ref >> .env
echo PG_PASSWORD=lineage_ref >> .env
安裝 Meltano 項目,運行 ETL 任務:
meltano install
meltano run tap-csv target-postgres dbt:run
調用、啟動 Superset,這裏注意 ui
不是 meltano 的內部命令,而是一個配置進去的自定義行為(user-defined action):
meltano invoke superset:ui
在另一個命令行終端執行自定義的命令 load_datasources
:
meltano invoke superset:load_datasources
通過瀏覽器訪問 http://localhost:8088/
就是 Superset 的圖形界面了:
創建一個 Dashboard
現在,我們站在 Meltano、Postgres 的肩膀上,用 ETL 數據建一個 Dashboard 吧:
點擊 + DASHBOARD
,填寫儀表盤名稱,再先後點擊 SAVE
和 + CREATE A NEW CHART
:
在新圖表(Create a new chart)視圖中,選擇圖表類型和數據集。在這裏,我選擇了 orders
表作為數據源和 Pie Chart
圖表類型:
點擊 CREATE NEW CHART
後,在圖表定義視圖中選擇 “status” 的 “Query” 為 “DIMENSIONS”,“COUNT(amount)” 為 “METRIC”。至此,咱們就可以看到每個訂單狀態分佈的餅圖了。
點擊 SAVE
,系統會詢問應該將此圖表添加到哪個 Dashboard。選擇後,單擊 SAVE & GO TO DASHBOARD
。
在 Dashboard 中,我們可以看到所有的圖表。這不,你可以看到我額外添加的、用來顯示客户訂單數量分佈的圖表:
點 ···
能看到刷新率設置、下載渲染圖等其他的功能。
現在,我們有一個簡單但典型的 HomeLAB 數據技術棧了,並且所有東西都是開源的!
想象一下,我們在 CSV 中有 100 個數據集,在數據倉庫中有 200 個表,並且有幾個數據工程師在運行不同的項目,這些項目使用、生成不同的應用與服務、Dashbaord 和數據庫。當有人想要查找、發現或者修改其中的一些表、數據集、Dashbaord 和管道,在溝通和工程方面可能都是非常不好管理的。
上面我們提到,這個示例項目的主要功能是元數據發現系統。
元數據發現系統
現在,需要我們部署一個帶有 NebulaGraph 和 Elasticsearch 的 Amundsen。有了 Amundsen,我們可以在一個地方發現和管理整個數據棧中的所有元數據。
Amundsen 主要有兩個部分組成:
- 元數據導入 Metadata Ingestion
- 元數據目錄服務 Metadata Catalog
它的工作原理是:利用 Databuilder
提取不同數據源的元數據,並將元數據持久化到 Metadata Service
和 Search Service
中,用户從 Frontend Service
或 Metadata Service
的 API 獲取數據。
部署 Amundsen
元數據服務 Metadata Service
我們用 docker-compose 部署一個 Amundsen 集羣。由於 Amundsen 對 NebulaGraph 後端的支持 pr#1817 尚未合併,還不能用官方的代碼。這裏,先用我的 fork 版本。
先克隆包含所有子模塊的 repo:
git clone -b amundsen_nebula_graph --recursive [email protected]:wey-gu/amundsen.git
cd amundsen
啟動所有目錄服務 catalog services 及其後端存儲:
docker-compose -f docker-amundsen-nebula.yml up
由於這個 docker-compose 文件是供開發人員試玩、調試 Amundsen 用的,而不是給生產部署準備的,它在啟動的時候會從代碼庫構建鏡像,第一次跑的時候啟動會慢一些。
部署好了之後,我們使用 Databuilder 將一些示例、虛構的數據加載存儲裏。
抓取元數據 Databuilder
Amundsen Databuilder 就像 Meltano 系統一樣,只不過是用在元數據的上的 ETL ,它把元數據加載到 Metadata Service
和 Search Service
的後端存儲:NebulaGraph 和 Elasticsearch 裏。這裏的 Databuilder 只是一個 Python 模塊,所有的元數據 ETL 作業可以作為腳本運行,也可以用 Apache Airflow 等 DAG 平台進行編排。
cd databuilder
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install wheel
python3 -m pip install -r requirements.txt
python3 setup.py install
調用這個示例數據構建器 ETL 腳本來把示例的虛擬數據導進去。
python3 example/scripts/sample_data_loader_nebula.py
驗證一下 Amundsen
在訪問 Amundsen 之前,我們需要創建一個測試用户:
# run a container with curl attached to amundsenfrontend
docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot
# Create a user with id test_user_id
curl -X PUT -v http://amundsenmetadata:5002/user \
-H "Content-Type: application/json" \
--data \
'{"user_id":"test_user_id","first_name":"test","last_name":"user", "email":"[email protected]"}'
exit
然後我們可以在 http://localhost:5000
查看 UI 並嘗試搜索 test
,它應該會返回一些結果。
然後,可以單擊並瀏覽在 sample_data_loader_nebula.py
期間加載到 Amundsen 的那些示例元數據。
此外,我們還可以通過 NebulaGraph Studio 的地址 http://localhost:7001
訪問 NebulaGraph 裏的這些數據。
下圖顯示了有關 Amundsen 組件的更多詳細信息:
┌────────────────────────┐ ┌────────────────────────────────────────┐
│ Frontend:5000 │ │ Metadata Sources │
├────────────────────────┤ │ ┌────────┐ ┌─────────┐ ┌─────────────┐ │
│ Metaservice:5001 │ │ │ │ │ │ │ │ │
│ ┌──────────────┐ │ │ │ Foo DB │ │ Bar App │ │ X Dashboard │ │
┌────┼─┤ Nebula Proxy │ │ │ │ │ │ │ │ │ │
│ │ └──────────────┘ │ │ │ │ │ │ │ │ │
│ ├────────────────────────┤ │ └────────┘ └─────┬───┘ └─────────────┘ │
┌─┼────┤ Search searvice:5002 │ │ │ │
│ │ └────────────────────────┘ └──────────────────┼─────────────────────┘
│ │ ┌─────────────────────────────────────────────┼───────────────────────┐
│ │ │ │ │
│ │ │ Databuilder ┌───────────────────────────┘ │
│ │ │ │ │
│ │ │ ┌───────────────▼────────────────┐ ┌──────────────────────────────┐ │
│ │ ┌──┼─► Extractor of Sources ├─► nebula_search_data_extractor │ │
│ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │ │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │ │ │ Loader filesystem_csv_nebula │ │ Loader Elastic FS loader │ │
│ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │ │ ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ │
│ │ │ │ │ Publisher nebula_csv_publisher │ │ Publisher Elasticsearch │ │
│ │ │ │ └───────────────┬────────────────┘ └──────────────┬───────────────┘ │
│ │ │ └─────────────────┼─────────────────────────────────┼─────────────────┘
│ │ └────────────────┐ │ │
│ │ ┌─────────────┼───►─────────────────────────┐ ┌─────▼─────┐
│ │ │ NebulaGraph │ │ │ │ │
│ └────┼─────┬───────┴───┼───────────┐ ┌─────┐ │ │ │
│ │ │ │ │ │MetaD│ │ │ │
│ │ ┌───▼──┐ ┌───▼──┐ ┌───▼──┐ └─────┘ │ │ │
│ ┌────┼─►GraphD│ │GraphD│ │GraphD│ │ │ │
│ │ │ └──────┘ └──────┘ └──────┘ ┌─────┐ │ │ │
│ │ │ :9669 │MetaD│ │ │ Elastic │
│ │ │ ┌────────┐ ┌────────┐ ┌────────┐ └─────┘ │ │ Search │
│ │ │ │ │ │ │ │ │ │ │ Cluster │
│ │ │ │StorageD│ │StorageD│ │StorageD│ ┌─────┐ │ │ :9200 │
│ │ │ │ │ │ │ │ │ │MetaD│ │ │ │
│ │ │ └────────┘ └────────┘ └────────┘ └─────┘ │ │ │
│ │ ├───────────────────────────────────────────┤ │ │
│ └────┤ Nebula Studio:7001 │ │ │
│ └───────────────────────────────────────────┘ └─────▲─────┘
└──────────────────────────────────────────────────────────┘
穿針引線:元數據發現
設置好基本環境後,讓我們把所有東西穿起來。還記得我們有 ELT 一些數據到 PostgreSQL 嗎?
那麼,我們如何讓 Amundsen 發現這些數據和 ETL 的元數據呢?
提取 Postgres 元數據
我們從數據源開始:首先是 Postgres。
我們為 Python3 安裝 Postgres 客户端:
sudo apt-get install libpq-dev
pip3 install Psycopg2
執行 Postgres 元數據 ETL
運行一個腳本來解析 Postgres 元數據:
export CREDENTIALS_POSTGRES_USER=lineage_ref
export CREDENTIALS_POSTGRES_PASSWORD=lineage_ref
export CREDENTIALS_POSTGRES_DATABASE=warehouse
python3 example/scripts/sample_postgres_loader_nebula.py
我們看看把 Postgres 元數據加載到 NebulaGraph 的示例腳本的代碼,非常簡單直接:
# part 1: PostgresMetadata --> CSV --> NebulaGraph
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=PostgresMetadataExtractor(),
loader=FsNebulaCSVLoader()),
publisher=NebulaCsvPublisher())
...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)
job = DefaultJob(conf=job_config, task=task)
第一個工作路徑是:PostgresMetadata --> CSV --> NebulaGraph
PostgresMetadataExtractor
用於從 Postgres 中提取元數據,可以參考文檔 https://www.amundsen.io/amundsen/databuilder/#postgresmetadataextractor。FsNebulaCSVLoader
用於將提取的數據轉為 CSV 文件NebulaCsvPublisher
用於將元數據以 CSV 格式發佈到 NebulaGraph
第二個工作路徑是:Metadata stored in NebulaGraph --> Elasticsearch
NebulaSearchDataExtractor
用於獲取存儲在 NebulaGraph 中的元數據SearchMetadatatoElasticasearchTask
用於使 Elasticsearch 對元數據進行索引。
請注意,在生產環境中,我們可以在腳本中或使用 Apache Airflow 等編排平台觸發這些作業。
驗證 Postgres 中元數據的獲取
搜索 payments
或者直接訪問 http://localhost:5000/table_detail/warehouse/postgres/public/payments
,你可以看到我們 Postgres 的元數據,比如:
像上面的屏幕截圖一樣,我們可以輕鬆完成元數據管理操作,如:添加標籤、所有者和描述。
提取 dbt 元數據
其實,我們也可以從 dbt 本身提取元數據。
Amundsen DbtExtractor 會解析 catalog.json
或 manifest.json
文件並將元數據加載到 Amundsen 存儲,這裏當然指的是 NebulaGraph 和 Elasticsearch。
在上面的 Meltano 章節中,我們已經使用 meltano invoke dbt docs generate
生成了這個文件:
14:23:15 Done.
14:23:15 Building catalog
14:23:15 Catalog written to /home/ubuntu/ref-data-lineage/meltano_example_implementations/meltano_projects/singer_dbt_jaffle/.meltano/transformers/dbt/target/catalog.json
dbt 元數據 ETL 的執行
我們試着解析示例 dbt 文件中的元數據吧:
$ ls -l example/sample_data/dbt/
total 184
-rw-rw-r-- 1 w w 5320 May 15 07:17 catalog.json
-rw-rw-r-- 1 w w 177163 May 15 07:17 manifest.json
我寫的這個示例的加載例子如下:
python3 example/scripts/sample_dbt_loader_nebula.py
其中主要的代碼如下:
# part 1: dbt manifest --> CSV --> NebulaGraph
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=DbtExtractor(),
loader=FsNebulaCSVLoader()),
publisher=NebulaCsvPublisher())
...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)
job = DefaultJob(conf=job_config, task=task)
它和 Postgres 元數據 ETL 的唯一區別是 extractor=DbtExtractor()
,它帶有以下配置以獲取有關 dbt 項目的以下信息:
- 數據庫名稱
- 目錄_json
- manifest_json
job_config = ConfigFactory.from_dict({
'extractor.dbt.database_name': database_name,
'extractor.dbt.catalog_json': catalog_file_loc, # File
'extractor.dbt.manifest_json': json.dumps(manifest_data), # JSON Dumped objecy
'extractor.dbt.source_url': source_url})
驗證 dbt 抓取結果
搜索 dbt_demo
或者直接訪問 http://localhost:5000/table_detail/dbt_demo/snowflake/public/raw_inventory_value
,可以看到
這裏給一個小提示,其實,我們可以選擇啟用 DEBUG log 級別去看已發送到 Elasticsearch 和 NebulaGraph 的內容。
- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.DEBUG)
或者,在 NebulaGraph Studio 中探索導入的數據:
先點擊 Start with Vertices
,並填寫頂點 vid:snowflake://dbt_demo.public/fact_warehouse_inventory
我們可以看到頂點顯示為粉紅色的點。再讓我們修改下 Expand
/ ”拓展“選項:
- 方向:雙向
- 步數:單向、三步
並雙擊頂點(點),它將雙向拓展 3 步:
像截圖展示的那般,在可視化之後的圖數據庫中,這些元數據可以很容易被查看、分析,並從中獲得洞察。
而且,我們在 NebulaGraph Studio 中看到的同 Amundsen 元數據服務的數據模型相呼應:
最後,請記住我們曾利用 dbt 來轉換 Meltano 中的一些數據,並且清單文件路徑是 .meltano/transformers/dbt/target/catalog.json
,你可以嘗試創建一個數據構建器作業來導入它。
提取 Superset 中的元數據
Amundsen 的 Superset Extractor 可以獲取
- Dashboard 元數據抽取,見 apache_superset_metadata_extractor.py
- 圖表元數據抽取,見 apache_superset_chart_extractor.py
- Superset 元素與數據源(表)的關係抽取,見 apache_superset_table_extractor.py
來,現在試試提取之前創建的 Superset Dashboard 的元數據。
Superset 元數據 ETL 的執行
下邊執行的示例 Superset 提取腳本可以獲取數據並將元數據加載到 NebulaGraph 和 Elasticsearch 中。
python3 sample_superset_data_loader_nebula.py
如果我們將日誌記錄級別設置為 DEBUG
,我們實際上可以看到這些中間的過程日誌:
# fetching metadata from superset
DEBUG:urllib3.connectionpool:http://localhost:8088 "POST /api/v1/security/login HTTP/1.1" 200 280
INFO:databuilder.task.task:Running a task
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 308 374
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard/?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 200 1058
...
# insert Dashboard
DEBUG:databuilder.publisher.nebula_csv_publisher:Query: INSERT VERTEX `Dashboard` (`dashboard_url`, `name`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3":("http://localhost:8088/superset/dashboard/3/","my_dashboard","unique_tag",timestamp());
...
# insert a DASHBOARD_WITH_TABLE relationship/edge
INFO:databuilder.publisher.nebula_csv_publisher:Importing data in edge files: ['/tmp/amundsen/dashboard/relationships/Dashboard_Table_DASHBOARD_WITH_TABLE.csv']
DEBUG:databuilder.publisher.nebula_csv_publisher:Query:
INSERT edge `DASHBOARD_WITH_TABLE` (`END_LABEL`, `START_LABEL`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/orders":("Table","Dashboard","unique_tag", timestamp()), "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/customers":("Table","Dashboard","unique_tag", timestamp());
驗證 Superset Dashboard 元數據
通過在 Amundsen 中搜索它,我們現在可以獲得 Dashboard 信息。
我們也可以從 NebulaGraph Studio 進行驗證。
注:可以參閲 Dashboard 抓取指南中的 Amundsen Dashboard 圖建模:
用 Superset 預覽數據
Superset 可以用來預覽表格數據,文檔可以參考 https://www.amundsen.io/amundsen/frontend/docs/configuration/#preview-client,其中 /superset/sql_json/
的 API 被 Amundsen Frontend Service
調用,取得預覽信息。
開啟數據血緣信息
默認情況下,數據血緣是關閉的,我們可以通過以下方式啟用它:
第一步,cd
到 Amundsen 代碼倉庫下,這也是我們運行 docker-compose -f docker-amundsen-nebula.yml up
命令的地方:
cd amundsen
第二步,修改 frontend 下的 TypeScript 配置
--- a/frontend/amundsen_application/static/js/config/config-default.ts
+++ b/frontend/amundsen_application/static/js/config/config-default.ts
tableLineage: {
- inAppListEnabled: false,
- inAppPageEnabled: false,
+ inAppListEnabled: true,
+ inAppPageEnabled: true,
externalEnabled: false,
iconPath: 'PATH_TO_ICON',
isBeta: false,
第三步,重新構建 Docker 鏡像,其中將重建前端圖像。
docker-compose -f docker-amundsen-nebula.yml build
第四步,重新運行 up -d
以確保前端用新的配置:
docker-compose -f docker-amundsen-nebula.yml up -d
結果大概長這樣子:
$ docker-compose -f docker-amundsen-nebula.yml up -d
...
Recreating amundsenfrontend ... done
之後,我們可以訪問 http://localhost:5000/lineage/table/gold/hive/test_schema/test_table1
看到 Lineage (beta)
血緣按鈕已經顯示出來了:
我們可以點擊 Downstream
查看該表的下游資源:
或者點擊血緣按鈕查看血緣的圖表式:
也有用於血緣查詢的 API。
下面這個例子中,我們用 cURL 調用下這個 API:
docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot
curl "http://amundsenmetadata:5002/table/snowflake://dbt_demo.public/raw_inventory_value/lineage?depth=3&direction=both"
上面的 API 調用是查詢上游和下游方向的 linage,表 snowflake://dbt_demo.public/raw_inventory_value
的深度為 3。
結果應該是這樣的:
{
"depth": 3,
"downstream_entities": [
{
"level": 2,
"usage": 0,
"key": "snowflake://dbt_demo.public/fact_daily_expenses",
"parent": "snowflake://dbt_demo.public/fact_warehouse_inventory",
"badges": [],
"source": "snowflake"
},
{
"level": 1,
"usage": 0,
"key": "snowflake://dbt_demo.public/fact_warehouse_inventory",
"parent": "snowflake://dbt_demo.public/raw_inventory_value",
"badges": [],
"source": "snowflake"
}
],
"key": "snowflake://dbt_demo.public/raw_inventory_value",
"direction": "both",
"upstream_entities": []
}
實際上,這個血緣數據就是在我們的 dbtExtractor 執行期間提取和加載的,其中 extractor .dbt.{DbtExtractor.EXTRACT_LINEAGE}
默認為 true
,因此,創建了血緣元數據並將其加載到了 Amundsen。
在 NebulaGraph 中洞察血緣
使用圖數據庫作為元數據存儲的兩個優點是:
圖查詢本身是一個靈活的 DSL for lineage API,例如,這個查詢幫助我們執行 Amundsen 元數據 API 的等價的查詢:
MATCH p=(t:`Table`) -[:`HAS_UPSTREAM`|:`HAS_DOWNSTREAM` *1..3]->(x)
WHERE id(t) == "snowflake://dbt_demo.public/raw_inventory_value" RETURN p
來,在 NebulaGraph Studio 或者 Explorer 的控制枱中查詢下:
渲染下這個結果:
提取數據血緣
這些血緣信息是需要我們明確指定、獲取的,獲取的方式可以是自己寫 Extractor,也可以是用已有的方式。比如:dbt 的 Extractor 和 Open Lineage 項目的 Amundsen Extractor。
通過 dbt
這個在剛才已經展示過了,dbt 的 Extractor 會從表級別獲取血緣同其他 dbt 中產生的元數據信息一起被拿到。
通過 Open Lineage
Amundsen 中的另一個開箱即用的血緣 Extractor 是 OpenLineageTableLineageExtractor。
Open Lineage 是一個開放的框架,可以將不同來源的血統數據收集到一個地方,它可以將血統信息輸出為 JSON 文件,參見文檔 https://www.amundsen.io/amundsen/databuilder/#openlineagetablelineageextractor。
下邊是它的 Amundsen Databuilder 例子:
dict_config = {
# ...
f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.CLUSTER_NAME}': 'datalab',
f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.OL_DATASET_NAMESPACE_OVERRIDE}': 'hive_table',
f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.TABLE_LINEAGE_FILE_LOCATION}': 'input_dir/openlineage_nd.json',
}
...
task = DefaultTask(
extractor=OpenLineageTableLineageExtractor(),
loader=FsNebulaCSVLoader())
回顧
整套元數據治理/發現的方案思路如下:
- 將整個數據技術棧中的組件作為元數據源(從任何數據庫、數倉,到 dbt、Airflow、Openlineage、Superset 等各級項目)
- 使用 Databuilder(作為腳本或 DAG)運行元數據 ETL,以使用 NebulaGraph 和 Elasticsearch 存儲和索引
- 從前端 UI(使用 Superset 預覽)或 API 去使用、消費、管理和發現元數據
- 通過查詢和 UI 對 NebulaGraph,我們可以獲得更多的可能性、靈活性和數據、血緣的洞察
涉及到的開源
此參考項目中使用的所有項目都按字典順序在下面列出。
- Amundsen
- Apache Airflow
- Apache Superset
- dbt
- Elasticsearch
- meltano
- NebulaGraph
- Open Lineage
- Singer
謝謝你讀完本文 (///▽///)
要來近距離體驗一把圖數據庫嗎?現在可以用用 NebulaGraph Cloud 來搭建自己的圖數據系統喲,快來節省大量的部署安裝時間來搞定業務吧~ NebulaGraph 阿里雲計算巢現 30 天免費使用中,點擊鏈接來用用圖數據庫吧~
想看源碼的小夥伴可以前往 GitHub 閲讀、使用、(^з^)-☆ star 它 -> GitHub;和其他的 NebulaGraph 用户一起交流圖數據庫技術和應用技能,留下「你的名片」一起玩耍呢~
- 圖數據庫在中國移動金融風控的落地應用
- 記一次 rr 和硬件斷點解決內存踩踏問題
- 用圖技術搞定附近好友、時空交集等 7 個典型社交網絡應用
- 用圖技術搞定附近好友、時空交集等 7 個典型社交網絡應用
- 圖數據庫中的“分佈式”和“數據切分”(切圖)
- 揭祕可視化圖探索工具 NebulaGraph Explore 是如何實現圖計算的
- 連接微信羣、Slack 和 GitHub:社區開放溝通的基礎設施搭建
- 圖數據庫認證考試 NGCP 錯題解析 vol.02:這 10 道題竟無一人全部答對
- 如何判斷多賬號是同一個人?用圖技術搞定 ID Mapping
- 複雜場景下圖數據庫的 OLTP 與 OLAP 融合實踐
- 如何運維多集羣數據庫?58 同城 NebulaGraph Database 運維實踐
- 有了 ETL 數據神器 dbt,表數據秒變 NebulaGraph 中的圖數據
- 基於圖的下一代入侵檢測系統
- 從實測出發,掌握 NebulaGraph Exchange 性能最大化的祕密
- 讀 NebulaGraph源碼 | 查詢語句 LOOKUP 的一生
- 當雲原生網關遇上圖數據庫,NebulaGraph 的 APISIX 最佳實踐
- 從全球頂級數據庫大會 SIGMOD 看數據庫發展趨勢
- 「實操」結合圖數據庫、圖算法、機器學習、GNN 實現一個推薦系統
- 如何輕鬆做數據治理?開源技術棧告訴你答案
- 圖算法、圖數據庫在風控場景的應用