資料湖 | 如何打造一款極速資料湖分析引擎

語言: CN / TW / HK

作者

阿里雲 EMR 開源大資料 OLAP 團隊  

StarRocks 社群資料湖分析團隊

隨著數字產業化和產業數字化成為經濟驅動的重要動力,企業的資料分析場景越來越豐富,對資料分析架構的要求也越來越高。新的資料分析場景催生了新的需求,主要包括三個方面:

  • 使用者希望用更加低廉的成本,更加實時的方式匯入並存儲任何數量的關係資料資料(例如,來自業務線應用程式的運營資料庫和資料)和非關係資料(例如,來自移動應用程式、IoT 裝置和社交媒體的運營資料庫和資料)

  • 使用者希望自己的資料資產受到嚴密的保護

  • 使用者希望資料分析的速度變得更快、更靈活、更實時 

資料湖的出現很好的滿足了使用者的前兩個需求,它允許使用者匯入任何數量的實時獲得的資料。使用者可以從多個來源收集資料,並以其原始形式儲存到資料湖中。資料湖擁有極高的水平擴充套件能力,使得使用者能夠儲存任何規模的資料。同時其底層通常使用廉價的儲存方案,使得使用者儲存資料的成本大大降低。資料湖通過敏感資料識別、分級分類、隱私保護、資源許可權控制、資料加密傳輸、加密儲存、資料風險識別以及合規審計等措施,幫助使用者建立安全預警機制,增強整體安全防護能力,讓資料可用不可得和安全合規。

為了進一步滿足使用者對於資料湖分析的要求,我們需要一套適用於資料湖的分析引擎,能夠在更短的時間內從更多來源利用更多資料,並使使用者能夠以不同方式協同處理和分析資料,從而做出更好、更快的決策。本篇文章將向讀者詳細揭祕這樣一套資料湖分析引擎的關鍵技術,並通過StarRocks 來幫助使用者進一步理解系統的架構。

之後我們會繼續發表兩篇文章,來更詳細地介紹極速資料湖分析引擎的核心和使用案例:

  • 程式碼走讀篇: 通過走讀 StarRocks 這個開源分析型資料庫核心的關鍵資料結構和演算法,幫助讀者進一步理解極速資料湖分析引擎的原理和具體實現。

  • Case Study 篇: 介紹大型企業如何使用 StarRocks 在資料湖上實時且靈活的洞察資料的價值,從而幫助業務進行更好的決策,幫助讀者進一步理解理論是如何在實際場景落地的。

什麼是資料湖

什麼是資料湖,根據 Wikipedia 的定義,“A data lake is a system or repository of data stored in its natural/raw format, usually object blobs or files”。通俗來說可以將資料湖理解為在廉價的物件儲存或分散式檔案系統之上包了一層,使這些儲存系統中離散的 object 或者 file 結合在一起對外展現出一個統一的語義,例如關係型資料庫常見的“表”語義等。

瞭解完資料湖的定義之後,我們自然而然地想知道資料湖能為我們提供什麼獨特的能力,我們為什麼要使用資料湖?

在資料湖這個概念出來之前,已經有很多企業或組織大量使用 HDFS 或者 S3 來存放業務日常運作中產生的各式各樣的資料(例如一個製作 APP 的公司可能會希望將使用者所產生的點選事件事無鉅細的記錄)。因為這些資料的價值不一定能夠在短時間內被發現,所以找一個廉價的儲存系統將它們暫存,期待在將來的一天這些資料能派上用場的時候再從中將有價值的資訊提取出來。然而 HDFS 和 S3 對外提供的語義畢竟比較單一(HDFS 對外提供檔案的語義,S3對外提供物件的語義),隨著時間的推移工程師們可能都無法回答他們到底在這裡面儲存了些什麼資料。為了防止後續使用資料的時候必須將資料一一解析才能理解資料的含義,聰明的工程師想到將定義一致的資料組織在一起,然後再用額外的資料來描述這些資料,這些額外的資料被稱之為“元”資料,因為他們是描述資料的資料。這樣後續通過解析元資料就能夠回答這些資料的具體含義。這就是資料湖最原始的作用。

隨著使用者對於資料質量的要求越來越高,資料湖開始豐富其他能力。例如為使用者提供類似資料庫的 ACID 語義,幫助使用者在持續寫入資料的過程中能夠拿到 point-in-time 的檢視,防止讀取資料過程中出現各種錯誤。或者是提供使用者更高效能的資料匯入能力等,發展到現在,資料湖已經從單純的元資料管理變成現在擁有更加豐富,更加類似資料庫的語義了。

用一句不太準確的話描述資料湖,就是一個儲存成本更廉價的“AP 資料庫”。但是資料湖僅僅提供資料儲存和組織的能力,一個完整的資料庫不僅要有資料儲存的能力,還需要有資料分析能力。因此怎麼為資料湖打造一款高效的分析引擎,為使用者提供洞察資料的能力,將是本文所要重點闡述的部分。下面通過如下幾個章節一起逐步拆解一款現代的 OLAP 分析引擎的內部構造和實現:

  • 怎麼在資料湖上進行極速分析

  • 現代資料湖分析引擎的架構

怎麼在資料湖上進行極速分析?

從這一節開始,讓我們開始回到資料庫課程,一個用於資料湖的分析引擎和一個用於資料庫的分析引擎在架構上別無二致,通常我們認為都會分為下面幾個部分:

  • Parser:將使用者輸入的查詢語句解析成一棵抽象語法樹

  • Analyzer:分析查詢語句的語法和語義是否正確,符合定義

  • Optimizer:為查詢生成效能更高、代價更低的物理查詢計劃

  • Execution Engine:執行物理查詢計劃,收集並返回查詢結果

對於一個數據湖分析引擎而言,Optimizer 和 Execution Engine 是影響其效能兩個核心模組,下面我們將從三個維度入手,逐一拆解這兩個模組的核心技術原理,並通過不同技術方案的對比,幫助讀者理解一個現代的資料湖分析引擎的始末。

RBO vs CBO

基本上來講,優化器的工作就是對給定的一個查詢,生成查詢代價最低(或者相對較低)的執行計劃。不同的執行計劃性能會有成千上萬倍的差距,查詢越複雜,資料量越大,查詢優化越重要。

Rule Based Optimization (RBO) 是傳統分析引擎常用的優化策略。RBO 的本質是核心是基於關係代數的等價變換,通過一套預先制定好的規則來變換查詢,從而獲得代價更低的執行計劃。常見的 RBO 規則謂詞下推、Limit 下推、常量摺疊等。在 RBO 中,有著一套嚴格的使用規則,只要你按照規則去寫查詢語句,無論資料表中的內容怎樣,生成的執行計劃都是固定的。但是在實際的業務環境中,資料的量級會嚴重影響查詢的效能,而 RBO 是沒法通過這些資訊來獲取更優的執行計劃。

為了解決 RBO 的侷限性,Cost Based Optimization (CBO) 的優化策略應運而生。CBO 通過收集資料的統計資訊來估算執行計劃的代價,這些統計資訊包括資料集的大小,列的數量和列的基數等資訊。舉個例子,假設我們現在有三張表 A,B 和 C,在進行 A join B join C 的查詢時如果沒有對應的統計資訊我們是無法判斷不同 join 的執行順序代價上的差異。如果我們收集到這三張表的統計資訊,發現 A 表和 B 表的資料量都是 1M 行,但是 C 表的 資料量僅為 10 行,那麼通過先執行 B join C 可以大大減少中間結果的資料量,這在沒有統計資訊的情況下基本不可能判斷。

隨著查詢複雜度的增加,執行計劃的狀態空間會變的非常巨大。刷過演算法題的小夥伴都知道,一旦狀態空間非常大,通過暴力搜尋的方式是不可能 AC 的,這時候一個好的搜尋演算法格外重要。通常 CBO 使用動態規劃演算法來得到最優解,並且減少重複計運算元空間的代價。當狀態空間達到一定程度之後,我們只能選擇貪心演算法或者其他一些啟發式演算法來得到區域性最優。本質上搜索演算法是一種在搜尋時間和結果質量做 trade-off 的方法。

(常見 CBO 實現架構)

Record Oriented vs Block Oriented

執行計劃可以認為是一串 operator(關係代數的運算子)首尾相連串起來的執行流,前一個 operator 的 output 是下一個 operator 的 input。傳統的分析引擎是 Row Oriented 的,也就是說 operator 的 output 和 input 是一行一行的資料。

舉一個簡單的例子,假設我們有下面一個表和查詢:

CREATE TABLE t (n int, m int, o int, p int); 


SELECT o FROM t WHERE m < n + 1;

例子來源: GitHub - jordanlewis/exectoy

上述查詢語句展開為執行計劃的時候大致如下圖所示: 

通常情況下,在 Row Oriented 的模型中,執行計劃的執行過程可以用如下偽碼錶示: 

next: 
for:
row = source.next()
if filterExpr.Eval(row):
// return a new row containing just column o
returnedRow row
for col in selectedCols:
returnedRow.append(row[col])
return returnedRow

根據 DBMSs On A Modern Processor: Where Does Time Go? 的評估,這種執行方式存在大量的 L2 data stalls 和 L1 I-cache stalls、分支預測的效率低等問題。

隨著磁碟等硬體技術的蓬勃發展,各種通過 CPU 換 IO 的壓縮演算法、Encoding 演算法和儲存技術的廣泛使用,CPU 的效能逐漸成為成為分析引擎的瓶頸。為了解決 Row Oriented 執行所存在的問題,學術界開始思考解決方案, Block oriented processing of Relational Database operations in modern Computer Architectures 這篇論文提出使用按 block 的方式在 operator 之間傳遞資料,能夠平攤條件檢查和分支預測的工作的耗時, MonetDB/X100: Hyper-Pipelining Query Execution 在此基礎上更進一步,提出將通過將資料從原來的 Row Oriented,改變成 Column Oriented,進一步提升 CPU Cache 的效率,也更有利於編譯器進行優化。在 Column Oriented 的模型中,執行計劃的執行過程可以用如下偽碼錶示:

// first create an n + 1 result, for all values in the n column 
projPlusIntIntConst.Next():
batch = source.Next()


for i < batch.n:
outCol[i] = intCol[i] + constArg


return batch


// then, compare the new column to the m column, putting the result into
// a selection vector: a list of the selected indexes in the column batch


selectLTIntInt.Next():
batch = source.Next()


for i < batch.n:
if int1Col < int2Col:
selectionVector.append(i)


return batch with selectionVector


// finally, we materialize the batch, returning actual rows to the user,
// containing just the columns requested:


materialize.Next():
batch = source.Next()


for s < batch.n:
i = selectionVector[i]
returnedRow row
for col in selectedCols:
returnedRow.append(cols[col][i])
yield returnedRow

可以看到,Column Oriented 擁有更好的資料區域性性和指令區域性性,有利於提高 CPU Cache 的命中率,並且編譯器更容易執行 SIMD 優化等。

Pull Based vs Push Based

資料庫系統中,通常是將輸入的 SQL 語句轉化為一系列的運算元,然後生成物理執行計劃用於實際的計算並返回結果。在生成的物理執行計劃中,通常會對運算元進行 pipeline。常見的 pipeline 方式通常有兩種:

    • 基於資料驅動的 Push Based 模式,上游運算元推送資料到下游運算元

    • 基於需求的 Pull Based 模式,下游運算元主動從上游運算元拉取資料。經典的火山模型就是 Pull Based 模式。

Push Based 的執行模式提高了快取效率,能夠更好地提升查詢效能。

參考:Push vs. Pull-Based Loop Fusion in Query Engines

現代資料湖分析引擎的架構

通過上一節的介紹,相信讀者已經對資料湖分析引擎的前沿理論有了相應瞭解。在本節中,我們以 StarRocks 為例,進一步介紹資料湖分析引擎是怎麼有機的結合上述先進理論,並且通過優雅的系統架構將其呈現給使用者。 

如上圖所示,StarRocks 的架構非常簡潔,整個系統的核心只有 Frontend (FE)、Backend (BE) 兩類程序,不依賴任何外部元件,方便部署與維護。其中 FE 主要負責解析查詢語句(SQL),優化查詢以及查詢的排程,而 BE 則主要負責從資料湖中讀取資料,並完成一系列的 Filter 和 Aggregate 等操作。

Frontend

FE 的主要作用將 SQL 語句通過一系列轉化和優化,最終轉換成 BE 能夠認識的一個個 Fragment。一個不那麼準確但易於理解的比喻,如果把 BE 叢集當成一個分散式的執行緒池的話,那麼 Fragment 就是執行緒池中的 Task。從 SQL 文字到 Fragment,FE 的主要工作包含以下幾個步驟:

  • SQL Parse:將 SQL 文字轉換成一個 AST(抽象語法樹)

  • Analyze:基於 AST 進行語法和語義分析

  • Logical Plan:將 AST 轉換成邏輯計劃

  • Optimize:基於關係代數,統計資訊,Cost 模型對邏輯計劃進行重寫,轉換,選擇出 Cost “最低” 的物理執行計劃

  • 生成 Fragment:將 Optimizer 選擇的物理執行計劃轉換為 BE 可以直接執行的 Fragment

  • Coordinate:將 Fragment 排程到合適的 BE 上執行 

Backend

BE 是 StarRocks 的後端節點,負責接收 FE 傳下來的 Fragment 執行並返回結果給 FE。StarRocks 的 BE 節點都是完全對等的,FE 按照一定策略將資料分配到對應的 BE 節點。常見的 Fragment 工作流程是讀取資料湖中的部分檔案,並呼叫對應的 Reader (例如,適配 Parquet 檔案的 Parquet Reader 和適配 ORC 檔案的 ORC Reader等)解析這些檔案中的資料,使用向量化執行引擎進一步過濾和聚合解析後的資料後,返回給其他 BE 或 FE。 

本篇文章主要介紹了極速資料湖分析引擎的核心技術原理,從多個維度對比了不同技術實現方案。為方便接下來的深入探討,進一步介紹了開源資料湖分析引擎 StarRocks 的系統架構設計。希望和各位同仁共同探討、交流。

附錄

基準測試

本次測試採用的 TPCH 100G 的標準測試集,分別對比測試了 StarRocks 本地表,StarRocks On Hive 和 Trino(PrestoSQL) On Hive 三者之間的效能差距。

在 TPCH 100G規模的資料集上進行對比測試,共22個查詢,結果如下: 

StarRocks 使用本地儲存查詢和 Hive 外表查詢兩種方式進行測試。其中,StarRocks On Hive 和 Trino On Hive 查詢的是同一份資料,資料採用 ORC 格式儲存,採用 zlib 格式壓縮。測試環境使用  阿里雲 EMR 進行構建。

最終,StarRocks 本地儲存查詢總耗時為21s,StarRocks Hive 外表查詢總耗時92s。Trino 查詢總耗時307s。可以看到 StarRocks On Hive 在查詢效能方面遠遠超過 Trino,但是對比本地儲存查詢還有不小的距離,主要的原因是訪問遠端儲存增加了網路開銷,以及遠端儲存的延時和 IOPS 通常都不如本地儲存,後面的計劃是通過 Cache 等機制彌補問題,進一步縮短 StarRocks 本地表和 StarRocks On Hive 的差距。

參考資料

[1] GitHub - jordanlewis/exectoy

[2] DBMSs On A Modern Processor: Where Does Time Go?

[3] Block oriented processing of Relational Database operations in modern Computer Architectures

[4] MonetDB/X100: Hyper-Pipelining Query Execution

[5]

阿里雲 EMR StarRocks 官方文件

https://help.aliyun.com/document_detail/404790.html

招聘

阿里雲智慧計算平臺事業部-開源大資料-OLAP 團隊招聘實習生,重點參與 StarRocks、ClickHouse 等開源專案,和社群深度合作。有機會參與核心特性開發,觸達海量客戶場景。歡迎感興趣的同學們通過如下二維碼投遞:

我們會在釘群推送 精彩文章 ,邀請技術大牛 直播分享

歡迎

釘釘掃碼 加入交流群一起參與討論~

“閱讀原文” 立即參與測試!