深入理解Delta Lake 实现原理(on Zeppelin)
Delta Lake 已经出来很久了,看过一些介绍性的文章,也跑过一些简单的例子。周末花时间重新整理一下Delta Lake的原理,并且在Zeppelin里跑了一些简单的例子,相信通过这些例子,大家可以更好的理解Delta Lake的内部实现机制,特别是Transaction Log的机制。
启动Zeppelin Docker Container
为了能够更直观得展示Delta Lake的内部机制,我会用Zeppelin来跑本文全部的例子。你也可以在Zeppelin里重现本文所有的例子。简单说明下如何启动Zeppelin Docker Container 来运行本文的例子:
-
步骤1. git clone http://github.com/zjffdu/zeppelin-notebook.git
-
步骤2. 下载 Spark 3.1.2 (这是本文用的Spark版本,Spark 3.2.0 还暂不支持)
-
步骤3. 运行下面的代码,启动 Zeppelin Docker Container。命令里的${zeppelin_notebook} 是你在步骤1里clone下来的 Zeppelin notebook目录,${spark_location} 是指你在步骤2下载的Spark文件夹路径
docker run -u $(id -u) -p 8080:8080 -p 4040:4040 \
--rm -v ${spark_location}:/opt/spark -v ${zeppelin_notebook}:/opt/notebook \
-e ZEPPELIN_NOTEBOOK_DIR=/opt/notebook -e SPARK_HOME=/opt/spark \
-e ZEPPELIN_LOCAL_IP=0.0.0.0 --name zeppelin apache/zeppelin:0.10.0
接下来浏览器里打开 http://localhost:8080 , 打开其中的Notebook: Spark/Deep Dive into Delta Lake
配置 Spark
这是 Deep Dive into Delta Lake 的第一个段落,用来初始化Spark Interpreter的配置。
%spark.conf 是Zeppelin里一种特殊的用来配置Interpreter的Interpreter。这里我按照Delta Lake的 官方教程 来配置Spark,此外还配置 spark.sql.warehouse.dir ,这样在后面的教程里我可以去table的指定目录查看变化。好,那接下来就让我们在Zeppelin里体验Delta Lake吧。
创建 Delta 表
首先我们来创建一个含有2个字段的 events 表:id 和 data.
那执行这条Sql语句 Delta 到底在后台做了什么呢?其实Delta就做里 2件事:
-
在文件夹 /tmp/warehouse 下创建文件夹 events
-
生成第一个transaction log 文件,里面含有这张表的schema信息
插入数据
接下来我们插入2条记录到这个 events 表。
然后再用Select语句查询下这张表,确认下这条insert语句是否成功。
那么这条Insert语句背后Delta到底做了什么呢?我们可以查看下目录 /tmp/warehouse/events 会发现2个变化:
-
生产了一个新的 transaction log文件.
-
新增加了 2 个parquet 文件
首先我们来看下这个生成的transaction log 文件:00000000000000000001.json. 这个文件内容很好理解, 里面包含了这个insert语句的操作内容:Add 2个parquet文件。这2个parquet文件里就是包含了这2条记录。你可能注意到了这个transation log文件里没有表的schema信息。因为schema信息已经存在第一个transaction log里了(00000000000000000000.json)。
当Delta读表的时候,他会merge所有的历史transaction log来获得这张表的所有信息,包括表的schema信息和data文件信息。
因为我们正好插入了2条记录,所有我们有理由相信应该每个parquet文件包含一条记录,我们可以用Spark API直接来读取这2个文件来验证下。
更新数据
Delta Lake 最重要的feature就是对ACID的支持,你可以在任何时候更新一张表,而不用担心这个操作会影响到其他人的读和写。
接下来我们就来做一个更新数据的操作。
然后再用Select语句验证下这个Update操作。
那这个Update语句背后Delta做了什么呢?同样我们可以看下 events 表的文件夹,会发现2个变化:
-
新增了一个transaction log文件
-
新增了一个 parquet 文件,之前的2个 parquet 文件没有删除。
我们首先来看这个这个新的transaction log文件的内容,里面包含了2个操作:
-
删除了一个parquet文件
-
新增了一个parquet文件
很自然,我们会猜想删除的parquet文件应该就是包含了记录(2, data_2), 而新增的parquet文件应该正好包含记录 (2, data_2_updated). 我们可以直接读取这2个parquet文件来验证我们的猜想。
接下来我们可以尝试下Delta的time travel功能。我们可以用下面的代码来读取上个版本(执行Update语句之前)的events表。
Time travel 功能能够work主要就是因为Delta不会真正做删除数据文件操作,它只会把真正的删除文件操作记录在transaction log里。上面代码里Delta只会读取第一个和第二个transaction log文件:00000000000000000000.json & 00000000000000000001.json.
删除数据
接下来我们做一个删除数据的操作
用Select语句来验证下这个Delete操作
那这个Delete语句背后Delta做了什么呢?同样我们通过查看 events 表目录会发现2个变动:
-
新生成了一个 transaction log文件
-
新生成了一个 parquet 文件
在这个transtion log文件里有2个操作:remove 和 add。
很自然我们会猜测 这个删除的文件就是包含记录(1,data_1)的文件,那这个add的文件是什么呢?其实这个add文件里面是空的。我们可以直接读取这2个parquet文件来验证。
总结
这篇文章里,我总共做了4个步骤:
-
创建Delta表
-
插入数据
-
更新数据
-
删除数据
每个步骤里,我都会查看transaction log的内容以及data文件的变化。上面所有的代码都可以在 Zeppelin docker container里运行,希望这篇文章对你理解Delta Lake的内部机制有帮助。当然这篇文章只是一个基本的介绍,其他更高级的Delta Lake内容以后有时间再和大家分享。
- Airflow sensor简介
- Airflow 基础系列 - 03 (Operators介绍)
- Airflow 基础系列 - 02 (Executor详解)
- 深入理解Delta Lake 实现原理(on Zeppelin)
- Airflow 基础系列-01 (Airflow组件)
- PyFlink 开发环境利器:Zeppelin Notebook
- 在Zeppelin中如何使用Flink
- 记一次 Centos7.x 安装、部署 Zeppelin v0.9.0 并配置 PostgreSql 数据库
- 新版Denodo Platform 8.0加速混合/多云集成,通过AI/ML实现数据管理自动化,并提高性能
- PyFlink 区块链?揭秘行业领头企业 BTC.com 如何实现实时计算
- PyFlink 区块链?揭秘行业领头企业 BTC.com 如何实现实时计算
- Flink x Zeppelin ,Hive Streaming 实战解析
- Zeppelin整合Flink采坑实录