深入理解Delta Lake 實現原理(on Zeppelin)

語言: CN / TW / HK

Delta Lake 已經出來很久了,看過一些介紹性的文章,也跑過一些簡單的例子。週末花時間重新整理一下Delta Lake的原理,並且在Zeppelin裏跑了一些簡單的例子,相信通過這些例子,大家可以更好的理解Delta Lake的內部實現機制,特別是Transaction Log的機制。

啟動Zeppelin Docker Container

為了能夠更直觀得展示Delta Lake的內部機制,我會用Zeppelin來跑本文全部的例子。你也可以在Zeppelin裏重現本文所有的例子。簡單説明下如何啟動Zeppelin Docker Container 來運行本文的例子:

  • 步驟1. git clone https://github.com/zjffdu/zeppelin-notebook.git

  • 步驟2. 下載 Spark 3.1.2 (這是本文用的Spark版本,Spark 3.2.0 還暫不支持)

  • 步驟3. 運行下面的代碼,啟動 Zeppelin Docker Container。命令裏的${zeppelin_notebook} 是你在步驟1裏clone下來的 Zeppelin notebook目錄,${spark_location} 是指你在步驟2下載的Spark文件夾路徑

docker run -u $(id -u) -p 8080:8080 -p 4040:4040 \ 
--rm -v ${spark_location}:/opt/spark -v ${zeppelin_notebook}:/opt/notebook \
-e ZEPPELIN_NOTEBOOK_DIR=/opt/notebook -e SPARK_HOME=/opt/spark \
-e ZEPPELIN_LOCAL_IP=0.0.0.0 --name zeppelin apache/zeppelin:0.10.0

接下來瀏覽器裏打開 http://localhost:8080 , 打開其中的Notebook: Spark/Deep Dive into Delta Lake

配置 Spark

這是 Deep Dive into Delta Lake 的第一個段落,用來初始化Spark Interpreter的配置。

%spark.conf 是Zeppelin裏一種特殊的用來配置Interpreter的Interpreter。這裏我按照Delta Lake的 官方教程 來配置Spark,此外還配置 spark.sql.warehouse.dir ,這樣在後面的教程裏我可以去table的指定目錄查看變化。好,那接下來就讓我們在Zeppelin裏體驗Delta Lake吧。 

創建 Delta 表

首先我們來創建一個含有2個字段的 events 表:id 和 data.

那執行這條Sql語句 Delta 到底在後台做了什麼呢?其實Delta就做裏 2件事:

  • 在文件夾 /tmp/warehouse 下創建文件夾 events  

  • 生成第一個transaction log 文件,裏面含有這張表的schema信息

插入數據

接下來我們插入2條記錄到這個 events 表。

然後再用Select語句查詢下這張表,確認下這條insert語句是否成功。

那麼這條Insert語句背後Delta到底做了什麼呢?我們可以查看下目錄 /tmp/warehouse/events 會發現2個變化:

  • 生產了一個新的 transaction log文件.

  • 新增加了 2 個parquet 文件

首先我們來看下這個生成的transaction log 文件:00000000000000000001.json. 這個文件內容很好理解, 裏面包含了這個insert語句的操作內容:Add 2個parquet文件。這2個parquet文件裏就是包含了這2條記錄。你可能注意到了這個transation log文件裏沒有表的schema信息。因為schema信息已經存在第一個transaction log裏了(00000000000000000000.json)。

當Delta讀表的時候,他會merge所有的歷史transaction log來獲得這張表的所有信息,包括表的schema信息和data文件信息。

因為我們正好插入了2條記錄,所有我們有理由相信應該每個parquet文件包含一條記錄,我們可以用Spark API直接來讀取這2個文件來驗證下。

更新數據

Delta Lake 最重要的feature就是對ACID的支持,你可以在任何時候更新一張表,而不用擔心這個操作會影響到其他人的讀和寫。

接下來我們就來做一個更新數據的操作。

然後再用Select語句驗證下這個Update操作。

那這個Update語句背後Delta做了什麼呢?同樣我們可以看下 events 表的文件夾,會發現2個變化:

  • 新增了一個transaction log文件

  • 新增了一個 parquet 文件,之前的2個 parquet 文件沒有刪除。

我們首先來看這個這個新的transaction log文件的內容,裏面包含了2個操作:

  • 刪除了一個parquet文件

  • 新增了一個parquet文件

很自然,我們會猜想刪除的parquet文件應該就是包含了記錄(2, data_2), 而新增的parquet文件應該正好包含記錄 (2, data_2_updated). 我們可以直接讀取這2個parquet文件來驗證我們的猜想。

接下來我們可以嘗試下Delta的time travel功能。我們可以用下面的代碼來讀取上個版本(執行Update語句之前)的events表。

Time travel 功能能夠work主要就是因為Delta不會真正做刪除數據文件操作,它只會把真正的刪除文件操作記錄在transaction log裏。上面代碼裏Delta只會讀取第一個和第二個transaction log文件:00000000000000000000.json & 00000000000000000001.json.

刪除數據

接下來我們做一個刪除數據的操作

用Select語句來驗證下這個Delete操作

那這個Delete語句背後Delta做了什麼呢?同樣我們通過查看 events 表目錄會發現2個變動:

  • 新生成了一個 transaction log文件

  • 新生成了一個 parquet 文件

在這個transtion log文件裏有2個操作:remove 和 add。

很自然我們會猜測 這個刪除的文件就是包含記錄(1,data_1)的文件,那這個add的文件是什麼呢?其實這個add文件裏面是空的。我們可以直接讀取這2個parquet文件來驗證。

總結

這篇文章裏,我總共做了4個步驟:

  • 創建Delta表

  • 插入數據

  • 更新數據

  • 刪除數據

每個步驟裏,我都會查看transaction log的內容以及data文件的變化。上面所有的代碼都可以在 Zeppelin docker container裏運行,希望這篇文章對你理解Delta Lake的內部機制有幫助。當然這篇文章只是一個基本的介紹,其他更高級的Delta Lake內容以後有時間再和大家分享。