[LakeHouse] Delta Lake全部開源,聊聊Delta的實現架構

語言: CN / TW / HK

剛剛結束的Data + AI summit上,Databricks宣佈將Delta Lake全部開源。

目前在LakeHouse的市場上國內有Hudi,國外有Iceberg, Delta Lake社群正被他們衝擊著,這次Delta Lake的全部開源不管是急病亂投醫,還是絕地反擊我們暫不討論。今天我們主要來了解了Delta Lake是如何實現的。

Delta Lake的誕生

在2017年,Delta Lake 橫空出世,它主打的概念是湖倉一體,最初只開放給付費使用者使用。在2019年時,為提高其市場的佔用份額和影響力,將其進行部分開源。

Delta Lake建立之初的定位主要是為解決雲端儲存中很難實現 ACID 事務和高效能的問題。

  1. 更新不是原子操作,因此查詢不是隔離的,那麼在多物件的更新中,reader將可以查詢到部分的更新,某個物件更新失敗後回滾需要整體回滾。
  2. 在大型表的雲端儲存中進行元資料操作成本很高。例如parquet檔案的footer中包含min/max統計資訊可以幫助reader進行選擇性的讀取,在HDFS上讀取這樣的頁尾可能需要幾毫秒,但是在雲端儲存上需要更高的讀取延時。
  3. 物件儲存上的list操作效能非常差。

為了解決上面的問題,設計並實現了基於雲端儲存的ACID表儲存層--Delta Lake。

Delta Lake的實現思想也很簡單:使用儲存在雲物件儲存中的預寫日誌,以ACID的方式來管理維護Delta表中的資訊。

那麼Delta Lake是如何解決上面的儲存層問題呢?我列舉了如下幾個重要的特性:

  1. 時間旅行,允許使用者查詢時間點的快照,也可以根據時間點進行回滾。
  2. Upsert、Delete和Merge操作,可以有效的重寫物件,支援流式更新操作。
  3. 高效的流式IO, 通過流式操作將小物件寫入表中,並以事務的方式進行合併更新,同時還支援增量消費。
  4. 自動的資料佈局優化,可以自動的優化表中的物件大小,並將資料記錄進行聚類。
  5. 支援schema進化,支援表的schema更改但不用重寫他們。

Delta Lake的儲存架構

Delta Lake 的資料儲存原理其實很簡單。它通過 Partition Directories 儲存資料,資料格式推薦為 Parquet,然後通過預寫的 Transaction Log (事務日誌)記錄的表版本(Table Version) 和變更歷史,以維護歷史版本資料。

Delta Lake中的一些表級的操作,例如更新元資料、更新表名、變更 Schema、增加或刪除Partition、新增或者移除檔案,都會以日誌的形式將所有的操作儲存在表中。

上圖展示了Delta中資料的組織形式。資料和事務日誌都被儲存在表級的目錄下,其中資料以傳統的Hive分割槽目錄的方式儲存,事務日誌被儲存在_delta_log的目錄下。

  1. Delta每次事務commit都會產生一個json的元資料檔案,檔案內容包括本次commit做的所有action,比如AddFile/RemoveFile,也包括對schema的修改等等;
  2. 每產生一個新的json檔案就會產生一個新的Delta的snapshot,snapshot的版本即該json檔案中的數字,該數字必須是連續自增,Delta的某個版本的snapshot是通過順序回放所有小於等於該snapshot版本號的所有json檔案得到;
  3. Delta Lake會以一定的頻率做checkpoint,checkpoint以Parquet的格式儲存,目的是為了便於使用Spark並行進行向量化處理。
  4. delta_log 子目錄下還包含一個last_checkpoint檔案指向最新的checkpoint,從而在日誌操作時可以快速找到最新的checkpoint。

從上面的元資料結構可以看出,Delta和Hudi和Iceberg其實是大同小異。

那麼Delta基於事務日誌實現的細節又是怎樣的呢?

Delta事務日誌的實現細節

Delta事務日誌的實現主要是基於MVCC多版本控制協議實現。Delta 的 MVCC 演算法保留多個數據副本,而不是立即替換包含正在更新或刪除的記錄的檔案。

表的讀取:主要是通過使用事務日誌有選擇地選擇要處理的資料檔案,確保他們一次只能看到表的一致快照。

表的寫入與修改:首先,樂觀地寫出新資料檔案或修改現有資料檔案的拷貝副本。然後,進行事務提交,通過向日志中新增新條目來建立表的最新原子版本。在此日誌條目中,他們記錄了要在邏輯上新增和刪除哪些資料檔案,以及對有關表的其他元資料的更改。

在使用者指定的保留期(預設為 7 天)後,過期的資料檔案將被刪除。

  • Delta files
./_delta_log/00000000000000000001.json

Delta files是以原子性單位產生的檔案(即沒提交一次commit事務),檔案的命名是遞增的版本id, 它與checkpoints檔案一起構成表中所有更改的日誌。

Delta files的json檔案中會包含一組應用應用於前一個表版本的actions操作,每一個actions是以一個json組儲存與Delta files中。

其action行為主要有以下幾種:SetTransaction、AddFile、RemoveFile、Metadata、Protocol、CommitInfo。

  • Checkpoints
./_delta_log/00000000000000000010.checkpoint.parquet

checkpoints檔案也儲存在 _delta_log 目錄中,可以為任何版本的表建立。檢查點包含在此版本之前的所有操作的完整回放,並刪除了無效操作。無效操作是那些已被後續操作取消的操作(例如刪除已新增的檔案)。

預設情況下,參考實現每 10 次提交建立一個checkpoint。checkpoints檔名基於檢查點包含的表的版本。

  • 最後一個Checkpoint

Delta 事務日誌通常包含許多(例如 10,000+)檔案。列出如此大的目錄可能會非常昂貴。最後一個checkpoint檔案可以通過提供指向日誌末尾附近的指標來幫助降低構建表的最新快照的成本。

讀者可以通過檢視 _delta_log/_last_checkpoint 檔案來定位最近的檢查點,而不是列出整個目錄。

那麼接下來我們來看看json檔案中的內容是什麼?下面我們撿幾個重要的展開看看。

Actions

  1. Metadata

元資料操作更改表的當前元資料。表的第一個版本必須包含元資料操作。隨後的元資料操作完全覆蓋表的當前元資料。

{
  "metaData":{
    "id":"af23c9d7-fff1-4a5a-a2c8-55c59bd782aa",
    "format":{"provider":"parquet","options":{}},
    "schemaString":"...",
    "partitionColumns":[],
    "configuration":{
      "appendOnly": "true"
    }
  }
}

Metadata中記錄當前表id, 表級別的file format, 分割槽列資訊等。在schemaString儲存這columnMapping資訊。使用列對映來避免任何列命名限制,並支援重新命名和刪除列,而無需重寫所有資料。列對映有三種模式,按名稱和按id和none。

{
    "name" : "e",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "d",
          "type" : "integer",
          "nullable" : false,
          "metadata" : {
            "delta.columnMapping.id": 5,
            "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
          }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : {
      "delta.columnMapping.id": 4,
      "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
    }
  }
  1. ADD / Delete File

新增和刪除操作分別用於通過新增或刪除單個數據檔案來修改表中的資料。

{
  "add": {
    "path":"date=2017-12-10/part-000...c000.gz.parquet",
    "partitionValues":{"date":"2017-12-10"},
    "size":841454,
    "modificationTime":1512909768000,
    "dataChange":true
    "stats":"{\\"numRecords\\":1,\\"minValues\\":{\\"val..."
  }
}

add中記錄新增檔案的相對路徑,以及當前file所屬的分割槽資訊,通過還包含了file的統計資訊,包括min/max。

{
  "remove":{
    "path":"part-00001-9…..snappy.parquet",
    "deletionTimestamp":1515488792485,
    "dataChange":true
  }
}

刪除操作只包括一個時間戳,指示刪除發生的時間。檔案的物理刪除可能會延遲進行在使用者指定的過期時間之後。刪除操作應該作為邏輯刪除保持在表的狀態中,直到過期。當增量檔案的建立時間戳超過新增到刪除操作時間戳的過期閾值時,邏輯刪除將過期。

  1. Transaction Identifiers
{
  "txn": {
    "appId":"3ba13872-2d47-4e17-86a0-21afd2a22395",
    "version":364475
  }
}

事務識別符號以appId版本對的形式儲存,其中appId是修改表的程序的唯一識別符號,版本表示該應用程式取得了多大進展。該資訊的原子記錄以及對錶的修改使這些外部系統能夠將其寫入到Delta表冪等中。

下面我們來總結對比下:

  1. Delta的實現和Spark深度繫結,目前只支援Spark計算引擎,Iceberg和Hudi都可以支援多種引擎。
  2. 目前Delta只支援COW形式,Iceberg和Hudi都支援部分MOR。
  3. 在實現方式上與Hudi, Iceberg大同小異,但是其事務日誌檔案中只記錄了上一版本與當前版本的差分Action。如果要獲取某個commit的完整檔案列表就需要把之前的差分Action進行重放。不過Delta引入checkpoint機制,當commit積累到一定數量,會生成一個checkpoint, 此時會刪除簡化無效的Action, 後續的讀取可以基於這個checkpoint開始重放。
  4. Delta可以生成較少的元資料檔案,基於checkpoint機制和過期檔案的刪除,減少了大量小檔案的產生,但是並不能很好獲取某個commit的資料。相比Iceberg可能會產生大量的元資料檔案,影響大資料量下的查詢效能,但也相應的增加檔案組跳過的能力。

後續會再繼續解密下開源的付費功能Z-order的實現原始碼。