深入理解Delta Lake 實現原理(on Zeppelin)
Delta Lake 已經出來很久了,看過一些介紹性的文章,也跑過一些簡單的例子。週末花時間重新整理一下Delta Lake的原理,並且在Zeppelin裡跑了一些簡單的例子,相信通過這些例子,大家可以更好的理解Delta Lake的內部實現機制,特別是Transaction Log的機制。
啟動Zeppelin Docker Container
為了能夠更直觀得展示Delta Lake的內部機制,我會用Zeppelin來跑本文全部的例子。你也可以在Zeppelin裡重現本文所有的例子。簡單說明下如何啟動Zeppelin Docker Container 來執行本文的例子:
-
步驟1. git clone https://github.com/zjffdu/zeppelin-notebook.git
-
步驟2. 下載 Spark 3.1.2 (這是本文用的Spark版本,Spark 3.2.0 還暫不支援)
-
步驟3. 執行下面的程式碼,啟動 Zeppelin Docker Container。命令裡的${zeppelin_notebook} 是你在步驟1裡clone下來的 Zeppelin notebook目錄,${spark_location} 是指你在步驟2下載的Spark資料夾路徑
docker run -u $(id -u) -p 8080:8080 -p 4040:4040 \
--rm -v ${spark_location}:/opt/spark -v ${zeppelin_notebook}:/opt/notebook \
-e ZEPPELIN_NOTEBOOK_DIR=/opt/notebook -e SPARK_HOME=/opt/spark \
-e ZEPPELIN_LOCAL_IP=0.0.0.0 --name zeppelin apache/zeppelin:0.10.0
接下來瀏覽器裡開啟 http://localhost:8080 , 開啟其中的Notebook: Spark/Deep Dive into Delta Lake
配置 Spark
這是 Deep Dive into Delta Lake 的第一個段落,用來初始化Spark Interpreter的配置。
%spark.conf 是Zeppelin裡一種特殊的用來配置Interpreter的Interpreter。這裡我按照Delta Lake的 官方教程 來配置Spark,此外還配置 spark.sql.warehouse.dir ,這樣在後面的教程裡我可以去table的指定目錄檢視變化。好,那接下來就讓我們在Zeppelin裡體驗Delta Lake吧。
建立 Delta 表
首先我們來建立一個含有2個欄位的 events 表:id 和 data.
那執行這條Sql語句 Delta 到底在後臺做了什麼呢?其實Delta就做裡 2件事:
-
在資料夾 /tmp/warehouse 下建立資料夾 events
-
生成第一個transaction log 檔案,裡面含有這張表的schema資訊
插入資料
接下來我們插入2條記錄到這個 events 表。
然後再用Select語句查詢下這張表,確認下這條insert語句是否成功。
那麼這條Insert語句背後Delta到底做了什麼呢?我們可以檢視下目錄 /tmp/warehouse/events 會發現2個變化:
-
生產了一個新的 transaction log檔案.
-
新增加了 2 個parquet 檔案
首先我們來看下這個生成的transaction log 檔案:00000000000000000001.json. 這個檔案內容很好理解, 裡面包含了這個insert語句的操作內容:Add 2個parquet檔案。這2個parquet檔案裡就是包含了這2條記錄。你可能注意到了這個transation log檔案裡沒有表的schema資訊。因為schema資訊已經存在第一個transaction log裡了(00000000000000000000.json)。
當Delta讀表的時候,他會merge所有的歷史transaction log來獲得這張表的所有資訊,包括表的schema資訊和data檔案資訊。
因為我們正好插入了2條記錄,所有我們有理由相信應該每個parquet檔案包含一條記錄,我們可以用Spark API直接來讀取這2個檔案來驗證下。
更新資料
Delta Lake 最重要的feature就是對ACID的支援,你可以在任何時候更新一張表,而不用擔心這個操作會影響到其他人的讀和寫。
接下來我們就來做一個更新資料的操作。
然後再用Select語句驗證下這個Update操作。
那這個Update語句背後Delta做了什麼呢?同樣我們可以看下 events 表的資料夾,會發現2個變化:
-
新增了一個transaction log檔案
-
新增了一個 parquet 檔案,之前的2個 parquet 檔案沒有刪除。
我們首先來看這個這個新的transaction log檔案的內容,裡面包含了2個操作:
-
刪除了一個parquet檔案
-
新增了一個parquet檔案
很自然,我們會猜想刪除的parquet檔案應該就是包含了記錄(2, data_2), 而新增的parquet檔案應該正好包含記錄 (2, data_2_updated). 我們可以直接讀取這2個parquet檔案來驗證我們的猜想。
接下來我們可以嘗試下Delta的time travel功能。我們可以用下面的程式碼來讀取上個版本(執行Update語句之前)的events表。
Time travel 功能能夠work主要就是因為Delta不會真正做刪除資料檔案操作,它只會把真正的刪除檔案操作記錄在transaction log裡。上面程式碼裡Delta只會讀取第一個和第二個transaction log檔案:00000000000000000000.json & 00000000000000000001.json.
刪除資料
接下來我們做一個刪除資料的操作
用Select語句來驗證下這個Delete操作
那這個Delete語句背後Delta做了什麼呢?同樣我們通過檢視 events 表目錄會發現2個變動:
-
新生成了一個 transaction log檔案
-
新生成了一個 parquet 檔案
在這個transtion log檔案裡有2個操作:remove 和 add。
很自然我們會猜測 這個刪除的檔案就是包含記錄(1,data_1)的檔案,那這個add的檔案是什麼呢?其實這個add檔案裡面是空的。我們可以直接讀取這2個parquet檔案來驗證。
總結
這篇文章裡,我總共做了4個步驟:
-
建立Delta表
-
插入資料
-
更新資料
-
刪除資料
每個步驟裡,我都會檢視transaction log的內容以及data檔案的變化。上面所有的程式碼都可以在 Zeppelin docker container裡執行,希望這篇文章對你理解Delta Lake的內部機制有幫助。當然這篇文章只是一個基本的介紹,其他更高階的Delta Lake內容以後有時間再和大家分享。
- Airflow sensor簡介
- Airflow 基礎系列 - 03 (Operators介紹)
- Airflow 基礎系列 - 02 (Executor詳解)
- 深入理解Delta Lake 實現原理(on Zeppelin)
- Airflow 基礎系列-01 (Airflow元件)
- PyFlink 開發環境利器:Zeppelin Notebook
- 在Zeppelin中如何使用Flink
- 記一次 Centos7.x 安裝、部署 Zeppelin v0.9.0 並配置 PostgreSql 資料庫
- 新版Denodo Platform 8.0加速混合/多雲整合,通過AI/ML實現資料管理自動化,並提高效能
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- PyFlink 區塊鏈?揭祕行業領頭企業 BTC.com 如何實現實時計算
- Flink x Zeppelin ,Hive Streaming 實戰解析
- Zeppelin整合Flink採坑實錄