深入理解Delta Lake 實現原理(on Zeppelin)

語言: CN / TW / HK

Delta Lake 已經出來很久了,看過一些介紹性的文章,也跑過一些簡單的例子。週末花時間重新整理一下Delta Lake的原理,並且在Zeppelin裡跑了一些簡單的例子,相信通過這些例子,大家可以更好的理解Delta Lake的內部實現機制,特別是Transaction Log的機制。

啟動Zeppelin Docker Container

為了能夠更直觀得展示Delta Lake的內部機制,我會用Zeppelin來跑本文全部的例子。你也可以在Zeppelin裡重現本文所有的例子。簡單說明下如何啟動Zeppelin Docker Container 來執行本文的例子:

  • 步驟1. git clone https://github.com/zjffdu/zeppelin-notebook.git

  • 步驟2. 下載 Spark 3.1.2 (這是本文用的Spark版本,Spark 3.2.0 還暫不支援)

  • 步驟3. 執行下面的程式碼,啟動 Zeppelin Docker Container。命令裡的${zeppelin_notebook} 是你在步驟1裡clone下來的 Zeppelin notebook目錄,${spark_location} 是指你在步驟2下載的Spark資料夾路徑

docker run -u $(id -u) -p 8080:8080 -p 4040:4040 \ 
--rm -v ${spark_location}:/opt/spark -v ${zeppelin_notebook}:/opt/notebook \
-e ZEPPELIN_NOTEBOOK_DIR=/opt/notebook -e SPARK_HOME=/opt/spark \
-e ZEPPELIN_LOCAL_IP=0.0.0.0 --name zeppelin apache/zeppelin:0.10.0

接下來瀏覽器裡開啟 http://localhost:8080 , 開啟其中的Notebook: Spark/Deep Dive into Delta Lake

配置 Spark

這是 Deep Dive into Delta Lake 的第一個段落,用來初始化Spark Interpreter的配置。

%spark.conf 是Zeppelin裡一種特殊的用來配置Interpreter的Interpreter。這裡我按照Delta Lake的 官方教程 來配置Spark,此外還配置 spark.sql.warehouse.dir ,這樣在後面的教程裡我可以去table的指定目錄檢視變化。好,那接下來就讓我們在Zeppelin裡體驗Delta Lake吧。 

建立 Delta 表

首先我們來建立一個含有2個欄位的 events 表:id 和 data.

那執行這條Sql語句 Delta 到底在後臺做了什麼呢?其實Delta就做裡 2件事:

  • 在資料夾 /tmp/warehouse 下建立資料夾 events  

  • 生成第一個transaction log 檔案,裡面含有這張表的schema資訊

插入資料

接下來我們插入2條記錄到這個 events 表。

然後再用Select語句查詢下這張表,確認下這條insert語句是否成功。

那麼這條Insert語句背後Delta到底做了什麼呢?我們可以檢視下目錄 /tmp/warehouse/events 會發現2個變化:

  • 生產了一個新的 transaction log檔案.

  • 新增加了 2 個parquet 檔案

首先我們來看下這個生成的transaction log 檔案:00000000000000000001.json. 這個檔案內容很好理解, 裡面包含了這個insert語句的操作內容:Add 2個parquet檔案。這2個parquet檔案裡就是包含了這2條記錄。你可能注意到了這個transation log檔案裡沒有表的schema資訊。因為schema資訊已經存在第一個transaction log裡了(00000000000000000000.json)。

當Delta讀表的時候,他會merge所有的歷史transaction log來獲得這張表的所有資訊,包括表的schema資訊和data檔案資訊。

因為我們正好插入了2條記錄,所有我們有理由相信應該每個parquet檔案包含一條記錄,我們可以用Spark API直接來讀取這2個檔案來驗證下。

更新資料

Delta Lake 最重要的feature就是對ACID的支援,你可以在任何時候更新一張表,而不用擔心這個操作會影響到其他人的讀和寫。

接下來我們就來做一個更新資料的操作。

然後再用Select語句驗證下這個Update操作。

那這個Update語句背後Delta做了什麼呢?同樣我們可以看下 events 表的資料夾,會發現2個變化:

  • 新增了一個transaction log檔案

  • 新增了一個 parquet 檔案,之前的2個 parquet 檔案沒有刪除。

我們首先來看這個這個新的transaction log檔案的內容,裡面包含了2個操作:

  • 刪除了一個parquet檔案

  • 新增了一個parquet檔案

很自然,我們會猜想刪除的parquet檔案應該就是包含了記錄(2, data_2), 而新增的parquet檔案應該正好包含記錄 (2, data_2_updated). 我們可以直接讀取這2個parquet檔案來驗證我們的猜想。

接下來我們可以嘗試下Delta的time travel功能。我們可以用下面的程式碼來讀取上個版本(執行Update語句之前)的events表。

Time travel 功能能夠work主要就是因為Delta不會真正做刪除資料檔案操作,它只會把真正的刪除檔案操作記錄在transaction log裡。上面程式碼裡Delta只會讀取第一個和第二個transaction log檔案:00000000000000000000.json & 00000000000000000001.json.

刪除資料

接下來我們做一個刪除資料的操作

用Select語句來驗證下這個Delete操作

那這個Delete語句背後Delta做了什麼呢?同樣我們通過檢視 events 表目錄會發現2個變動:

  • 新生成了一個 transaction log檔案

  • 新生成了一個 parquet 檔案

在這個transtion log檔案裡有2個操作:remove 和 add。

很自然我們會猜測 這個刪除的檔案就是包含記錄(1,data_1)的檔案,那這個add的檔案是什麼呢?其實這個add檔案裡面是空的。我們可以直接讀取這2個parquet檔案來驗證。

總結

這篇文章裡,我總共做了4個步驟:

  • 建立Delta表

  • 插入資料

  • 更新資料

  • 刪除資料

每個步驟裡,我都會檢視transaction log的內容以及data檔案的變化。上面所有的程式碼都可以在 Zeppelin docker container裡執行,希望這篇文章對你理解Delta Lake的內部機制有幫助。當然這篇文章只是一個基本的介紹,其他更高階的Delta Lake內容以後有時間再和大家分享。