[LakeHouse] Delta Lake全部开源,聊聊Delta的实现架构

语言: CN / TW / HK

刚刚结束的Data + AI summit上,Databricks宣布将Delta Lake全部开源。

目前在LakeHouse的市场上国内有Hudi,国外有Iceberg, Delta Lake社区正被他们冲击着,这次Delta Lake的全部开源不管是急病乱投医,还是绝地反击我们暂不讨论。今天我们主要来了解了Delta Lake是如何实现的。

Delta Lake的诞生

在2017年,Delta Lake 横空出世,它主打的概念是湖仓一体,最初只开放给付费用户使用。在2019年时,为提高其市场的占用份额和影响力,将其进行部分开源。

Delta Lake创建之初的定位主要是为解决云存储中很难实现 ACID 事务和高性能的问题。

  1. 更新不是原子操作,因此查询不是隔离的,那么在多对象的更新中,reader将可以查询到部分的更新,某个对象更新失败后回滚需要整体回滚。
  2. 在大型表的云存储中进行元数据操作成本很高。例如parquet文件的footer中包含min/max统计信息可以帮助reader进行选择性的读取,在HDFS上读取这样的页脚可能需要几毫秒,但是在云存储上需要更高的读取延时。
  3. 对象存储上的list操作性能非常差。

为了解决上面的问题,设计并实现了基于云存储的ACID表存储层--Delta Lake。

Delta Lake的实现思想也很简单:使用存储在云对象存储中的预写日志,以ACID的方式来管理维护Delta表中的信息。

那么Delta Lake是如何解决上面的存储层问题呢?我列举了如下几个重要的特性:

  1. 时间旅行,允许用户查询时间点的快照,也可以根据时间点进行回滚。
  2. Upsert、Delete和Merge操作,可以有效的重写对象,支持流式更新操作。
  3. 高效的流式IO, 通过流式操作将小对象写入表中,并以事务的方式进行合并更新,同时还支持增量消费。
  4. 自动的数据布局优化,可以自动的优化表中的对象大小,并将数据记录进行聚类。
  5. 支持schema进化,支持表的schema更改但不用重写他们。

Delta Lake的存储架构

Delta Lake 的数据存储原理其实很简单。它通过 Partition Directories 存储数据,数据格式推荐为 Parquet,然后通过预写的 Transaction Log (事务日志)记录的表版本(Table Version) 和变更历史,以维护历史版本数据。

Delta Lake中的一些表级的操作,例如更新元数据、更新表名、变更 Schema、增加或删除Partition、添加或者移除文件,都会以日志的形式将所有的操作存储在表中。

上图展示了Delta中数据的组织形式。数据和事务日志都被存储在表级的目录下,其中数据以传统的Hive分区目录的方式存储,事务日志被存储在_delta_log的目录下。

  1. Delta每次事务commit都会产生一个json的元数据文件,文件内容包括本次commit做的所有action,比如AddFile/RemoveFile,也包括对schema的修改等等;
  2. 每产生一个新的json文件就会产生一个新的Delta的snapshot,snapshot的版本即该json文件中的数字,该数字必须是连续自增,Delta的某个版本的snapshot是通过顺序回放所有小于等于该snapshot版本号的所有json文件得到;
  3. Delta Lake会以一定的频率做checkpoint,checkpoint以Parquet的格式存储,目的是为了便于使用Spark并行进行向量化处理。
  4. delta_log 子目录下还包含一个last_checkpoint文件指向最新的checkpoint,从而在日志操作时可以快速找到最新的checkpoint。

从上面的元数据结构可以看出,Delta和Hudi和Iceberg其实是大同小异。

那么Delta基于事务日志实现的细节又是怎样的呢?

Delta事务日志的实现细节

Delta事务日志的实现主要是基于MVCC多版本控制协议实现。Delta 的 MVCC 算法保留多个数据副本,而不是立即替换包含正在更新或删除的记录的文件。

表的读取:主要是通过使用事务日志有选择地选择要处理的数据文件,确保他们一次只能看到表的一致快照。

表的写入与修改:首先,乐观地写出新数据文件或修改现有数据文件的拷贝副本。然后,进行事务提交,通过向日志中添加新条目来创建表的最新原子版本。在此日志条目中,他们记录了要在逻辑上添加和删除哪些数据文件,以及对有关表的其他元数据的更改。

在用户指定的保留期(默认为 7 天)后,过期的数据文件将被删除。

  • Delta files
./_delta_log/00000000000000000001.json

Delta files是以原子性单位产生的文件(即没提交一次commit事务),文件的命名是递增的版本id, 它与checkpoints文件一起构成表中所有更改的日志。

Delta files的json文件中会包含一组应用应用于前一个表版本的actions操作,每一个actions是以一个json组存储与Delta files中。

其action行为主要有以下几种:SetTransaction、AddFile、RemoveFile、Metadata、Protocol、CommitInfo。

  • Checkpoints
./_delta_log/00000000000000000010.checkpoint.parquet

checkpoints文件也存储在 _delta_log 目录中,可以为任何版本的表创建。检查点包含在此版本之前的所有操作的完整回放,并删除了无效操作。无效操作是那些已被后续操作取消的操作(例如删除已添加的文件)。

默认情况下,参考实现每 10 次提交创建一个checkpoint。checkpoints文件名基于检查点包含的表的版本。

  • 最后一个Checkpoint

Delta 事务日志通常包含许多(例如 10,000+)文件。列出如此大的目录可能会非常昂贵。最后一个checkpoint文件可以通过提供指向日志末尾附近的指针来帮助降低构建表的最新快照的成本。

读者可以通过查看 _delta_log/_last_checkpoint 文件来定位最近的检查点,而不是列出整个目录。

那么接下来我们来看看json文件中的内容是什么?下面我们捡几个重要的展开看看。

Actions

  1. Metadata

元数据操作更改表的当前元数据。表的第一个版本必须包含元数据操作。随后的元数据操作完全覆盖表的当前元数据。

{
  "metaData":{
    "id":"af23c9d7-fff1-4a5a-a2c8-55c59bd782aa",
    "format":{"provider":"parquet","options":{}},
    "schemaString":"...",
    "partitionColumns":[],
    "configuration":{
      "appendOnly": "true"
    }
  }
}

Metadata中记录当前表id, 表级别的file format, 分区列信息等。在schemaString存储这columnMapping信息。使用列映射来避免任何列命名限制,并支持重命名和删除列,而无需重写所有数据。列映射有三种模式,按名称和按id和none。

{
    "name" : "e",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "d",
          "type" : "integer",
          "nullable" : false,
          "metadata" : {
            "delta.columnMapping.id": 5,
            "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
          }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : {
      "delta.columnMapping.id": 4,
      "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
    }
  }
  1. ADD / Delete File

添加和删除操作分别用于通过添加或删除单个数据文件来修改表中的数据。

{
  "add": {
    "path":"date=2017-12-10/part-000...c000.gz.parquet",
    "partitionValues":{"date":"2017-12-10"},
    "size":841454,
    "modificationTime":1512909768000,
    "dataChange":true
    "stats":"{\\"numRecords\\":1,\\"minValues\\":{\\"val..."
  }
}

add中记录添加文件的相对路径,以及当前file所属的分区信息,通过还包含了file的统计信息,包括min/max。

{
  "remove":{
    "path":"part-00001-9…..snappy.parquet",
    "deletionTimestamp":1515488792485,
    "dataChange":true
  }
}

删除操作只包括一个时间戳,指示删除发生的时间。文件的物理删除可能会延迟进行在用户指定的过期时间之后。删除操作应该作为逻辑删除保持在表的状态中,直到过期。当增量文件的创建时间戳超过添加到删除操作时间戳的过期阈值时,逻辑删除将过期。

  1. Transaction Identifiers
{
  "txn": {
    "appId":"3ba13872-2d47-4e17-86a0-21afd2a22395",
    "version":364475
  }
}

事务标识符以appId版本对的形式存储,其中appId是修改表的进程的唯一标识符,版本表示该应用程序取得了多大进展。该信息的原子记录以及对表的修改使这些外部系统能够将其写入到Delta表幂等中。

下面我们来总结对比下:

  1. Delta的实现和Spark深度绑定,目前只支持Spark计算引擎,Iceberg和Hudi都可以支持多种引擎。
  2. 目前Delta只支持COW形式,Iceberg和Hudi都支持部分MOR。
  3. 在实现方式上与Hudi, Iceberg大同小异,但是其事务日志文件中只记录了上一版本与当前版本的差分Action。如果要获取某个commit的完整文件列表就需要把之前的差分Action进行重放。不过Delta引入checkpoint机制,当commit积累到一定数量,会生成一个checkpoint, 此时会删除简化无效的Action, 后续的读取可以基于这个checkpoint开始重放。
  4. Delta可以生成较少的元数据文件,基于checkpoint机制和过期文件的删除,减少了大量小文件的产生,但是并不能很好获取某个commit的数据。相比Iceberg可能会产生大量的元数据文件,影响大数据量下的查询性能,但也相应的增加文件组跳过的能力。

后续会再继续解密下开源的付费功能Z-order的实现源码。