[LakeHouse] Delta Lake全部開源,聊聊Delta的實現架構

語言: CN / TW / HK

剛剛結束的Data + AI summit上,Databricks宣佈將Delta Lake全部開源。

目前在LakeHouse的市場上國內有Hudi,國外有Iceberg, Delta Lake社區正被他們衝擊着,這次Delta Lake的全部開源不管是急病亂投醫,還是絕地反擊我們暫不討論。今天我們主要來了解了Delta Lake是如何實現的。

Delta Lake的誕生

在2017年,Delta Lake 橫空出世,它主打的概念是湖倉一體,最初只開放給付費用户使用。在2019年時,為提高其市場的佔用份額和影響力,將其進行部分開源。

Delta Lake創建之初的定位主要是為解決雲存儲中很難實現 ACID 事務和高性能的問題。

  1. 更新不是原子操作,因此查詢不是隔離的,那麼在多對象的更新中,reader將可以查詢到部分的更新,某個對象更新失敗後回滾需要整體回滾。
  2. 在大型表的雲存儲中進行元數據操作成本很高。例如parquet文件的footer中包含min/max統計信息可以幫助reader進行選擇性的讀取,在HDFS上讀取這樣的頁腳可能需要幾毫秒,但是在雲存儲上需要更高的讀取延時。
  3. 對象存儲上的list操作性能非常差。

為了解決上面的問題,設計並實現了基於雲存儲的ACID表存儲層--Delta Lake。

Delta Lake的實現思想也很簡單:使用存儲在雲對象存儲中的預寫日誌,以ACID的方式來管理維護Delta表中的信息。

那麼Delta Lake是如何解決上面的存儲層問題呢?我列舉了如下幾個重要的特性:

  1. 時間旅行,允許用户查詢時間點的快照,也可以根據時間點進行回滾。
  2. Upsert、Delete和Merge操作,可以有效的重寫對象,支持流式更新操作。
  3. 高效的流式IO, 通過流式操作將小對象寫入表中,並以事務的方式進行合併更新,同時還支持增量消費。
  4. 自動的數據佈局優化,可以自動的優化表中的對象大小,並將數據記錄進行聚類。
  5. 支持schema進化,支持表的schema更改但不用重寫他們。

Delta Lake的存儲架構

Delta Lake 的數據存儲原理其實很簡單。它通過 Partition Directories 存儲數據,數據格式推薦為 Parquet,然後通過預寫的 Transaction Log (事務日誌)記錄的表版本(Table Version) 和變更歷史,以維護歷史版本數據。

Delta Lake中的一些表級的操作,例如更新元數據、更新表名、變更 Schema、增加或刪除Partition、添加或者移除文件,都會以日誌的形式將所有的操作存儲在表中。

上圖展示了Delta中數據的組織形式。數據和事務日誌都被存儲在表級的目錄下,其中數據以傳統的Hive分區目錄的方式存儲,事務日誌被存儲在_delta_log的目錄下。

  1. Delta每次事務commit都會產生一個json的元數據文件,文件內容包括本次commit做的所有action,比如AddFile/RemoveFile,也包括對schema的修改等等;
  2. 每產生一個新的json文件就會產生一個新的Delta的snapshot,snapshot的版本即該json文件中的數字,該數字必須是連續自增,Delta的某個版本的snapshot是通過順序回放所有小於等於該snapshot版本號的所有json文件得到;
  3. Delta Lake會以一定的頻率做checkpoint,checkpoint以Parquet的格式存儲,目的是為了便於使用Spark並行進行向量化處理。
  4. delta_log 子目錄下還包含一個last_checkpoint文件指向最新的checkpoint,從而在日誌操作時可以快速找到最新的checkpoint。

從上面的元數據結構可以看出,Delta和Hudi和Iceberg其實是大同小異。

那麼Delta基於事務日誌實現的細節又是怎樣的呢?

Delta事務日誌的實現細節

Delta事務日誌的實現主要是基於MVCC多版本控制協議實現。Delta 的 MVCC 算法保留多個數據副本,而不是立即替換包含正在更新或刪除的記錄的文件。

表的讀取:主要是通過使用事務日誌有選擇地選擇要處理的數據文件,確保他們一次只能看到表的一致快照。

表的寫入與修改:首先,樂觀地寫出新數據文件或修改現有數據文件的拷貝副本。然後,進行事務提交,通過向日志中添加新條目來創建表的最新原子版本。在此日誌條目中,他們記錄了要在邏輯上添加和刪除哪些數據文件,以及對有關表的其他元數據的更改。

在用户指定的保留期(默認為 7 天)後,過期的數據文件將被刪除。

  • Delta files
./_delta_log/00000000000000000001.json

Delta files是以原子性單位產生的文件(即沒提交一次commit事務),文件的命名是遞增的版本id, 它與checkpoints文件一起構成表中所有更改的日誌。

Delta files的json文件中會包含一組應用應用於前一個表版本的actions操作,每一個actions是以一個json組存儲與Delta files中。

其action行為主要有以下幾種:SetTransaction、AddFile、RemoveFile、Metadata、Protocol、CommitInfo。

  • Checkpoints
./_delta_log/00000000000000000010.checkpoint.parquet

checkpoints文件也存儲在 _delta_log 目錄中,可以為任何版本的表創建。檢查點包含在此版本之前的所有操作的完整回放,並刪除了無效操作。無效操作是那些已被後續操作取消的操作(例如刪除已添加的文件)。

默認情況下,參考實現每 10 次提交創建一個checkpoint。checkpoints文件名基於檢查點包含的表的版本。

  • 最後一個Checkpoint

Delta 事務日誌通常包含許多(例如 10,000+)文件。列出如此大的目錄可能會非常昂貴。最後一個checkpoint文件可以通過提供指向日誌末尾附近的指針來幫助降低構建表的最新快照的成本。

讀者可以通過查看 _delta_log/_last_checkpoint 文件來定位最近的檢查點,而不是列出整個目錄。

那麼接下來我們來看看json文件中的內容是什麼?下面我們撿幾個重要的展開看看。

Actions

  1. Metadata

元數據操作更改表的當前元數據。表的第一個版本必須包含元數據操作。隨後的元數據操作完全覆蓋表的當前元數據。

{
  "metaData":{
    "id":"af23c9d7-fff1-4a5a-a2c8-55c59bd782aa",
    "format":{"provider":"parquet","options":{}},
    "schemaString":"...",
    "partitionColumns":[],
    "configuration":{
      "appendOnly": "true"
    }
  }
}

Metadata中記錄當前表id, 表級別的file format, 分區列信息等。在schemaString存儲這columnMapping信息。使用列映射來避免任何列命名限制,並支持重命名和刪除列,而無需重寫所有數據。列映射有三種模式,按名稱和按id和none。

{
    "name" : "e",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "d",
          "type" : "integer",
          "nullable" : false,
          "metadata" : {
            "delta.columnMapping.id": 5,
            "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
          }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : {
      "delta.columnMapping.id": 4,
      "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
    }
  }
  1. ADD / Delete File

添加和刪除操作分別用於通過添加或刪除單個數據文件來修改表中的數據。

{
  "add": {
    "path":"date=2017-12-10/part-000...c000.gz.parquet",
    "partitionValues":{"date":"2017-12-10"},
    "size":841454,
    "modificationTime":1512909768000,
    "dataChange":true
    "stats":"{\\"numRecords\\":1,\\"minValues\\":{\\"val..."
  }
}

add中記錄添加文件的相對路徑,以及當前file所屬的分區信息,通過還包含了file的統計信息,包括min/max。

{
  "remove":{
    "path":"part-00001-9…..snappy.parquet",
    "deletionTimestamp":1515488792485,
    "dataChange":true
  }
}

刪除操作只包括一個時間戳,指示刪除發生的時間。文件的物理刪除可能會延遲進行在用户指定的過期時間之後。刪除操作應該作為邏輯刪除保持在表的狀態中,直到過期。當增量文件的創建時間戳超過添加到刪除操作時間戳的過期閾值時,邏輯刪除將過期。

  1. Transaction Identifiers
{
  "txn": {
    "appId":"3ba13872-2d47-4e17-86a0-21afd2a22395",
    "version":364475
  }
}

事務標識符以appId版本對的形式存儲,其中appId是修改表的進程的唯一標識符,版本表示該應用程序取得了多大進展。該信息的原子記錄以及對錶的修改使這些外部系統能夠將其寫入到Delta表冪等中。

下面我們來總結對比下:

  1. Delta的實現和Spark深度綁定,目前只支持Spark計算引擎,Iceberg和Hudi都可以支持多種引擎。
  2. 目前Delta只支持COW形式,Iceberg和Hudi都支持部分MOR。
  3. 在實現方式上與Hudi, Iceberg大同小異,但是其事務日誌文件中只記錄了上一版本與當前版本的差分Action。如果要獲取某個commit的完整文件列表就需要把之前的差分Action進行重放。不過Delta引入checkpoint機制,當commit積累到一定數量,會生成一個checkpoint, 此時會刪除簡化無效的Action, 後續的讀取可以基於這個checkpoint開始重放。
  4. Delta可以生成較少的元數據文件,基於checkpoint機制和過期文件的刪除,減少了大量小文件的產生,但是並不能很好獲取某個commit的數據。相比Iceberg可能會產生大量的元數據文件,影響大數據量下的查詢性能,但也相應的增加文件組跳過的能力。

後續會再繼續解密下開源的付費功能Z-order的實現源碼。