如何基於 Apache Doris 與 Apache Flink 快速構建極速易用的實時數倉

語言: CN / TW / HK

隨着大數據應用的不斷深入,企業不再滿足離線數據加工計算的時效,實時數據需求已成為數據應用新常態。伴隨着實時分析需求的不斷膨脹,傳統的數據架構面臨的成本高、實時性無法保證、組件繁宂、運維難度高等問題日益凸顯。為了適應業務快速迭代的特點,幫助企業提升數據生產和應用的時效性、進一步挖掘實時數據價值,實時數倉的構建至關重要。

本文將分享如何基於 Apache Doris 和 Apache Flink 快速構建一個極速易用的實時數倉,包括數據同步、數據集成、數倉分層、數據更新、性能提升等方面的具體應用方案,在這之前,我們先可以先了解一下傳統的數據架構如何設計的、又存在哪些痛點問題。

#  實時數倉的需求與挑戰

圖片

上圖所示為傳統的數據架構,如果我們從數據流的⻆度分析傳統的數據處理架構,會發現從源端採集到的業務數據和日誌數據主要會分為實時和離線兩條鏈路:

  • 在實時數據部分,通過 Binlog 的⽅式,將業務數據庫中的數據變更 (CDC,Change Data Capture)採集到實時數倉。同時,通過 Flume-Kafka-Sink 對日誌數據進⾏實時採集。當不同來源的數據都採集到實時存儲系統後,便可以基於實時存儲系統來構建實時數倉。在實時數倉的內部,我們仍然會遵守傳統數倉分層理論,將數據分為 ODS 層、DWD 層、DWS 層、 ADS 層以實現最大程度的模型複用。

  • 在離線數據部分,通過 DataX 定時同步的⽅式,批量同步業務庫 RDS 中的數據。當不同來源的數據進⼊到離線數倉後,便可以在離線數倉內部,依賴 Spark SQL 或 Hive SQL 對數據進⾏定時處理,分離出不同層級 (ODS 、DWD 、ADS 等)的數據,並將這些數據存在⼀個存儲介質上,⼀般會採用如 HDFS 的分佈式文件系統或者 S3 對象存儲上。通過這樣的⽅式,離線數倉便構建起來了。與此同時,為了保障數據的⼀致性,通常需要數據清洗任務使⽤離線數據對實時數據進⾏清洗或定期覆蓋,保障數據最終的⼀致性。

從技術架構的⻆度對傳統數據技術棧進行分析,我們同樣會發現,為了迎合不同場景的需求,往往會採用多種技術棧,例如在湖倉部分通常使用的是 Hive 、Iceberg 、Hudi 等數據湖;面向湖上數據的 Ad-hoc 查詢一般選擇 Impala 或 Presto;對於 OLAP 場景的多維分析,一般使⽤ Doris 或 Kylin、 Druid。除此之外,為應對半結構化數據的分析需求,例如日誌分析與檢索場景,通常會使⽤ ES 進行分析;面對高併發點查詢的 Data Serving 場景會使⽤ HBase;在某些場景下可能還需要對外提供統⼀的數據服務,這時可能會使⽤基於 Presto/Trino 的查詢⽹關,對⽤户提供統一查詢服務。其中涉及到的數據組件有數十種,高昂的使用成本和組件間兼容、維護及擴展帶來的繁重壓力成為企業必須要面臨的問題。

圖片

從上述介紹即可知道,傳統的數據架構存在幾個核心的痛點問題:

  • 傳統數據架構組件繁多,維護複雜,運維難度非常高。
  • 計算、存儲和研發成本都較高,與行業降本提效的趨勢背道而馳。
  • 同時維護兩套數據倉庫(實時數倉和離線數倉)和兩套計算(實時數據量和實時計算任務),數據時效性與一致性無法保證。

在此背景下,我們亟需⼀個“極速、易用、統一、實時”的數據架構來解決這些問題

  • 極速:更快的查詢速度,最大化提升業務分析人員的效率;
  • 易用:對於用户側的使用和運維側的管控都提供了極簡的使用體驗;
  • 統⼀:異構數據與分析場景的統一,半結構化和結構化數據可以統⼀存儲,多分析場景可以統一技術棧;
  • 實時:端到端的高時效性保證,發揮實時數據的價值。

#  如何構建極速易用的實時數倉架構

基於以上的需求,我們採取 Apache Doris 和 Apache Flink 來構建極速易用的實時數倉,具體架構如下圖所示。多種數據源的數據經過 Flink CDC 集成或 Flink Job 加⼯處理後,⼊庫到 Doris 或者 Hive/Iceberg 等湖倉中,最終基於 Doris 提供統⼀的查詢服務。

圖片

在數據同步上,通過 Flink CDC 將 RDS 的數據實時同步到 Doris;通過 Routine Load 將 Kafka 等消息系統中的數據實時同步到 Doris 。在數倉分層上,ODS 層通常選擇使用明細模型構建,DWD 層可以通過 SQL 調度任務對 ODS 數據抽取並獲取,DWS 和 ADS 層則可以通過 Rollup 和物化視圖進行構建。在數據湖上, Doris ⽀持為 Hive、Iceberg 、Hudi 以及Delta Lake(todo)提供聯邦分析和湖倉加速的能⼒。在數據應用上,Apache Doris 既可以承載批量數據加工處理的需求,也可以承載高吞吐的 Adhoc 和高併發點查詢等多種應⽤場景。

#  解決方案

如何實現數據的增量與全量同步

1. 增量及全量數據同步

在全量數據和增量的同步上,我們採取了 Flink CDC 來實現。其原理非常簡單,Flink CDC 實現了基於 Snapshot 的全量數據同步、基於 BinLog 的實時增量數據同步,全量數據同步和增量數據同步可以⾃動切換,因此我們在數據遷移的過程中,只需要配置好同步的表即可。當 Flink 任務啟動時,優先進⾏歷史表的數據同步,同步完後⾃動切換成實時同步。

圖片

2. 數據一致性保證

如何保證數據一致性是大家重點關注的問題之一,那麼在新架構是如何實現的呢?

數據⼀致性⼀般分為“最多⼀次” 、“⾄少⼀次”和“精確⼀次”三種模型。

  • 最多⼀次(At-Most-Once):發送⽅僅發送消息,不期待任何回覆。在這種模型中,數據的⽣產和消費過程中可能出現數據丟失的問題。
  • ⾄少⼀次(At-Least-Once):發送⽅不斷重試,直到對⽅收到為⽌。在這個模型中,⽣產和消費過程都可能出現數據重複。
  • 精確⼀次(Exactly-Once):能夠保證消息只被嚴格發送⼀次,並且只被嚴格處理⼀次。這種數據模型能夠嚴格保證數據⽣產和消費過程中的準確⼀致性。
  • Flink CDC 通過 Flink Checkpoint 機制結合 Doris 兩階段提交可以實現端到端的 Exactly Once 語義。具體過程分為四步:

  • 事務開啟(Flink Job 啟動及 Doris 事務開啟):當 Flink 任務啟動後, Doris 的 Sink 會發起 Precommit 請求,隨後開啟寫⼊事務。

  • 數據傳輸(Flink Job 的運⾏和數據傳輸):在 Flink Job 運⾏過程中, Doris Sink 不斷從上游算⼦獲取數據,並通過 HTTP Chunked 的⽅式持續將數據傳輸到 Doris。

  • 事務預提交:當 Flink 開始進⾏ Checkpoint 時,Flink 會發起 Checkpoint 請求,此時 Flink 各個算⼦會進⾏ Barrier 對⻬和快照保存,Doris Sink 發出停⽌ Stream Load 寫⼊的請求,併發起⼀個事務提交請求到 Doris。這步完成後,這批數據已經完全寫⼊ Doris BE 中,但在 BE 沒有進⾏數據發佈前對⽤户是不可⻅的。

  • 事務提交:當 Flink 的 Checkpoint 完成之後,將通知各個算⼦,Doris 發起⼀次事務提交到 Doris BE ,BE 對此次寫⼊的數據進⾏發佈,最終完成數據流的寫⼊。

圖片

綜上可知,我們利用 Flink CDC 結合 Doris 兩階段事務提交保證了數據寫入一致性。需要注意的是,在該過程中可能遇到一個問題:如果事務預提交成功、但 Flink Checkpoint 失敗了該怎麼辦?針對該問題,Doris 內部支持對寫⼊數據進⾏回滾(Rollback),從⽽保證數據最終的⼀致性。

3. DDL 和 DML 同步

隨着業務的發展,部分⽤户可能存在 RDS Schema 的變更需求。當 RDS 表結構變更時,⽤户期望 Flink CDC 不但能夠將數據變化同步到 Doris,也希望將 RDS 表結構的變更同步到 Doris,⽤户則無需擔⼼ RDS 表結構和 Doris 表結構不⼀致的問題。

Light Schema Change

目前,Apache Doris 1.2.0 已經實現了  Light Schema Change 功能,可滿⾜ DDL 同步需求,快速⽀持 Schema 的變更。

圖片

Light Schema Change 的實現原理也比較簡單,對數據表的加減列操作,不再需要同步更改數據文件,僅需在 FE 中更新元數據即可,從而實現毫秒級的 Schema Change 操作,且存在導入任務時效率的提升更為顯著。在這個過程中,由於 Light Schema Change 只修改了 FE 的元數據,並沒有同步給 BE。因此會產⽣ BE 和 FE Schema 不⼀致的問題。為了解決這種問題,我們對 BE 的寫出流程進⾏了修改,具體包含三個⽅⾯。

  • 數據寫⼊:FE 會將 Schema 持久化到元數據中,當 FE 發起導⼊任務時,會把最新的 Schema 一起發給 Doris BE,BE 根據最新的 Schema 對數據進⾏寫⼊,並與 RowSet 進⾏綁定。將該 Schema 持久化到 RowSet 的元數據中,實現了數據的各⾃解析,解決了寫⼊過程中 Schema 不⼀致的問題。
  • 數據讀取:FE ⽣成查詢計劃時,會把最新的 Schema 附在其中⼀起發送給 BE,BE 拿到最新的 Schema 後對數據進⾏讀取,解決讀取過程中 Schema 發⽣不⼀致的問題。
  • 數據 Compaction:當數據進⾏ Compaction 時,我們選取需要進⾏ Compaction 的 RowSet 中最新的 Schema 作為之後 RowSet 對應的 Schema,以此解決不同 Schema 上 RowSet 的合併問題。

經過對 Light Schema Change 寫出流程的優化後, 單個 Schema Chang 從 310 毫秒降低到了 7 毫秒,整體性能有近百倍的提升,徹底的解決了海量數據的 Schema Change 變化難的問題。

Flink CDC DML 和 DDL 同步

有了 Light Schema Change 的保證,  Flink CDC 能夠同時⽀持 DML 和 DDL 的數據同步。那麼是如何實現的呢?

圖片

  • 開啟 DDL 變更配置:在 Flink CDC 的 MySQL Source 側開啟同步 MySQL DDL 的變更配置,在 Doris 側識別 DDL 的數據變更,並對其進⾏解析。
  • 識別及校驗:當 Doris Sink 發現 DDL 語句後,Doris Sink 會對錶結構進⾏驗證,驗證其是否⽀持 Light Schema Change。
  • 發起 Schema Change :當表結構驗證通過後,Doris Sink 發起 Schema Change 請求到 Doris,從⽽完成此次 Schema Change 的變化。

解決了數據同步過程中源數據⼀致性的保證、全量數據和增量數據的同步以及 DDL 數據的變更後,一個完整的數據同步⽅案就基本形成了。

如何基於 Flink 實現多種數據集成

圖片

除了上文中所提及的基於 Flink CDC 進行數據增量/全量同步外,我們還可以基於 Flink Job 和 Doris 來構建多種不同的數據集成方式:

  • 將 MySQL 中兩個表的數據同步到 Flink 後,在 Flink 內部進⾏多流 Join 完成數據打寬,後將⼤寬表同步到 Doris 中。
  • 對上游的 Kafka 數據進⾏清洗,在 Flink Job 完成清洗後通過 Doris-Sink 寫⼊ Doris 中。
  • 將 MySQL 數據和 Kafka 數據在 Flink 內部進⾏多流 Join,將 Join 後的寬表結果寫⼊ Doris中。
  • 在 Doris 側預先創建寬表,將上游 RDS 中的數據根據 Key 寫入, 使⽤ Doris 的部分列更新將多列數據分別寫⼊到 Doris 的⼤寬表中。

如何選擇數據模型

Apache Doris 針對不同場景,提供了不同的數據模型,分別為聚合模型、主鍵模型、明細模型。

圖片

AGGREGATE 聚合模型

在企業實際業務中有很多需要對數據進行統計和彙總操作的場景,如需要分析網站和 APP 訪問流量、統計用户的訪問總時長、訪問總次數,或者像廠商需要為廣告主提供廣告點擊的總流量、展示總量、消費統計等指標。在這些不需要召回明細數據的場景,通常可以使用聚合模型,比如上圖中需要根據門店 ID 和時間對每個門店的銷售額實時進行統計。

UNIQUE KEY 主鍵模型

在某些場景下用户對數據更新和數據全局唯一性有去重的需求,通常使用 UNIQUE KEY 模型。在 UNIQUE 模型中,會根據表中的主鍵進⾏ Upsert 操作:對於已有的主鍵做 Update 操作,更新 value 列,沒有的主鍵做 Insert 操作,比如圖中我們以訂單id為唯一主鍵,對訂單上的其他數據(時間和狀態)進行更新。

DUPLICATE 明細模型

在某些多維分析場景下,數據既沒有主鍵,也沒有聚合需求,Duplicate 數據模型可以滿足這類需求。明細模型主要用於需要保留原始數據的場景,如日誌分析,用户行為分析等場景。明細模型適合任意維度的 Ad-hoc 查詢。雖然同樣無法利用預聚合的特性,但是不受聚合模型的約束,可以發揮列存模型的優勢(只讀取相關列,而不需要讀取所有 Key 列)。

如何構建數倉分層

由於數據量級普遍較大,如果直接查詢數倉中的原始數據,需要訪問的表數量和底層文件的數量都較多,體現在日常工作中就是 SQL 異常複雜、計算耗時增高。而分層要做的就是對原始數據重新做歸納整理,在不同層級對數據或者指標做不同粒度的抽象,通過複用數據模型來簡化數據管理壓力,利用血緣關係來定位數據鏈路的異常,同時進一步提升數據分析的效率。在 Apache Doris 可以通過以下多種思路來構建數據倉庫分層:

微批調度

通過 INSERT INTO SELECT 可以將原始表的數據進行處理和過濾並寫入到目標表中,這種 SQL 抽取數據的行為一般是以微批形式進行(例如 15 分鐘一次的 ETL 計算任務),通常發生在從 ODS 到 DWD 層數據的抽取過程中,因此需要藉助外部的調度工具例如 DolphinScheduler 或 Airflow 等來對 ETL SQL 進行調度。

Rollup 與物化視圖

物化視圖本質是一個預先計算的過程。我們可以在 Base 表上,創建不同的 Rollup 或者物化視圖來對 Base 表進行聚合計算。通常在明細層到彙總層(例如 DWD 層到 DWS 層或從 DWS 層到 ADS 層)的匯聚過程中可以使用物化視圖,以此實現指標的高度聚合。同時物化視圖的計算是實時進行的,因此站在計算的角度也可以將物化視圖理解為一個單表上的實時計算過程。

多表物化視圖

Apache Doris 2.0 將實現多表物化視圖這一功能,可以將帶有 Join 的查詢結果固化以供用户直接查詢,支持定時自動或手動觸發的方式進行全量更新查詢結果,未來還將進一步支持更加完善的自動增量刷新。基於多表物化視圖這一功能的實現,我們可以做更復雜的數據流處理,比如數據源側有 TableA、TableB、TableC,在多表物化視圖的情況下,用户就可以將 TableA 和 TableB 的數據進行實時Join 計算後物化到 MV1 中。在這個角度上來看,多表物化視圖更像一個多流數據實時 Join 的過程。

圖片

如何應對數據更新

在實時數據倉庫構建的過程中,還需要面臨高併發寫入和實時更新的挑戰。如何在億級數據中快速找到需要更新的數據,並對其進⾏更新,⼀直都是⼤數據領域不斷追尋的答案。

1. 高併發數據更新

在 Apache Doris 中通過 Unique Key 模型來滿足數據更新的需求,同時通過 MVCC 多版本併發機制來實現數據的讀寫隔離。當新數據寫入時,如果不存在相同 Key 的數據則會直接寫⼊;如果有相同 Key 的數據則增加版本,此時數據將以多個版本的形式存在。後台會啟動異步的 Compaction 進程對歷史版本數據進⾏清理,當⽤户在查詢時 Doris 會將最新版本對應的數據返回給⽤户,這種設計解決了海量數據的更新問題。

圖片

在 Doris 中提供了 Merge-on-Read 和 Merge-on-Write 兩種數據更新模式。

圖片

在此我們以訂單數據的寫入為例介紹 Merge-on-Read 的數據寫入與查詢流程,三條訂單數據均以 Append 的形式寫⼊ Doris 表中:

  • 數據 Insert:首先我們寫入 ID 為 1,2,3 的三條數據;
  • 數據 Update:當我們將訂單 1 的 Cost 更新為 30 時,其實是寫⼊⼀條 ID 為 1,Cost 為 30 的新版本數據,數據通過 Append 的形式寫⼊ Doris;
  • 數據 Delete:當我們對訂單 2 的數據進⾏刪除時,仍然通過 Append ⽅式,將數據多版本寫⼊ Doris ,並將 _DORIS_DELETE_SIGN 字段變為 1 ,則表示這條數據被刪除了。當 Doris 讀取數據時,發現最新版本的數據被標記刪除,就會將該數據從查詢結果中進⾏過濾。

Merge-on-Read 的特點是寫⼊速度比較快,但是在數據讀取過程中由於需要進⾏多路歸併排序,存在着大量非必要的 CPU 計算資源消耗和 IO 開銷。

因此在 1.2.0 版本中,Apache Doris 在原有的 Unique Key 數據模型上增加了 Merge-on-Write 的數據更新模式。Merge-on-Write 兼顧了寫入和查詢性能。在寫⼊的過程中引⼊了 Delete Bitmap 數據結構,使⽤ Delete Bitmap 標記 RowSet 中某⼀⾏是否被刪除,為了保持 Unique Key 原有的語義, Delete Bitmap 也⽀持多版本。另外使⽤了兼顧性能和存儲空間的 Row Bitmap,將 Bitmap 中的 MemTable ⼀起存儲在 BE 中,每個 Segment 會對應⼀個 Bitmap。

圖片

  • 寫入流程:

  • DeltaWriter 先將數據 Flush 到磁盤

  • 批量檢查所有 Key,在點查過程中經過區間樹,查找到對應的 RowSet。

  • 在 RowSet 內部通過 BloomFilter 和 index 進行⾼效查詢。

當查詢到 Key 對應的 RowSet 後,便會覆蓋 RowSet Key 對應的 Bitmap,接着在 Publish 階段更新 Bitmap,從⽽保證批量點查 Key 和更新 Bitmap 期間不會有新的可⻅ RowSet,以保證 Bitmap 在更新過程中數據的正確性。除此之外,如果某個 Segment 沒有被修改,則不會有對應版本的 Bitmap 記錄。

  • 查詢流程:

  • 當我們查詢某⼀版本數據時, Doris 會從 LRU Cache Delete Bitmap 中查找該版本對應的緩存。

  • 如果緩存不存在,再去 RowSet 中讀取對應的 Bitmap。

  • 使⽤ Delete Bitmap 對 RowSet 中的數據進⾏過濾,將結果返回。

該模式不需要在讀取的時候通過歸併排序來對主鍵進行去重,這對於高頻寫入的場景來説,大大減少了查詢執行時的額外消耗。此外還能夠支持謂詞下推,並能夠很好利用 Doris 豐富的索引,在數據 IO 層面就能夠進行充分的數據裁剪,大大減少數據的讀取量和計算量,因此在很多場景的查詢中都有非常明顯的性能提升。在真實場景的測試中,通過 Merge-on-Write 可以在保證數萬 QPS 的高頻 Upset 操作的同時實現性能 3-10 倍的提升。

2. 部分列更新

部分列更新是一個比較普遍的需求,例如廣告業務中需要在不同的時間點對同一個廣告行為(展示、點擊、轉換等)數據的更新。這時可以通過 Aggregate Key 模型的replace_if_not_null實現。具體建表語句如下:

CREATE TABLE IF NOT EXISTS request_log
(
    `session_id` LARGEINT NOT NULL COMMENT "id",

    `imp_time` DATE REPLACE_IF_NOT_NULL COMMENT "展示",  #展示數據更新
    `imp_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT "",

    `click_time` DATE REPLACE_IF_NOT_NULL COMMENT "點擊",#點擊數據更新
    `click_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT "",

    `conv_time` DATE REPLACE_IF_NOT_NULL COMMENT "轉化",#轉換數據更新
    `conv_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT ""
)
AGGREGATE KEY(`session_id`)
DISTRIBUTED BY HASH(`session_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

具體更新過程如下:

(1)更新展示數據

mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query OK, 1 row affected (0.05 sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | NULL       | NULL       | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)

(2)更新點擊數據

mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query OK, 1 row affected (0.05 sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | NULL       | NULL       | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)

(3)更新轉化數據

ysql> insert into request_log(session_id,click_time,click_data)VALUES(1,'2022-12-21','click');
Query OK, 1 row affected (0.03 sec)
{'label':'insert_2649087d8dc046bd_a39d367af1f93ab0', 'status':'VISIBLE', 'txnId':'385667'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | 2022-12-21 | click      | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)

mysql>

同時部分列更新還可用於支持畫像場景的寬表列實時更新。

圖片

另外值得期待的是 Apache  Doris 的 Unique Key 模型也即將實現部分列更新的功能,可以通過 Apache Doris GitHub 代碼倉庫及官網,關注新版本或新功能的發佈(相關地址可下滑至文章底部獲取)。

如何進一步提升查詢性能

1. 智能物化視圖

物化視圖除了可以作為高度聚合的彙總層外,其更廣泛的定位是加速相對固定的聚合分析場景。物化視圖是指根據預定義的 SQL 分析語句執⾏預計算,並將計算結果持久化到另一張對用户透明但有實際存儲的表中,在需要同時查詢聚合數據和明細數據以及匹配不同前綴索引的場景,命中物化視圖時可以獲得更快的查詢性能。在使用物化視圖時需要建⽴ Base 表並基於此建⽴物化視圖,同⼀張 Base 表可以構建多個不同的物化視圖,從不同的維度進⾏統計。物化視圖在查詢過程中提供了智能路由選擇的能力,如果數據在物化視圖中存在會直接查詢物化視圖,如果在物化視圖中不存在才會查詢 Base 表。對於數據寫入或更新時,數據會在寫入 Base 表的同時寫入物化視圖,從⽽讓 Doris 保證物化視圖和 Base 表數據的完全⼀致性。

圖片

智能路由選擇遵循最⼩匹配原則,只有查詢的數據集⽐物化視圖集合⼩時,才可能⾛物化視圖。如上圖所示智能選擇過程包括選擇最優和查詢改寫兩個部分:

選擇最優

  • 在過濾候選集過程中,被執行的 SQL 語句通過 Where 條件進⾏判斷,Where 條件為advertiser=1。由此可⻅,物化視圖和 Base 表都有該字段,這時的選集是物化視圖和 Base 表。
  • Group By 計算,Group By 字段是 advertiser 和 channel,這兩個字段同時在物化視圖和 Base 表中。這時過濾的候選集仍然是物化視圖和 Base 表。
  • 過濾計算函數,⽐如執⾏ count(distinctuser_id),然後對數據進⾏計算,由於 Count Distinct 的字段 user_id 在物化視圖和 Base 表中都存在,因此過濾結果仍是物化視圖和 Base 表。
  • 選擇最優,通過⼀系列計算,我們發現查詢條件⽆論是 Where 、Group By 還是 Agg Function 關聯的字段,結果都有 Base 表和物化視圖,因此需要進⾏最優選擇。Doris 經過計算髮現 Base 表的數據遠⼤於物化視圖,即物化視圖的數據更⼩。

由此過程可⻅,如果通過物化視圖進行查詢,查詢效率更⾼。當我們找到最優查詢計劃,就可以進⾏⼦查詢改寫,將 Count Distinct 改寫成 Bitmap ,從⽽完成物化視圖的智能路由。完成智能路由之後,我們會將 Doris ⽣成的查詢 SQL 發送到 BE 進⾏分佈式查詢計算。

2. 分區分桶裁剪

Doris 數據分為兩級分區存儲, 第一層為分區(Partition),目前支持 RANGE 分區和 LIST 分區兩種類型, 第二層為 HASH 分桶(Bucket)。我們可以按照時間對數據進⾏分區,再按照分桶列將⼀個分區的數據進行 Hash 分到不同的桶⾥。在查詢時則可以通過分區分桶裁剪來快速定位數據,加速查詢性能的同時實現高併發。

3. 索引查詢加速

除了分區分桶裁剪, 還可以通過存儲層索引來裁剪需要讀取的數據量,僅以加速查詢:

  • 前綴索引:在排序的基礎上快速定位數據
  • Zone Map 索引:維護列中 min/max/null 信息
  • Bitmap 索引:通過 Bitmap 加速去重、交併查詢
  • Bloom Filter 索引:快速判斷元素是否屬於集合;
  • Invert 倒排索引:支持字符串類型的全文檢索;

4. 執行層查詢加速

同時 Apache Doris 的 MPP 查詢框架、向量化執行引擎以及查詢優化器也提供了許多性能優化方式,在此僅列出部分、不做詳細展開:

  • 算子下推:Limit、謂詞過濾等算子下推到存儲層;
  • 向量化引擎:基於 SIMD 指令集優化,充分釋放 CPU 計算能力;
  • Join 優化:Bucket Shuffle Join、Colocate Join 以及 Runtime Filter 等;

行業最佳實踐

截止目前,Apache Doris 在全球範圍內企業用户規模已超過 1500 家,廣泛應用於數十個行業中。在用户行為分析、AB 實驗平台、日誌檢索分析、用户畫像分析、訂單分析等方向均有着豐富的應用。在此我們列出了幾個基於 Doris 構建實時數據倉庫的真實案例作為參考:

圖片

第 1 個案例是較為典型的基於 Doris 構建實時數倉,下層數據源來自 RDS 業務庫、⽂件系統數據以及埋點日誌數據。在數據接⼊過程中通過 DataX 進⾏離線數據同步以及通過 Flink CDC 進⾏實時數據同步,在 Doris 內部構建不同的數據分層;最後在上層構建不同的數據應⽤,⽐如⾃助報表、⾃助數據抽取、數據⼤屏。除此之外,它還結合了⾃⼰的應⽤平台構建了數據開發與治理平台,完成了源數據管理、數據分析等操作。

使用收益:

  • 業務計算耗時從之前的兩⼩時降低到三分鐘。
  • 全鏈路的更新報表的時間從周級別更新到⼗分鐘級別。
  • Doris ⾼度兼容 MySQL,報表遷移無壓力,開發週期從周級別降至⾄天級別。

圖片

第 2 個案例是在某運營服務商的應用,其架構是通過 Flink CDC 將 RDS 的數據同步到 Doris 中,同時通過 Routine Load 直接訂閲 Kafka 中接入的日誌數據,然後在 Doris 內部構建實時數倉。在數據調度時, 通過開源 DolphinScheduler 完成數據調度;使⽤ Prometheus+Grafana 進⾏數據監控。

使用收益: 採⽤ Flink+Doris 架構體系後,架構簡潔、組件減少,解決了多架構下的數據的宂餘存儲,服務器資源節省了 30%,數據存儲磁盤佔⽤節省了 60%,運營成本⼤幅降低。該案例每天在⽤户的業務場景上,⽀持數萬次的⽤户的在線查詢和分析。

圖片

第 3 個應用是在供應鏈企業,在過去該企業採取了 Hadoop 體系,使用組件⽐較繁多,有 RDS、HBase、Hive、HDFS、Yarn、Kafka 等多個技術棧,在該架構下,查詢性能無法得到有效快速的提升,維護和開發成本一直居高不下。

使用收益: 引入 Doris 之後,將 RDS 的數據通過 Flink CDC 實時同步到 Doris ⾥,服務器資源成本得到了很⼤的降低。數據的查詢時間從 Spark 的 2~5 ⼩時,縮短到⼗分鐘,查詢效率也⼤⼤提升。在數據的同步過程中,使⽤了 Flink CDC+MySQL 全量加增量的數據同步⽅式,同時還利⽤ Doris 的 Light Schema Change 特性實時同步 Binlog ⾥的 DDL 表結構變更,實現數據接⼊數倉零開發成本。

#  總結

憑藉 Apache Doris 豐富的分析功能和 Apache Flink 強大的實時計算能力,已經有越來越多的企業選擇基於 Apache Doris 和 Flink 構建極速易用的實時數倉架構,更多案例歡迎關注 SelectDB 公眾號以及相關技術博客。後續我們仍會持續提升 Apache Doris 在實時數據處理場景的能力和性能,包括 Unique 模型上的部分列更新、單表物化視圖上的計算增強、自動增量刷新的多表物化視圖等,後續研發進展也將在社區及時同步。在構建實時數據倉庫架構中遇到任何問題,歡迎聯繫社區進行支持。同時也歡迎加入 Apache Doris 社區,一起將 Apache Doris 建設地更加強大!

作者介紹:

王磊, SelectDB 資深大數據研發專家、Apache Doris Contributor、阿里雲 MVP,具有超 10 年大數據領域工作經驗,對數據治理、數據湖和實時數倉有深入理解和實踐,人氣技術暢銷書《圖解 Spark 大數據快速分析實戰》、《offer 來了:Java 面試核心知識點精講(原理篇&架構篇)》作者。

# 相關鏈接:

SelectDB 官網

http://selectdb.com

Apache Doris 官網

http://doris.apache.org

Apache Doris Github

http://github.com/apache/doris

「其他文章」