應用實踐 | 物易雲通基於 Apache Doris 的實時資料倉庫建設

語言: CN / TW / HK

導讀: 物易雲通目前已成為國內產融供應鏈運營服務平臺的領軍企業之一,平臺年交易額超過 200 億元 隨著公司業務的快速發展,對資料計算分析的時效要求也越來越高。經資料團隊的調研對比,於 2021 年引入了 Apache Doris 作為實時資料倉庫。實戰過程中獲得一些經驗,在此分享給大家。

作者物易雲通/司機寶大資料負責人 吳凡

業務背景

武漢物易雲通網路科技有限公司成立於 2015 年 6 月,總部位於湖北省武漢市東湖高新區。作為國內產業網際網路的探索先行者,公司致力於將產業網際網路思維與新一代資訊科技深化應用於煤炭、建築、再生資源三大業務領域,以標準化、場景化、數字化的供應鏈綜合服務解決能力,開創網際網路化的“供應鏈技術+物流服務+金融場景”的產融協同新生態。目前公司已成為國內產融供應鏈運營服務平臺的領軍企業之一,平臺年交易額超過 200 億元。公司入選 2020 年中國網際網路企業綜合實力 100 強,2021 年武漢市軟體收入第一名。

隨著公司業務的快速發展,對資料計算分析的時效要求也越來越高。之前的產品已經無法應對龐大的資料量,為解決這一問題,資料團隊通過調研對比,在 2021 年引入了 Apache Doris 作為實時資料倉庫。基於Apache Doris 建設實時數倉的過程中,沉澱了許多經驗,藉此機會分享給大家。

數倉架構演進

公司創業之初,是使用 MySQL 作為 BI 倉庫,每天增量卸數後匯入,通過定時排程儲存過程進行計算。該方案能快速滿足公司的跨庫資料關聯計算的需求,但是隨著業務發展,資料和任務不斷增多,MySQL 已難以支援,另外該方案侷限性比較大,如果業務表存在物理刪除或者沒有資料更新時間的情況下,則會導致資料不準。

為了解決上述問題,我們搭建了一套 CDH 作為資料倉庫。通過 Canal 訂閱 MySQL 的 Binlog 到 Kafka,進行編寫消費程式,將資料寫入 Hbase,然後增量合併到 Hive 中,通過 Oozie 排程計算指令碼。

然而離線 T+1 的資料只能滿足一部分的業務需求,因此我們需要一套能快速查詢實時資料的資料倉庫,同時可以支援離線需求和實時需求,經過許多產品的調研對比,證明 Apache Doris 可以很好地實現我們的業務需求。

img

Doris 數倉架構通過 Flink CDC 實時接入生產庫資料到 Doris,支援實時 OLAP,然後通過海豚排程器定時執行 SQL 指令碼,替代 Hive 的離線資料計算任務。

新架構的優勢

1、資料處理架構簡單 新的架構裡我們使用了 Flink CDC 來做資料同步(Flink CDC 內建了一套 Debezium 和 Kafka 元件,但這個細節對使用者遮蔽),它不但可以讀取增量,還能讀取全量資料,然後將資料通過 Stream load 的方式寫入 Doris。

2、一份資料,實時全量 由於 Hive 查詢很慢,所以之前是把 Hive 的資料通過 Sqoop 推送到 MySQL 進行查詢,即有多份資料儲存在不同的 MySQL 上,維護和儲存成本都很高,並且 Hive 裡只有 T-1 的資料,需要每天寫指令碼合併。Doris 支援 MySQL 協議,可直接查詢,同時 Doris 支援主鍵資料去重及更新,有實時的全量資料,解決了實時報表和線上 OLAP 的需求。

3、架構簡單,易於部署維護 相對於 Hadoop 那一套各種元件來說,Doris 部署維護非常簡單。

4、一鍵全庫接入,結構實時同步 通過自研的資料易平臺,實現了 MySQL 一鍵全庫接入 Doris,即通過頁面選擇後,一鍵生成 Flink CDC 任務在 Yarn 上持續執行。而且通過解析 Binlog 裡面的 DDL 語句,將其轉化為 Doris 語法,利用 Doris 的 Online Schema Change 特性,實時同步生產資料庫的表結構變更,保障了表結構一致,新增欄位資料一致。

5、秒級查詢 Doris 查詢是秒級,Hive 是分鐘級,跑批的效率提升了 20-30 倍。而之前用的是 Impala 加速 Hive 的查詢,每個表在使用前都要 Refresh 一次,非常麻煩,並且 Count Distinct Impala 近似計算不準確。

系統重點功能

資料接入

第一步選擇需要接入的 MySQL 庫型別,預設是 A,即最常見的全域性庫名唯一。 img

另外還存在幾種其他的情況:

B、全域性有多個名稱相同、結構不一致的資料庫。比如:部分大表做了資料切割歸檔到另一臺機器上了。

C、全域性有多個名稱相同、結構一致的資料庫,即分庫。我們需要將資料合併到一個 Doris 庫表中,方便資料分析。

D、全域性有多個名稱不同、結構一致的庫。比如:DB租戶 A 的庫, DB租戶 B 的庫,我們也是要把資料合併分析。

第二步,選擇 MySQL 庫例項,進行提交(如果不想接入全庫,可以勾選部分表)。對應的目標資料庫是 Doris 裡面的 ODS 貼源層,和生產資料保持一致,一個庫一個任務,可以視情況調整記憶體等引數。

img

注意: 通過列表可以進行任務管理,恢復任務是運用了 Flink CDC 的 Checkpoint 機制,在任務異常掛掉時可以恢復執行。Flink CDC 任務目前是執行在 Yarn 上。

img

資料計算

我們在資料易平臺上開發了資料計算任務功能,使用者編寫 SQL 後,點選 SQL 解析,即可自動識別出腳本里用到了哪些來源表,生成了哪些結果表,最終在海豚排程器裡生成對應的任務和上游任務 Depend 關係。

img

說明: 為了保障各個 T+1 報表的資料一致性,我們做了 ODS 層到 DWD 層的一套計算任務,即每天 0 點將 ODS 層中近 2 天的增量資料 insert into 到 DWD 層進行更新,後續 T+1 的計算任務統一從 DWD 層進行計算。

注意: 需要把物理刪除變成邏輯刪除,使用時剔除。如果直接在 ODS 裡面同步物理刪除,會導致 DWD 層裡無法通過增量方式同步刪除。

新架構的收益

降低資源成本

當前我們的叢集配置為 5 臺阿里雲 ESC,16 核 64G。在相同叢集配置下,1000 個表的每日增量資料合併任務,用 Hive 需要 3-5 小時,用 Spark需要 2-3 小時,然而同樣的需求 Drois 運用 Unique Key 模型完成只需要 10 分鐘,大幅提前了後續計算任務的開始時間。

另外,因 Hive 跑得慢,我們後續的幾百個 Hive 計算任務,排隊情況很嚴重,不得不把一些優先順序低的任務排到下午甚至晚上,日任務全部跑完需要十幾個小時。而我們把全部批任務遷移到 Doris 上計算後,全部任務跑完只需要 2 小時不到,後續增加新的需求任務完全無壓力。

總而言之,使用 Doris 後,報表資料的更新時間大幅提前,臨時的資料查詢需求響應時長大幅縮短,至少節約了每年幾萬的大資料叢集擴容成本,同時獲得了各部門的認可。

提升開發效率

隨著公司業務快速的發展,會不斷的有新的資料分析需求,就需要我們接入新庫新表,給老表加欄位等,這對於 Hive 數倉是非常痛苦的,表要重建、全量資料要抽,這就需要每週有半天時間都在處理這些事情。

在使用 Doris 作為數倉後,通過我們的資料易平臺配置 Flink CDC 任務快速接入 MySQL 庫表的全量+增量資料,同時利用 Doris 的 Online Schema Change 特性,實時同步 Binlog 裡的 DDL 表結構變更到 Doris,資料接入數倉零開發成本。

另外因為 Doris 支援 MySQL 協議直接對接資料視覺化應用,我們不需要再把結果資料從 Hive 推到 MySQL 裡提供資料服務,節約了資料庫資源,減少了開發步驟。

體現資料價值

Doris 有審計日誌,我們可以通過日誌,分析出每個表每天的查詢使用情況,以便我們評估跟進資料價值、下線廢棄報表及任務。另外還可以預警資源消耗多、查詢慢的查詢語句,幫助使用者進行語法優化等。

問題與經驗

1、MySQL 和 Doris 欄位型別不一致 MySQL 的 Blob、Mediumint、Year、Text 等欄位型別在 Doris 中沒有,需要我們轉換成 Doris 對應的欄位型別,Varchar 的長度我們對應在 Doris 需擴大成 3 倍。

2、MySQL DDL 語法相容性問題 MySQL 的 Bigint Unsigned、AUTO_INCREMENT、CURRENT_TIMESTAMP 等語法在 Doris 裡不支援。

3、多個大表 Join 計算時,記憶體使用過大,導致 BE 掛掉,影響資料寫入。 目前 Doris 新版本已對記憶體控制這部分進行優化。

4、Hive 和 Doris 差異 將 Hive 計算指令碼改成 Doris 計算指令碼時遇到一些語法問題,如:

  • Doris 不支援 Lateral View ,升級新版本已解決。
  • 之前的一些 Hive UDF 函式是 Java 寫的,Doris 不支援,我們用另外的程式對資料做的二次加工處理,後續 Doris 新版本會支援。
  • Doris 缺少一些函式,如 Last_Day 通過取日期下個月的第一天再減一天來實現, Collect_Set 通過先去重再 Group_Concat 實現等。

5、分析函式問題

  • 分析函式 XX() over(partition by) 在外層和子查詢中同時存在時,報 errCode = 2, detailMessage = can't support。我們通過將子查詢資料放入臨時表解決該問題,後面 Doris 1.0 版本已解決該問題。
  • 多個 lag PARTITION by 函式和 min PARTITION by 一起使用時,有亂碼的情況。撰文時該 Bug 已修復,等待合併發版。

6、Doris 動態分割槽 動態分割槽欄位必須為 Date 到月、周、日,不能根據寫入的資料自動建立分割槽,目前我們通過建表時指定初始化的分割槽數解決此問題。

7、Stream Load 寫入過於頻繁報錯 Stream Load 寫入 Doris,寫入太頻繁會報錯誤碼 235 問題,同樣的表 Routine Load 不會出現這個問題,我們通過批量提交解決,Doris 新版本已優化該問題。

以上問題在向社群反饋後,得到了社群的快速響應。截止目前,上述問題基本上都已經得到修復,並且將在即將釋出的新版中上線。

寫在最後

首先感謝 Apache Doris 社群的 PPMC 張家鋒和多個 Committer 的大力支援,有任何問題都能很快得到響應。也感謝公司領導對我們方案的認可和支援,做技術改造不僅要花費很多金錢和精力,而且還需改變的勇氣和堅定的信念。也感謝各位同行在使用 Apache Doris 上給了我們很多經驗和信心。最後祝願 Apache Doris 社群發展越來越好!

img

SelectDB 是一家開源技術公司,致力於為 Apache Doris 社群提供一個由全職工程師、產品經理和支援工程師組成的團隊,繁榮開源社群生態,打造實時分析型資料庫領域的國際工業界標準。基於 Apache Doris(incubating)研發的新一代雲原生實時數倉 SelectDB,運行於多家雲上,為使用者和客戶提供開箱即用的能力。

相關連結:

SelectDB 官方網站:

https://selectdb.com (We Are Coming Soon)

Apache Doris 官方網站:

http://doris.incubator.apache.org

Apache Doris Github:

https://github.com/apache/incubator-doris

Apache Doris 開發者郵件組:

[email protected]

二維碼.png