技術內幕 | 阿里雲EMR StarRocks 極速資料湖分析

語言: CN / TW / HK

作者:阿里雲智慧技術專家 周康,StarRocks Active Contributor 鄭志銓(本文為作者在 StarRocks Summit Asia 2022 上的分享)

為了能夠滿足更多使用者對於極速分析資料的需求,同時讓 StarRocks 強大的分析能力應用在更加廣泛的資料集上,阿里雲EMR OLAP 團隊與 StarRocks 社群在 2021 年就開始合作。

雙方聯手增強 StarRocks 的資料湖分析能力,使其不僅能夠分析儲存在 StarRocks 本地的資料,還能夠以同樣出色的表現分析儲存在 Apache Hive(以下簡稱 Hive)、Apache Iceberg(以下簡稱 Iceberg)和 Apache Hudi(以下簡稱 Hudi)等開源資料湖或資料倉庫的資料。

阿里雲EMR StarRocks 正是 StarRocks 授權阿里雲的一款開源 OLAP 產品,致力於構建極速統一分析體驗,滿足企業使用者的多種資料分析場景。本文將主要闡釋阿里雲EMR StarRocks 在資料湖方向已經做過的工作、實際的效果體現,以及 StarRocks 在資料湖分析方向的規劃。

 

#01

阿里雲EMR StarRocks 整體架構

在儲存層,有阿里雲的物件儲存 OSS 作為資料湖的統一儲存,可以儲存常見的 Parquet/ORC/CSV 等檔案格式。 

在湖管理與優化層,EMR 會通過資料湖構建(DLF),去進行整體資料湖的元資料管理和一體化構建。同時在資料湖分析實踐過程中,物件儲存相對於傳統的 Apache Hadoop(以下簡稱 Hadoop),HDFS 會存在一些效能問題。為了解決這個問題,在阿里雲EMR,我們自研了 Jindo FS 系統,以便對資料湖儲存層訪問進行加速和優化。

同時針對常見的資料湖儲存格式,包括 Parquet、ORC 的格式。比如像 Hudi、Iceberg,在索引統計版本資訊、版本維護、小檔案合併以及生命週期等方面,都做了優化和增強。有了儲存以及針對資料庫管理的優化等工作,就可以在這之上去構建分析層,也就是資料開發與治理層。

在資料開發與治理層,StarRocks 在阿里雲EMR 分為兩個角色,一部分是固定節點,一部分是彈性節點。有了 StarRocks 資料湖分析引擎之後,就可以去對接 EMR 上開源的 Apache Airflow(以下簡稱 Airflow)以及 Jupyter 等,也可以對接阿里雲的 Dataworks,來做資料開發和排程。

 

1、StarRocks 在 Iceberg 的實現

StarRocks 主要包含 FE 和 BE 兩個元件,兩者之間再通過 RPC 進行通訊,以實現查詢的排程和分發、結果彙總等一系列工作。

為了支援 Iceberg 的資料湖分析,我們在 FE 側以及 BE 側都做了大量的改造。首先是 FE 側,增加了外表型別 IcebergTable;在執行計劃生成之後,通過修改 RPC 協議(Thrift 協議),把執行計劃相關資訊傳送給 BE;在 BE 側,再通過通過 HDFS scanner 來支援實際的資料掃描。

在做了上面這一系列的研發工作之後,我們基於 TPCH 和 Trino 做了效能對比測試。可以看到,StarRocks 相對於 Trino 效能表現非常突出。

那麼為什麼 StarRocks 相比 Trino 的效能要好這麼多?

 

2、StarRocks 的效能分析

藉助 StarRocks 已有的全面向量化執行引擎、全新的 CBO 優化器等,這些能力能夠極大地提升我們在單表以及多表層面的效能表現。在這個基礎之上,針對資料湖分析的場景,我們也增加了新的優化規則。 

​​​​​​​​

首先在優化規則的方面,舉幾個簡單的例子,比如常見的謂詞下推,通過支援謂詞下推,能夠把 col_a>x 等謂詞條件下推到 scan 運算元。這樣實際在掃描資料時,就能夠減少掃描的資料量。

如果沒有做謂詞下推(如上圖左上角),通過整體掃描,會把資料先掃上來,然後再通過引擎本身上游的一些 Filter 運算元去做資料的過濾。這會帶來很大的 IO 開銷。

為了進一步減少掃描資料量,我們也支援了分割槽裁剪,詳見上圖中間區域。在沒有做優化之前,需要去掃描三個分割槽。通過分割槽裁剪的優化,在 FE 側就可以把不需要的兩個分割槽裁剪掉。只需要告訴 BE 掃剩餘一個分割槽的資料。在 BE 我們也支援了 Global Runtime Filter,針對 Join 這種場景,能夠有比較大的效能提升。藉助於 StarRocks 優異的執行引擎,就能夠在 CPU 密集型的資料湖分析場景下有很好的效能表現。但在一些實際場景落地過程中,基於 FE 側的一些優化規則,或者是前面提到的全域性 Runtime Filter 還不能夠完全減少 IO開銷。

如何降低 IO 開銷非常關鍵。在大部分情況下,資料湖中需要分析的資料和計算節點,基本上不會在同一臺物理機器上。那麼在分析過程中,我們就面臨著非常大的網路 IO 挑戰,為此 StarRocks 社群針對 IO 方面做了非常多的優化,包括延遲物化、IO 合併、支援 Native Parquet/Orc Reader、針對物件儲存的 SDK 優化等工作。

接下來,我通過兩個例子展開介紹實際的優化細節是怎麼實現的。

(1)IO 合併

在沒有 IO 合併以前,若要讀取一個 Parquet 檔案相關的資料,首先需要基於 FE 側發給 BE 的掃描資料路徑去構建針對檔案級別的 File Reader,在 FE 側規劃的時候,也能告知實際掃哪幾列資料。在實際客戶落地過程中遇到小檔案導致 IO 耗時高的問題。

針對於 ColumnReader,假設一個 SQL 同時要讀取三列,有可能有兩列的資料量會比較小。這個時候可以對這兩列 IO 合併。比如以前要通過兩次的網路 IO,現在可以一次就把這兩列的資料讀取。針對於 Row Group ,也可以對小的 Row Group 做 IO 的合併,從而減少 IO 的次數。

​​​​​​​​

對於檔案本身,如果這個檔案特別小,我們也支援一次把檔案載入到記憶體中。實際在測試過程中,在這種小 IO 特別多的場景下,會有一個非常明顯的提升。

(2)延遲物化

什麼是延遲物化?延遲物化需要解決什麼問題?

在沒有延遲物化之前,回到 Parquet 的實現原理,比如要讀取三列,就需要把這三列同時給讀上來,然後再去運用一些謂詞,再返回給上游運算元。這裡可以看到一個明顯的問題,就是假設沒有針對第三列的謂詞,那其實第三列不需要把所有資料都讀進來。

可以看上圖左邊部分,因為 SQL 針對於前兩列 c0 和 c1 是有謂詞的。這個時候會先把這兩列資料讀取到記憶體。然後基於這兩列構建 Selection mask,這兩個 Mask 叫標記陣列。有了這兩個標記陣列之後,會把第三列定義為一個 Lazy column。

拿到了前兩列的標記陣列之後,基於這兩個標記陣列去構建一個新的過濾標記陣列。然後再基於這個新的過濾標記陣列讀取 Lazy column。那在實際使用過程中,Lazy column 裡邊可能會有多列,這樣能夠極大地減少很多不必要的 IO 讀取。因為有了前面的引擎賦能,包括全面向量化、CBO 優化器以及針對 IO 本身的優化資料湖分析,在測試和實際落地的過程中已經有一個很好的效能表現。

在實踐過程中,另外一個問題就是元資料訪問。在資料湖場景之下,對檔案的 List 操作可能會成為整個網路訪問的瓶頸。為了解決這個問題,在 StarRocks 的 FE 側設計了一套完整的細粒度智慧快取方案,能夠快取 Hive 的分割槽資訊,以及檔案資訊。

​​​​​​​​

在設計快取中,快取更新是一個比較大的挑戰。基於事件驅動的模式,能夠解決快取更新的問題,在保證使用者查詢的效能基礎之上,也能夠有非常好的使用體驗,而不需要手動更新快取。同時,為了加速查詢的規劃和排程,也支援了統計資訊的快取。

 

3、StarRocks的生態分析

​​​​​​​​

早期版本中,如果要支援新的資料來源需要做很多冗餘的開發,開發者需要對很多其他模組有深入的理解,用於使用的時候也需要去建立外表。如何解決這個問題呢?我們的解決思路是設計一套全新的 Connector 框架。

在以前的版本中,假設使用者有一個庫包含一兩百張表,需要在 StarRocks 上去分析,那麼他需要手動建立 100 多張的外表,然後通過 FE 管理元資料,再讓使用者去使用。如果說使用者做了一些 Schema change,外表可能又得重建,就極大增加了使用負擔。

在 Connector 框架設計中我們引入了 Catalog 的概念,使用者不在需要手動建立外表。比如說現在有 Hive Catalog、Iceberg Catalog,使用者不需要去建立外表,只需要建立一個 Catalog,就能實時地獲取到表的元資料資訊。我們已經對 Hive、Iceberg、Hudi 做了完整的支援。同時在 EMR 產品生態裡也已經整合好了前面提到的元資料管理的 DLF 以及 OSS、 Max Compute 等產品。

 

4、StarRocks的彈性分析

前面在做產品整體介紹的時候,提到了我們有一個比較關鍵的產品特性是彈性。彈性是怎麼實現的呢?其實最核心的解決方案就是在 StarRocks 支援了 Compute Node(以下簡稱 CN)。下圖左邊部分就是一個固定的 StarRocks 叢集,這些固定的 BE 節點都有實際的 SSD 儲存。

​​​​​​​​

 綠色部分是 CN。CN 和 BE 共享同一套執行引擎程式碼,是一個無狀態的節點。CN 可以部署在 K8S 上,資料可以儲存在物件儲存或 HDFS 上。通過 K8S HPA 的能力,在叢集負載高的時候動態擴容 CN,在叢集負載低的時候縮容。

​​​​​​​​

經過上面的改造,EMR StarRocks 能夠支援彈性伸縮,從而支援最大程度地降本。有了彈性之後,我們還需要解決另一個問題,那就是資源隔離。資料湖上的查詢 workload 通常多種多樣,有直接對接 BI 出報表的,也有分析師查詢明細的 Ad-Hoc 等等。通常使用者都希望通過軟性的隔離,而不是物理隔離,來實現小租戶資源的彈性隔離。例如在叢集資源空閒的時候,允許查詢充分利用叢集資源,但是當叢集資源緊張時,各個租戶按照自己的資源限制使用資源。因此 StarRocks 還實現了基於 ResourceGroup 的資源隔離,這樣使用者可以從使用者、查詢和 IP 等層面,限制其對 CPU/MEM/IO 等資源的使用。

通過對效能優化、生態整合彈性等幾方面的介紹,我們知道阿里雲EMR StarRocks 在資料湖分析場景具體是怎麼做的、做到了什麼程度。歸納起來,阿里雲EMR StarRocks 資料分析的核心就是“極速”、“統一”兩個關鍵詞。

​​​​​​​​

極速:相對於 Trino 有數倍的效能提升,上圖這一頁的測試資料是針對於 Hudi。

統一:支援多種多樣的資料來源,包括上圖沒有提到的 JDBC 資料來源。目前從 Trino 遷移到 StarRocks 已經有不少落地實踐,基本可以實現無痛的遷移。

 

#03

阿里雲EMR StarRocks資料湖規劃

通過不斷與使用者交流探討,我們認為,資料湖分析至少達到以下四點要求,才能成為一項大眾化的資料分析技術:

1. Single Source of Truth 。只有一份資料,使用者無需顯示地進行資料流轉。

2. 高效能。接近秒級別,甚至亞秒級的查詢延時。

3. 彈性。分解儲存和計算架構。

4. 經濟高效。按需擴充套件和擴充套件。

​​​​​​​​

 當前阻礙資料湖分析達到上述四點要求的情況有以下三種:

1. 資料湖儲存系統普遍存在 IO 效能差的問題,無法滿足使用者對於低延遲查詢的要求。

2. 資料湖、資料倉界限分明。通常為了加速資料湖查詢,我們還需要在其上去搭一層資料倉,破壞了 Single Source of Truth 的原則。

3. 複雜的資料棧結構使我們無法保證彈性、高性價比以及易用性。

經過多次思考、開放討論以及仔細論證,我們提出了資料湖分析的新方式,希望通過資料湖分析的新方式攻克以上難題、達到理想的資料湖分析狀態。

​​​​​​​​

我們認為,資料湖分析的新方式等於快取+物化檢視。

由於資料湖儲存系統包括 OSS 等,通常 IO 效能都比較差,導致資料湖分析的瓶頸通常落在 Scan 資料上。

為了能夠進一步提升資料湖分析的效能,我們希望能夠利用本地磁碟或記憶體快取這些資料加速 IO 效能,使遠端儲存不再成為效能的瓶頸。引入快取對於使用者來說是透明的,使用者無需額外的運維工作就能夠享受到快取加速的好處。

相比於遠端儲存,本地磁碟或記憶體的價格一般都比較昂貴。我們希望好鋼用在刀刃上:只有使用者分析所需要用到的列資料才會進入到快取當中來,並且對於逐漸變冷的資料,我們會將其自動淘汰掉,從而提高快取的空間利用率。

類似於 CPU 的快取架構,我們也採用分級快取的策略。第一級是記憶體,第二級是本地磁碟,對於快取到記憶體的極熱資料,所有的讀取都能夠直接引用快取本身的記憶體,無需進行記憶體拷貝,在資料不斷更新的場景下,新增資料通常會導致 Cache miss,從而導致查詢延遲出現抖動。

​​​​​​​​

目前我們已經做了一些 POC。POC 顯示,在 SSB 多表效能測試的情況下,快取的效能比不快取快了三倍以上,並且已經基本接近 StarRocks 本地表。快取幫助我們保證 Single Source of Truth 的同時達到高效能,由於快取的特性,使用者可以真正做到彈性伸縮、cost effective。對於延遲敏感的場景,提高快取空間來降低查詢延遲。對於延遲不敏感的場景,減少或不使用快取,從而節約成本。

使用者通常希望對資料進一步加工、預聚合或建模,使其進一步滿足業務對資料分析的效能和質量要求,同時也能夠節省重複計算的開銷。然而不管是 Lambda 架構還是 Kappa 架構,使用者都需要搭建複雜的資料棧,用於進一步加工資料湖上的資料。同時使用者還需要分別維護元資料和加工後的多份資料,處理資料之間的一致性問題。

為了滿足使用者對資料加工、建模的需求,進一步融合湖和倉,我們將為使用者帶來更加強大的物化檢視能力解決上述問題。

首先,物化檢視通過 SQL 定義,資料的加工和建模變得極其簡單。其次,物化檢視能夠融合不同資料的元資料,對外提供一個統一的檢視,使用者無需改寫查詢 SQL 即可做到查詢自動路由透明加速。StarRocks 的檢視支援實時增量更新,為使用者提供更實時的分析能力。最後,物化檢視作為 StarRocks 的原生能力,極大地降低了運維成本。通過物化檢視,資料湖能夠真正做到 Single Source of Truth,幫助使用者更加簡單地在資料湖上進行資料的加工建模,打破了湖和倉的次元壁,簡化整個資料棧的架構。

 

#04

總結和展望

StarRocks 資料湖分析的核心是:極速、統一、簡單、易用。

通過 Connector、資料 Catalogs,資料來源的接入變得極其簡單。通過快取,資料湖儲存系統的 IO 效能將不再成為瓶頸。通過物化檢視,湖、倉資料的流轉更加自然,湖、倉檢視一致,查詢可以透明加速,資料棧的架構變得更加簡約。最後藉助雲上和 K8S 的彈效能力,StarRocks 資料湖分析能夠做到真正的彈性、cost effective。 

 

關於 StarRocks 

面世兩年多來,StarRocks 一直專注打造世界頂級的新一代極速全場景 MPP 資料庫,幫助企業建立“極速統一”的資料分析新正規化,助力企業全面數字化經營。

當前已經幫助騰訊、攜程、順豐、Airbnb 、滴滴、京東、眾安保險等超過 170 家大型使用者構建了全新的資料分析能力,生產環境中穩定執行的 StarRocks 伺服器數目達數千臺。 

2021 年 9 月,StarRocks 原始碼開放,在 GitHub 上的星數已超 3600 個。StarRocks 的全球社群飛速成長,至今已有超百位貢獻者,社群使用者突破 7000 人,吸引幾十家國內外行業頭部企業參與共建。