Flink Table Store v0.2 應用場景和核心功能

語言: CN / TW / HK

分享嘉賓 :李勁鬆 阿里巴巴 技術專家

編輯整理:劉浩平 東北大學

出品平臺:DataFunTalk

導讀: 本文講分享Flink Table Store v0.2適用的應用場景,以及支撐這些應用場景的核心功能。 同時介紹Flink Table Store未來的規劃。

今天的主題圍繞以下四點展開:

  • 應用場景

  • 核心功能

  • 未來展望

  • 專案資訊

01

應用場景

首先了解一下 Flink Table Store v0.2的架構 (如圖 1) ,它首先是一個湖儲存,以低成本無服務的方式儲存大量資料。湖儲存中通過Manifest管理檔案,每個Bucket中是一個 LSM Tree。在湖儲存上也支援了和Kafka的整合,讓你一張表同時儲存離線和實時資料。

上游支援Streaming或Batch寫入資料,下游支援Flink Streaming、Flink Batch、Hive、Spark和Trino消費。湖儲存的資料儲存在DFS上,也可以使用Object Store、Cloud Storage來儲存。

圖1 Flink Table Store v0.2的架構

接下來分享v0.2主要面向的四個場景。

1. 場景一:離線數倉加速

第一個場景是離線數倉的加速(如圖 2),這個場景中是典型湖儲存的應用場景,支援Flink Streaming寫入,下游支援各種計算引擎的批量查詢(Batch Read)或OLAP查詢。

目前 Hudi、Iceberg 也具有以上能力,那 Flink Table Store v0.2 對比它們有什麼特點?

第一個特點 ,Flink Table Store 湖儲存面向Flink寫入,需要支援Flink Streaming SQL產生的實時更新的所有型別。包括主鍵更新、無主鍵更新和AppendOnly資料。

第二個特點 ,由於它面向Flink Streaming SQL,因此它支援實時更新。比如業界把Hive、Iceberg作為偏離線的數倉,Hudi作為近實時的數倉。而Table Store想要支援的是更快更大吞吐的實時更新。

Table Store需要哪些能力滿足以上兩個特點? 寫入讀取 兩個方面來說明。

在寫端,首先,所有的更新都應基於儲存自身而不應依賴Flink的狀態,這樣的好處是易用性非常高。由於儲存的更新不依賴寫入作業的狀態,寫入作業可以在流處理和批處理之間隨意切換;其次,儲存是基於非常高效的LSM的Sorted Merge,因此它的更新效能非常高。

對於讀端,基於LSM非常快速的Sorted Merge,可以實現高效的 MOR(Merge On Read),有序的合併開銷非常低,對比Iceberg或者Hive這種Copy On Write(COW)表的查詢時延差不多。同時,由於資料按照主鍵排序,相當於主鍵上有索引。如果有基於主鍵或部分主鍵的Filter或Range Filter 查詢,查詢速度會非常快。因為底層基於排序的查詢已經排除掉大量的檔案,點查最快可以在100毫秒返回。

圖 2 離線數倉加速

2. 場景二:Partial Update(COALESCE)

如圖3中的 CREATE TABLE 語句中定義了一個 pk,然後定義 merge-engine 為partial-update。圖3中INSERT語句寫入資料時merge兩張表,每張表更新的欄位不同,其中表Src1更新column_1,表Scr2更新column_2。這裡寫的方式與傳統的 Partial Update 有些區別,是將不需要更新的欄位設定為 NULL 。這裡的 Partial Update類似於SQL引擎中的COALESCE函式,非NULL欄位更新。

Partial Update適用於基於主鍵的大寬表的更新,比如實時更新10個表,它們基於相同主鍵更新不同欄位。在讀端可以進行實時批量查詢。(流消費還在開發中)

Table Store 在支援這個應用場景的特點與第一個場景類似,支援寫端和讀端的所有四個特點(如圖3)。

圖3  Partial Update

3. 場景三:預聚合 Rollup

由於Table Store是基於LSM面向更新的湖儲存,因此可以做一些更有趣的事,比如預聚合(如圖 4)。圖4中的SQL建表語句中定義了merge-engine為aggregation,分別對column_1和column_2做sum和max聚合操作,當寫入資料時就會自動的進行merge,實現與Flink流中使用agg函式類似的效果。

可以看到這兩個能力都有一定的限制,比 如agg函式的計算邏輯不能太複雜,如count distinct這種操作就無法實現。Flink Table Store與Flink Streaming作業比起來通用性較弱。那為什麼湖儲存還需要提供場景二和場景三的能力?因為在儲存側提供這些能力成本要低很多。在儲存側提供這種能力時不需要TTL,而Flink Streaming 作業就有State的 TTL,當Flink Streaming有錯誤的延遲事件資料時會導致錯誤的結果。而儲存側提供這種能力使得操作更寬泛。

圖 4 預聚合 Rollup

4. 場景四:實時數倉增強

前面三個場景主要面向湖儲存本身,而第四個場景面向實時數倉增強(如圖5)。

如圖5所示的SQL中指定Log System為kafka及其相關的配置資訊,從而使表具有兩種物理儲存形態——湖儲存和Log System。在Flink Streaming寫入時會雙寫兩個系統,且能通過offset處理好兩個儲存系統之間的一致性。傳統的實時數倉只有Kafka,且實時資料不可查,而 T able Store可以很方便的通過批處理引擎或者OLAP引擎查詢中間表的狀態,以及做資料印證和靈活的OLAP 。另一個能力是通過流處理引擎處理資料時可以實現Hybird的Backfill讀,也就是先讀取湖儲存,然後讀取Log Store。比如查詢一個月的資料等。這種能力與Kafka Tired Storage 類似,因為訊息引擎也在解決自身儲存能力不足的問題(不能無限的儲存)。訊息引擎一般有的TTL(Time-To-Live)retention指標來儲存最近某段時間內的資料,因此Kafka設計了Tired Storage實現資料儲存在DFS上。但是訊息佇列設計這種儲存存在一個問題,資料無法被各種計算引擎靈活的查詢。Table Store就解決了這個問題,可以讓外部計算引擎靈活的查詢資料。

那麼就會引出另外一個問題,要把Table Store當作Kafka的Tired Storage,就要保證讀資料的順序與獨立使用Kafka是一樣的。如果沒有定義PK和write-mode是AppendOnly模式,這種情況下無論Hybrid讀湖儲存還是Log System它讀到的資料順序是一樣的。 

圖5 實時數倉增強

以上是Table Store v0.2四個主要應用場景,那麼 它通過哪些能力來支撐這四個場景呢? 下面介紹一下它的核心功能。

02

核心功能

1. 湖儲存的結構

Table Store v0.1提供了湖儲存的能力,其湖儲存的特點有:

第一,提供 Snapshot級別的事務語義。

第二,支援物件儲存上的大規模資料儲存。

如圖6展示了v0.1在結構上對這兩個特點的支援,可以看到重要的細粒度的Meta資料都放在DFS上。

每個Snapshot相當於一次Commit,每個Snapshot更新的檔案存放在同樣位於DFS的Manifest中。但所有更新的檔案不能只記錄在一個Manifest中,因為當Table的資料達到TB甚至PB級時,這種量級的資料檔案自身的Meta就非常大。湖儲存通過分層的手段支援Manifest的增量更新(如圖6中的m0、m1等等),從而實現檔案Meta可更新。Table Store通過這種分層結構來支援大規模資料儲存,同時達到Snapshot級別的事務語義。

以下是Table Store的上層結構。

圖6 湖儲存結構

2. Table Store v0.1分割槽內部結構

在v0.1分割槽內部,使用者在定義表的時候需要設定其包含的Bucket數量。這樣每個分割槽中包含多個獨立的Bucket,資料通過Hash分佈到Bucket中。每個Bucket中都是一個支援更新的LSM Tree。由於LSM Tree是支援更新的資料結構,避免了單個Bucket 的資料儲存在一個檔案中。

如果Bucket內只有一個檔案,每次更新時需要對檔案中的資料全部重寫,當檔案中的資料達到GB量級時的代價就非常大。但是又不能把Bucket數量設定太多,當Bucket number的數量很大時每個Bucket都對應一個小檔案,此時對物件儲存或者DFS的壓力很大。

所以Table Store在Bucket中是LSM Tree的檔案結構,而不是單個檔案,以此來支援更新以及查詢效能的提升。

圖7 Table Store v0.1分割槽內部結構

3. Table Store的生產化

以上是Table Store v0.1提供的基本結構,在此基礎上Table Store v0.2 為了更接近生產化做了很多改進。 生產化需要哪些能力?主要包括API和生態兩個方面。

首先,在API方面提供了Catalog,如圖8中的Flink SQL建立一個CATALOG 指定它的type為Table Store。Catalog的元資料預設存在File System上,可通過metastore配置元資料的儲存位置,比如Hive,這樣就可以在hive中直接讀這些表。

Catalog的使用也比較簡單,直接USE CATELOG,然後再建立表。在建立表時你可以選擇性的提供Kafka的LogSystem資訊。

圖8 Table Store Catalog

其次,是Table Store的生態(如圖9)。

由於v0.1只支援Flink Batch,嚴重束縛了Table Store的生產可用性。v0.2將核心支援Hive、Spark和Trino這三個計算引擎。使用方式如圖9所示,對於Hive SQL如果已經配置Metastore的Catalog可以直接查;如果沒有配置Metastore則需要建立外表並指定LOACTION資訊才可以直接查詢。對於Spark SQL可以直接配置Catalog,進而看到Table Store的所有表。

圖9 生態

4. Table Store中Bucket的更新

前面提到,一個分割槽內Bucket的數量使固定的。建表時如果定義的Bucket數量太少,更新效能太差無法滿足吞吐量,Bucket數量太多導致小檔案太多導致查詢效能很差。

v0.2引入了Rescale的能力(如圖10),當Bucket數量太少時通過Rescale實現擴充套件。 Rescale的前提是最好更新當前或者未來的分割槽,而不影響老的分割槽。如果影響老的分割槽就意味著Rescale需要將全部資料重寫一遍,業務上是不允許這種操作的。Rescale 的設計是隻影響新分割槽,新的分割槽會使用更新後的Bucket個數,而老分割槽不動。如果當前分割槽的效能太低怎麼辦?需要暫停流寫作業,使用Batch作業Rescale當前分割槽,然後再恢復流寫作業,從而實現調整當前分割槽的Bucket數量。

圖10 Change Bucket

5. Append Only模式

前面也提到Table Store可以實現類似Kafka的Tiered Storage的能力,因此需要儲存側支援Append Only模式(如圖11)。 該模式的特點是:

  • 由於沒有合併和更新,所以寫入成本非常低,可以作為離線表使用。

  • 流讀的順序與輸入序一樣,提供與Kafka流讀相同的體驗;同時資料是可查詢的,可以通過不同的查詢引擎查詢所有的資料。

  • 提供自動Compaction功能,由於寫時會產生大量小檔案,需要通過Compaction合併小檔案,避免小文過多件導致的效能問題。

圖11 Append Only模式

03

未來展望

接下來分享一下Table Store未來的規劃及長期展望。

1. 滿足Flink SQL對儲存的需求

Table Store首要目的是滿足Flink SQL對儲存的需求。這些需求包括(如圖12):

①最基本的是對訊息佇列的需求。

②表資料實現OLAP可查的功能。

③支援Batch ETL的寫入和大規模Scan。

④在以上三個能力的基礎上 支援Dim Lookup ,也就是Flink Stream SQL中Dim Join的能力。這種點查無法達到Hbase那種低延遲毫秒級的點查,這裡的點查是基於Batch的點查即流計算中批次的點查。

有了以上四個能力就可以滿足Flink SQL對儲存的所有需求。

圖12 滿足Flink SQL對儲存的需求

2. 滿足不同Tradeoff的選擇

Table Store未來的目標是 能在新鮮度、成本、查詢延時(如圖13)這三個指標之間達到一個平衡。

成本包括伺服器、費用以及開發成本等;新鮮度是指資料從產生到查詢整個過程消耗的時間。查詢延時是指使用者的查詢耗時。Table Store的目標是讓使用者在這三個Tradeoff之間靈活的選擇。比如,當需要較短的查詢延時,就要降低對新鮮度低以及成本的要求。如果要更好的新鮮度,那麼資料準備的過程就不應太複雜且需要更高的成本。

圖13 滿足不同Tradeoff的選擇

3. Flink Table Store的架構

目前Table Store只是一個湖儲存,以後作為長期的一個架構如圖14所示,主要包括湖儲存、DFS、Log System三部分。後續的架構正在POC當中,包括在湖儲存基礎上支援加速能力的Service,達到Flink Streaming Pipeline基於Service實現流寫和流讀。同時儲存本身支援很強的OLAP查詢效能。由於實時Pipeline的成本很高,如果要查詢歷史資料比如一年的資料怎麼辦?Service作為湖儲存的加速,所有的資料都會週期性的儲存到湖儲存上,這樣就可以實現Batch Pipeline查詢,比如通過Hive、Spark、Flink Batch或者Presto來查詢一整年的資料。批處理只通過Metastore和湖儲存實現,不會干擾Service,所以不會帶來很大的成本。

整體上想通過這套架構在新鮮度、成本、查詢延時三個指標之間有一定的Tradeoff,在儲存所有的資料的同時滿足實時OLAP的能力。

圖14 Flink Table Store架構

4. Dim Join,存算分離

除了流讀流寫、批讀批寫,Flink Table Store也具備Dim Join的能力,即支援一定程度的點查(如圖15)。由於LSM支援類似點查的能力,Table Store類似於HBase的計算儲存分離的查詢,在某個節點,比如Dim Join的Task上會構建一個點查的Cache(如圖15)。其中的Cache具有從記憶體到本地磁碟,再到DFS的分層結構。

圖15 Dim Join,計算儲存分離

04

專案資訊

Flink Table Store是Apache Flink的子專案。 專案開源地址:

https://github.com/apache/flink-table-store

專案文件:

https://nightlies.apache.org/flink/flink-table-store -doc s-master/

郵件列表:

釘釘群如圖16所示。

圖16 專案資訊

v0.2已在2022年8月份釋出!

05

問答環節

Q:Flink Table Store如何管理資料許可權?有計劃支援庫表的許可權管理嗎?

A:在湖儲存中資料許可權依賴於檔案系統的許可權,目前Flink Table Store的許可權認證只有檔案級別的許可權,或者Hive Metastore的許可權認證,依賴底層儲存的許可權管理,目前沒有支援庫表的許可權管理。

Q:目前有對Hadoop元件,如Hive的Kerberos認證環境支援嗎?

A:目前通過Flink來支援Kerberos認證的環境,後續可能要做一些調整。

Q:對比Hudi、Iceberg等,Flink Table Store的優勢在什麼地方?

A:它們面向的場景不同,第一Flink Table Store的特點是不僅支援Update with PK還支援Update without PK。其二,Flink Table Store是面向更大吞吐更實時的更新,成本低,吞吐大。同時讀端有很好的MOR效能和主鍵索引加速能力,點查的效率非常高。另外,Flink Table Store更新是無狀態的,不依賴Flink SQL,這樣Flink可以隨時啟停。歡迎大家來測試。

Q:LSM也支援快照,是否可以做到像Iceberg的增量讀取?

A:可以的,從場景四可以看到,Log System(Kafka)是可選的,也可以不配置Kafka,直接增量讀取湖儲存。Flink Table Store提供的特點的是,有更多的順序保證,更好的流消費效能。比如在AppendOnly模式下增量讀可以保證輸入序讀取,它可以當作一個Queue使用,只是延時相對比較高。

Q:後期的路線圖是如何規劃的?  

A:從Flink Table Store的架構圖(圖14),目前v0.2的目標是把Lake Store做好。Service的核心在於能提供很好的新鮮度以及線上離線的整合能力,明年計劃釋出成熟的Service版本。

今天的分享就到這裡,謝謝大家。

在文末分享、點贊、在看,給個3連擊唄~

01 / 分享嘉賓

李勁鬆

阿里巴巴 技術專家

目前就職於阿里雲開源大資料,長期從事分散式流 / 批處理系統領域的工作,也對資料湖和 OLAP MPP 有一些研究。我是 Apache Beam / Flink / Iceberg 的 Committer,對底層排程、通訊機制、使用者模型、SQL 流批計算、儲存有一定了解。目前專注於 Flink Table Store 專案的開發,希望給 Flink 帶來一個最適合的儲存。

02 / 免費下載資料

03 / 報名看直播 免費領PPT

04 / 關於我們

DataFun: 專注於大資料、人工智慧技術應用的分享與交流。發起於2017年,在北京、上海、深圳、杭州等城市舉辦超過100+線下和100+線上沙龍、論壇及峰會,已邀請超過2000位專家和學者參與分享。其公眾號 DataFunTalk 累計生產原創文章700+,百萬+閱讀,14萬+精準粉絲

  分享、點贊、在看 ,給個 3連擊 :point_down: