深入理解Delta Lake 实现原理(on Zeppelin)

语言: CN / TW / HK

Delta Lake 已经出来很久了,看过一些介绍性的文章,也跑过一些简单的例子。周末花时间重新整理一下Delta Lake的原理,并且在Zeppelin里跑了一些简单的例子,相信通过这些例子,大家可以更好的理解Delta Lake的内部实现机制,特别是Transaction Log的机制。

启动Zeppelin Docker Container

为了能够更直观得展示Delta Lake的内部机制,我会用Zeppelin来跑本文全部的例子。你也可以在Zeppelin里重现本文所有的例子。简单说明下如何启动Zeppelin Docker Container 来运行本文的例子:

  • 步骤1. git clone http://github.com/zjffdu/zeppelin-notebook.git

  • 步骤2. 下载 Spark 3.1.2 (这是本文用的Spark版本,Spark 3.2.0 还暂不支持)

  • 步骤3. 运行下面的代码,启动 Zeppelin Docker Container。命令里的${zeppelin_notebook} 是你在步骤1里clone下来的 Zeppelin notebook目录,${spark_location} 是指你在步骤2下载的Spark文件夹路径

docker run -u $(id -u) -p 8080:8080 -p 4040:4040 \ 
--rm -v ${spark_location}:/opt/spark -v ${zeppelin_notebook}:/opt/notebook \
-e ZEPPELIN_NOTEBOOK_DIR=/opt/notebook -e SPARK_HOME=/opt/spark \
-e ZEPPELIN_LOCAL_IP=0.0.0.0 --name zeppelin apache/zeppelin:0.10.0

接下来浏览器里打开 http://localhost:8080 , 打开其中的Notebook: Spark/Deep Dive into Delta Lake

配置 Spark

这是 Deep Dive into Delta Lake 的第一个段落,用来初始化Spark Interpreter的配置。

%spark.conf 是Zeppelin里一种特殊的用来配置Interpreter的Interpreter。这里我按照Delta Lake的 官方教程 来配置Spark,此外还配置 spark.sql.warehouse.dir ,这样在后面的教程里我可以去table的指定目录查看变化。好,那接下来就让我们在Zeppelin里体验Delta Lake吧。 

创建 Delta 表

首先我们来创建一个含有2个字段的 events 表:id 和 data.

那执行这条Sql语句 Delta 到底在后台做了什么呢?其实Delta就做里 2件事:

  • 在文件夹 /tmp/warehouse 下创建文件夹 events  

  • 生成第一个transaction log 文件,里面含有这张表的schema信息

插入数据

接下来我们插入2条记录到这个 events 表。

然后再用Select语句查询下这张表,确认下这条insert语句是否成功。

那么这条Insert语句背后Delta到底做了什么呢?我们可以查看下目录 /tmp/warehouse/events 会发现2个变化:

  • 生产了一个新的 transaction log文件.

  • 新增加了 2 个parquet 文件

首先我们来看下这个生成的transaction log 文件:00000000000000000001.json. 这个文件内容很好理解, 里面包含了这个insert语句的操作内容:Add 2个parquet文件。这2个parquet文件里就是包含了这2条记录。你可能注意到了这个transation log文件里没有表的schema信息。因为schema信息已经存在第一个transaction log里了(00000000000000000000.json)。

当Delta读表的时候,他会merge所有的历史transaction log来获得这张表的所有信息,包括表的schema信息和data文件信息。

因为我们正好插入了2条记录,所有我们有理由相信应该每个parquet文件包含一条记录,我们可以用Spark API直接来读取这2个文件来验证下。

更新数据

Delta Lake 最重要的feature就是对ACID的支持,你可以在任何时候更新一张表,而不用担心这个操作会影响到其他人的读和写。

接下来我们就来做一个更新数据的操作。

然后再用Select语句验证下这个Update操作。

那这个Update语句背后Delta做了什么呢?同样我们可以看下 events 表的文件夹,会发现2个变化:

  • 新增了一个transaction log文件

  • 新增了一个 parquet 文件,之前的2个 parquet 文件没有删除。

我们首先来看这个这个新的transaction log文件的内容,里面包含了2个操作:

  • 删除了一个parquet文件

  • 新增了一个parquet文件

很自然,我们会猜想删除的parquet文件应该就是包含了记录(2, data_2), 而新增的parquet文件应该正好包含记录 (2, data_2_updated). 我们可以直接读取这2个parquet文件来验证我们的猜想。

接下来我们可以尝试下Delta的time travel功能。我们可以用下面的代码来读取上个版本(执行Update语句之前)的events表。

Time travel 功能能够work主要就是因为Delta不会真正做删除数据文件操作,它只会把真正的删除文件操作记录在transaction log里。上面代码里Delta只会读取第一个和第二个transaction log文件:00000000000000000000.json & 00000000000000000001.json.

删除数据

接下来我们做一个删除数据的操作

用Select语句来验证下这个Delete操作

那这个Delete语句背后Delta做了什么呢?同样我们通过查看 events 表目录会发现2个变动:

  • 新生成了一个 transaction log文件

  • 新生成了一个 parquet 文件

在这个transtion log文件里有2个操作:remove 和 add。

很自然我们会猜测 这个删除的文件就是包含记录(1,data_1)的文件,那这个add的文件是什么呢?其实这个add文件里面是空的。我们可以直接读取这2个parquet文件来验证。

总结

这篇文章里,我总共做了4个步骤:

  • 创建Delta表

  • 插入数据

  • 更新数据

  • 删除数据

每个步骤里,我都会查看transaction log的内容以及data文件的变化。上面所有的代码都可以在 Zeppelin docker container里运行,希望这篇文章对你理解Delta Lake的内部机制有帮助。当然这篇文章只是一个基本的介绍,其他更高级的Delta Lake内容以后有时间再和大家分享。