最佳實踐|Apache Doris 在小米資料場景的應用實踐與優化

語言: CN / TW / HK

導讀: 小米集團於 2019 年首次引入了 Apache Doris ,目前 Apache Doris 已經在小米內部數十個業務中得到廣泛應用,並且在小米內部已經形成一套以 Apache Doris 為核心的資料生態。 本篇文章轉錄自 Doris 社群線上 Meetup 主題演講,旨在分享 Apache Doris 在小米資料場景的落地實踐與優化實踐。

​作者|小米 OLAP 引擎研發工程師 魏祚

業務背景

因增長分析業務需要,小米集團於 2019 年首次引入了 Apache Doris 。經過三年時間的發展,目前 Apache Doris 已經在廣告投放、新零售、增長分析、資料看板、天星數科、小米有品、使用者畫像等小米內部數十個業務中得到廣泛應用 ,並且在小米內部已經形成一套以 Apache Doris 為核心的資料生態。

當前 Apache Doris 在小米內部已經具有數十個叢集、總體達到數百臺 BE 節點的規模,其中單叢集最大規模達到近百臺節點,擁有數十個流式資料匯入產品線,每日單表最大增量 120 億、支援 PB 級別儲存,單叢集每天可以支援 2W 次以上的多維分析查詢。

架構演進

小米引入 Apache Doris 的初衷是為了解決內部進行使用者行為分析時所遇到的問題。隨著小米網際網路業務的發展,各個產品線利用使用者行為資料對業務進行增長分析的需求越來越迫切。讓每個業務產品線都自己搭建一套增長分析系統,不僅成本高昂,也會導致效率低下。因此能有一款產品能夠幫助他們遮蔽底層複雜的技術細節,讓相關業務人員能夠專注於自己的技術領域,可以極大提高工作效率。基於此,小米大資料和雲平臺聯合開發了增長分析系統 Growing Analytics(下文中簡稱 GA ),旨在提供一個靈活的多維實時查詢和分析平臺,統一資料接入和查詢方案,幫助業務線做精細化運營。(此處內容引用自:基於Apache Doris的小米增長分析平臺實踐

分析、決策、執行是一個迴圈迭代的過程,在對使用者進行行為分析後,針對營銷策略是否還有提升空間、是否需要在前端對使用者進行個性化推送等問題進行決策,幫助小米實現業務的持續增長。這個過程是對使用者行為進行分析-決策-優化執行-再分析-再決策-再優化執行的迭代過程。

歷史架構

增長分析平臺立項於 2018 年年中,當時基於開發時間和成本,技術棧等因素的考慮,小米複用了現有各種大資料基礎元件(HDFS, Kudu, SparkSQL 等),搭建了一套基於 Lamda 架構的增長分析查詢系統。GA 系統初代版本的架構如下圖所示,包含了以下幾個方面:

  • 資料來源:資料來源是前端的埋點資料以及可能獲取到的使用者行為資料。
  • 資料接入層:對埋點資料進行統一的清洗後打到小米內部自研的訊息佇列 Talos 中,並通過 Spark Streaming 將資料匯入儲存層 Kudu 中。
  • 儲存層:在儲存層中進行冷熱資料分離。熱資料存放在 Kudu 中,冷資料則會存放在 HDFS 上。同時在儲存層中進行分割槽,當分割槽單位為天時,每晚會將一部分資料轉冷並存儲到 HDFS 上。
  • 計算層/查詢層:在查詢層中,使用 SparkSQL 對 Kudu 與 HDFS 上資料進行聯合檢視查詢,最終把查詢結果在前端頁面上進行顯示。

在當時的歷史背景下,初代版本的增長分析平臺幫助我們解決了一系列使用者運營過程中的問題,但同時在歷史架構中也存在了兩個問題:

第一個問題: 由於歷史架構是基於 SparkSQL + Kudu + HDFS 的組合,依賴的元件過多導致運維成本較高。原本的設計是各個元件都使用公共叢集的資源,但是實踐過程中發現執行查詢作業的過程中,查詢效能容易受到公共叢集其他作業的影響,容易抖動,尤其在讀取 HDFS 公共叢集的資料時,有時較為緩慢。

第二個問題: 通過 SparkSQL 進行查詢時,延遲相對較高。SparkSQL 是基於批處理系統設計的查詢引擎,在每個 Stage 之間交換資料 Shuffle 的過程中依然需要落盤操作,完成 SQL 查詢的時延較高。為了保證 SQL 查詢不受資源的影響,我們通過新增機器來保證查詢效能,但是實踐過程中發現,效能提升的空間有限,這套解決方案並不能充分地利用機器資源來達到高效查詢的目的,存在一定的資源浪費。 (此處內容引用自:基於Apache Doris的小米增長分析平臺實踐

針對上述兩個問題,我們的目標是尋求一款計算儲存一體的 MPP 資料庫來替代我們目前的儲存計算層的元件,在通過技術選型後,最終我們決定使用 Apache Doris 替換老一代歷史架構。

基於 Apache Doris 的新版架構

當前架構從資料來源獲取前端埋點資料後,通過資料接入層打入 Apache Doris 後可以直接查詢結果並在前端進行顯示。

選擇 Doris 原因:

  • Doris 具有優秀的查詢效能,能夠滿足業務需求。

  • Doris 支援標準 SQL ,使用者使用與學習成本較低。

  • Doris 不依賴於其他的外部系統,運維簡單。

  • Doris 社群擁有很高活躍度,有利於後續系統的維護升級。

新舊架構效能對比

我們選取了日均資料量大約 10 億的業務,分別在不同場景下進行了效能測試,其中包含 6 個事件分析場景,3 個留存分析場景以及 3 個漏斗分析場景。經過對比後,得出以下結論:

  • 在事件分析的場景下,平均查詢所耗時間降低了 85%
  • 在留存分析和漏斗分析場景下,平均查詢所耗時間降低了 50%

應用實踐

隨著接入業務的增多和資料規模的增長,讓我們也遇到不少問題和挑戰,下面我們將介紹在使用 Apache Doris 過程中沉澱出來的一些實踐經驗

資料匯入

小米內部主要通過 Stream Load 與 Broker Load 以及少量 Insert 方式來進行 Doris 的資料匯入。資料一般會先打到 Talos 訊息佇列中,並分為實時資料和離線資料兩個部分。實時資料寫入 Apache Doris 中: 一部分業務在通過 Flink 對資料進行處理後,會通過 Doris 社群提供的 Flink Doris Connector 元件寫入到 Doris 中,底層依賴於 Doris Stream Load 資料匯入方式。也有一部分會通過 Spark Streaming 封裝的 Stream Load 將資料匯入到 Doris 中。離線資料寫入 Apache Doris 中:

離線資料部分則會先寫到 Hive 中,再通過小米的資料工場將資料匯入到 Doris 中。使用者可以直接在資料工場提交 Broker Load 任務並將資料直接匯入 Doris 中,也可以通過 Spark SQL 將資料匯入 Doris 中。Spark SQL 方式則是依賴了 Doris 社群提供的 Spark Doris Connector 元件,底層也是對 Doris 的 Stream Load 資料匯入方式進行的封裝。

資料查詢

使用者通過資料工場將資料匯入至 Doris 後即可進行查詢,在小米內部是通過小米自研的數鯨平臺來做查詢的。使用者可以通過數鯨平臺對 Doris 進行查詢視覺化,並實現使用者行為分析(為滿足業務的事件分析、留存分析、漏斗分析、路徑分析等行為分析需求,我們為 Doris 添加了相應的 UDF 和 UDAF )和使用者畫像分析。

雖然目前依然需要將 Hive 的資料導過來,但 Doris 社群也正在支援湖倉一體能力,在後續實現湖倉一體能力後,我們會考慮直接通過 Doris 查詢 Hive 與 Iceberg 外表。值得一提的是,Doris 1.1 版本已經實現支援查詢 Iceberg 外表能力。 同時在即將釋出的 1.2 版本中,還將支援 Hudi 外表並增加了 Multi Catalog ,可以實現外部表元資料的同步,無論是查詢外部表的效能還是接入外表的易用性都有了很大的提升。

Compaction 調優

Doris 底層採用類似 LSM-Tree 方式,支援快速的資料寫入。每一次的資料匯入都會在底層的 Tablet 下生成一個新的資料版本,每個資料版本內都是一個個小的資料檔案。單個檔案內部是有序的,但是不同的檔案之間又是無序的。為了使資料有序,在 Doris 底層就會存在 Compaction 機制,非同步將底層小的資料版本合併成大的檔案。Compaction 不及時就會造成版本累積,增加元資料的壓力,並影響查詢效能。由於 Compaction 任務本身又比較耗費機器CPU、記憶體與磁碟資源,如果 Compaction 開得太大就會佔用過多的機器資源並影響到查詢效能,同時也可能會造成 OOM。針對以上問題,我們一方面從業務側著手,通過以下方面引導使用者:

  • 通過引導業務側進行合理優化,對錶設定合理的分割槽和分桶,避免生成過多的資料分片。
  • 引導使用者儘量降低資料的匯入頻率 增大單次資料匯入的量,降低 Compaction 壓力。
  • 引導使用者避免過多使用會在底層生成 Delete 版本的 Delete 操作。在 Doris 中 Compaction 分為 Base Compaction 與 Cumulative Compaction。Cumulative Compaction 會快速的把大量新匯入的小版本進行快速的合併,在執行過程中若遇到 Delete 操作就會終止並將當前 Delete 操作版本之前的所有版本進行合併。由於 Cumulative Compaction 無法處理 Delete 版本,在合併完之後的版本會和當前版本一起放到 Base Compaction 中進行。當 Delete 版本特別多時, Cumulative Compaction 的步長也會相應變短,只能合併少量的檔案,導致 Cumulative Compaction 不能很好的發揮小檔案合併效果。

另一方面我們從運維側著手:

  • 針對不同的業務叢集配置不同的 Compaction 引數。 部分業務是實時寫入資料的,需要的查詢次數很多,我們就會將 Compaction 開的大一點以達到快速合併目的。而另外一部分業務只寫今天的分割槽,但是隻對之前的分割槽進行查詢,在這種情況下,我們會適當的將 Compaction 放的小一點,避免 Compaction 佔用過大記憶體或 CPU 資源。到晚上匯入量變少時,之前匯入的小版本能夠被及時合併,對第二天查詢效率不會有很大影響。
  • 適當降低 Base Compaction 任務優先順序並增加 Cumulative Compaction 優先順序。 根據上文提到的內容,Cumulative Compaction 能夠快速合併大量生成的小檔案,而 Base Compaction 由於合併的檔案較大,執行的時間也會相應變長,讀寫放大也會比較嚴重。所以我們希望 Cumulative Compaction 優先、快速的進行。
  • 增加版本積壓報警。 當我們收到版本積壓報警時,動態調大 Compaction 引數,儘快消耗積壓版本。
  • 支援手動觸發指定表與分割槽下資料分片的 Compaction 任務。 由於 Compaction 不及時,部分表在查詢時版本累積較多並需要能夠快速進行合併。所以,我們支援對單個表或單個表下的某個分割槽提高 Compaction 優先順序。

目前 Doris 社群針對以上問題已經做了 一系列的優化 ,在 1.1 版本中 大幅增強了資料 Compaction 能力,對於新增資料能夠快速完成聚合,避免分片資料中的版本過多導致的 -235 錯誤以及帶來的查詢效率問題。
首先,在 Doris 1.1 版本中,引入了 QuickCompaction,增加了主動觸發式的 Compaction 檢查,在資料版本增加的時候主動觸發 Compaction。同時通過提升分片元資訊掃描的能力,快速的發現數據版本多的分片,觸發 Compaction。通過主動式觸發加被動式掃描的方式,徹底解決資料合併的實時性問題。

同時,針對高頻的小檔案 Cumulative Compaction,實現了 Compaction 任務的排程隔離,防止重量級的 Base Compaction 對新增資料的合併造成影響。

最後,針對小檔案合併,優化了小檔案合併的策略,採用梯度合併的方式,每次參與合併的檔案都屬於同一個資料量級,防止大小差別很大的版本進行合併,逐漸有層次的合併,減少單個檔案參與合併的次數,能夠大幅的節省系統的 CPU 消耗。在社群 1.1 新版本的測試結果中,不論是Compaction 的效率、CPU 的資源消耗,還是高頻匯入時的查詢抖動,效果都有了大幅的提升。

具體可以參考: Apache Doris 1.1 特性揭祕:Flink 實時寫入如何兼顧高吞吐和低延時

監控報警

Doris 的監控主要是通過 Prometheus 以及 Grafana 進行。對於 Doris 的報警則是通過 Falcon 進行。

小米內部使用 Minos 進行叢集部署。Minos 是小米內部自研並開源的大資料服務程序管理工具。在完成 Doris 叢集部署後會更新至小米內部的輕舟數倉中。在輕舟數倉中的節點註冊到 ZooKeeper 後,Prometheus 會監聽 ZooKeeper 註冊的節點,同時訪問對應埠,拉取對應 Metrics 。在這之後,Grafana 會在面板上對監控資訊進行顯示,若有指標超過預設的報警閾值,Falcon 報警系統就會在報警群內報警,同時針對報警級別較高或某些無法及時響應的警告,可直接通過電話呼叫值班同學進行報警。

另外,小米內部針對每一個 Doris 叢集都有 Cloud - Doris 的守護程序。Could - Doris 最大功能是可以對 Doris 進行可用性探測。比如我們每一分鐘對 Doris 傳送一次 select current timestamp(); 查詢,若本次查詢 20 秒沒有返回,我們就會判斷本次探測不可用。小米內部對每一個叢集的可用性進行保證,通過上述探測方法,可以在小米內部輸出 Doris可用性指標。

小米對Apache Doris的優化實踐

在應用 Apache Doris 解決業務問題的同時,我們也發現了 Apache Doris 存在的一些優化項,因此在與社群進行溝通後我們開始深度參與社群開發,解決自身問題的同時也及時將開發的重要 Feature 回饋給社群,具體包括 Stream Load 兩階段提交(2PC)、單副本資料匯入、Compaction 記憶體限制等。

Stream Load 兩階段提交(2PC)

遇到的問題

在 Flink 和 Spark 匯入資料進 Doris 的過程中,當某些異常狀況發生時可能會導致如下問題:

Flink 資料重複匯入 Flink 通過週期性 Checkpoint 機制處理容錯並實現 EOS,通過主鍵或者兩階段提交實現包含外部儲存的端到端 EOS。Doris-Flink-Connector 1.1 之前 UNIQUE KEY 表通過唯一鍵實現了EOS,非 UNIQUE KEY 表不支援 EOS。

Spark SQL 資料部分匯入 通過 SparkSQL 從 Hive 表中查出的資料並寫入 Doris 表中的過程需要使用到 Spark Doris Connector 元件,會將 Hive 中查詢的資料通過多個 Stream Load 任務寫入 Doris 中,出現異常時會導致部分資料匯入成功,部分匯入失敗。

Stream Load 兩階段提交設計

以上兩個問題可以通過匯入支援兩階段提交解決,第一階段完成後確保資料不丟且資料不可見,這就能保證第二階段發起提交時一定能成功,也能夠保證第二階段發起取消時一定能成功。

Doris 中的寫入事務分為三步:

  1. 在  FE 上開始事務,狀態為 Prepare ;
  2. 資料寫入 BE;
  3. 多數副本寫入成功的情況下,提交事務,狀態變成 Committed,並且 FE 向 BE 下發 Publish Version 任務,讓資料立即可見。

引入兩階段提交之後,第 3 步變為狀態修改為 Pre Commit,Publish Version 在第二階段完成。使用者在第一階段完成後(事務狀態為 Pre Commit ),可以選擇在第二階段放棄或者提交事務。

支援 Flink Exactly-Once 語義

Doris-Flink-Connector 1.1 使用兩階段 Stream Load 並支援 Flink 兩階段提交實現了 EOS,只有全域性的 Checkpoint 完成時,才會發起 Sream Load 的第二階段提交,否則發起第二階段放棄。

解決 SparkSQL 資料部分匯入

Doris-Spark-Connector 使用兩階段 Stream Load 之後,成功的 Task 通過 Stream Load 第一階段將寫入資料到 Doris (Pre Commit 狀態,不可見),當作業成功後,發起所有 Stream Load 第二階段提交,作業失敗時,發起所有 Stream Load 第二階段取消。這就確保了不會有資料部分匯入的問題。

單副本資料匯入優化

單副本資料匯入設計

Doris 通過多副本機制確保資料的高可靠以及系統高可用。 寫入任務可以按照使用的資源分為計算和儲存兩類:排序、聚合、編碼、壓縮等使用的是 CPU 和記憶體的計算資源,最後的檔案儲存使用儲存資源,三副本寫入時計算和儲存資源會佔用三份。

那能否只寫一份副本資料在記憶體中,待到單副本寫入完成並生成儲存檔案後,將檔案同步到另外兩份副本呢?答案是可行的,因此針對三副本寫入的場景,我們做了單副本寫入設計。單副本資料在記憶體中做完排序、聚合、編碼以及壓縮後,將檔案同步至其他兩個副本,這樣很大程度上可以節省出 CPU 和記憶體資源。

效能對比測試

Broker Load 匯入 62G 資料效能對比 匯入時間: 三副本匯入耗時 33 分鐘,單副本匯入耗時 31 分鐘。

記憶體使用: 記憶體使用上優化效果十分明顯,三副本資料匯入的記憶體使用是單副本匯入的三倍。單副本匯入時只需要寫一份記憶體,但是三副本匯入時需要寫三份記憶體,記憶體優化達到了 3 倍。

CPU 消耗對比: 三副本匯入的 CPU 消耗差不多是單副本的三倍。

併發場景效能對比

測試中向  100 個表併發匯入資料,每個表有 50 個匯入任務,任務總數為 5000 個。單個 Stream Load 任務匯入的資料行是 200 萬行,約為 90M 的資料。測試中開了 128 個併發, 單副本匯入和三副本匯入進行了對比:

匯入時間: 3 副本匯入耗時 67 分鐘,而後單副本耗時 27 分鐘完成。匯入效率相當提升兩倍以上。

記憶體使用: 單副本的匯入會更低。

CPU消耗對比: 由於都已經是開了併發在匯入,CPU開銷都比較高,但是單副本匯入吞吐提升明顯。

Compaction 記憶體限制

之前 Doris 在單機磁碟一次匯入超過 2000 個 Segment 的情況下,Compaction 有記憶體 OOM 的問題。對於當天寫入但不查當天資料而是查詢之前的資料業務場景,我們會把 Compaction 稍微放的小一點,避免佔用太大的記憶體,導致程序 OOM。Doris 之前每個磁碟有固定的執行緒做儲存在這個盤上的資料的 Compaction,沒有辦法在全域性進行管控。因為我們要限制單個節點上面記憶體的使用,所以我們將該模式改成了生產者-消費者模式:

生產者不停的從所有的磁碟上面生產任務,之後將生產任務提交到執行緒池中。我們可以很好的把控執行緒池的入口,達到對 Compaction 的限制。我們在合併時會把底層的小檔案進行歸併排序,之後在記憶體裡給每一個檔案開闢 Block,所以我們可以近似認為佔用的記憶體量與檔案的數量是相關的,從而可以通過對單節點上同時執行合併的檔案數量做限制,來達到控制記憶體的效果。

我們增加了對單個 BE Compaction 合併的檔案數量的限制。 若正在進行的 Compaction 的檔案數量超過或等於當前限制時,後續提交上來的任務就需要等待,等到前面的 Compaction 任務做完並將指標釋放出來後,後邊提交進來的那些任務才可以進行。

通過這種方式,我們對某些業務場景做了記憶體的限制,很好的避免叢集負載高時佔用過多記憶體導致 OOM 的問題。

總結

自從 Apache Doris 從 2019 年上線第一個業務至今,目前 Apache Doris 已經在小米內部服務了數十個業務、叢集數量達到數十個、節點規模達到數百臺、每天完成數萬次使用者線上分析查詢,承擔了包括增長分析和報表查詢等場景絕大多數線上分析的需求。
與此同時,以上所列小米對於 Apache Doris 的優化實踐,已經有部分功能已經在 Apache Doris 1.0 或 1.1 版本中釋出,有部分 PR 已經合入社群 Master,在不久後釋出的 1.2 新版本中應該就會與大家見面。隨著社群的快速發展,有越來越多小夥伴參與到社群建設中,社群活躍度有了極大的提升。Apache Doris 已經變得越來越成熟,並開始從單一計算儲存一體的分析型 MPP 資料庫走向湖倉一體的道路,相信在未來還有更多的資料分析場景等待去探索和實現。

加入社群

最後,歡迎更多的開源技術愛好者加入 Apache Doris 社群,攜手成長,共建社群生態。

SelectDB 是一家開源技術公司,致力於為 Apache Doris 社群提供一個由全職工程師、產品經理和支援工程師組成的團隊,繁榮開源社群生態,打造實時分析型資料庫領域的國際工業界標準。基於 Apache Doris 研發的新一代雲原生實時數倉 SelectDB,運行於多家雲上,為使用者和客戶提供開箱即用的能力。

相關連結:

SelectDB 官方網站:

http://selectdb.com

Apache Doris 官方網站:

http://doris.apache.org

Apache Doris Github:

http://github.com/apache/doris

Apache Doris 開發者郵件組:

[email protected]

「其他文章」