杭銀消金基於 Apache Doris 的統一數據查詢網關改造

語言: CN / TW / HK

導讀: 隨着業務量快速增長,數據規模的不斷擴大,杭銀消金早期的大數據平台在應對實時性更強、複雜度更高的的業務需求時存在瓶頸。為了更好的應對未來的數據規模增長,杭銀消金於 2022 年 10 月正式引入 Apache Doris 1.2 對現有的風控數據集市進行了升級改造,利用 Multi Catalog 功能統一了 ES、Hive、GP 等數據源出口,實現了聯邦查詢,為未來統一數據查詢網關奠定了基礎;同時,基於 Apache Doris 高性能、簡單易用、部署成本低等諸多優勢,也使得各大業務場景的查詢分析響應實現了從分鐘級到秒級的跨越。

作者|杭銀消金大數據團隊 周其進,唐海定, 姚錦權

杭銀消費金融股份有限公司,成立於 2015 年 12 月,是杭州銀行牽頭組建的浙江省首家持牌消費金融公司,經過這幾年的發展,在 2022 年底資產規模突破 400 億,服務客户數超千萬。公司秉承“數字普惠金融”初心,堅持服務傳統金融覆蓋不充分的、具有消費信貸需求的客户羣體,以“數據、場景、風控、技術”為核心,依託大數據、人工智能、雲計算等互聯網科技,為全國消費者提供專業、高效、便捷、可信賴的金融服務。

業務需求

杭銀消金業務模式是線上業務結合線下業務的雙引擎驅動模式。為更好的服務用户,運用數據驅動實現精細化管理,基於當前業務模式衍生出了四大類的業務數據需求:

  • 預警類:實現業務流量監控,主要是對信貸流程的用户數量與金額進行實時監控,出現問題自動告警。
  • 分析類:支持查詢統計與臨時取數,對信貸各環節進行分析,對審批、授信、支用等環節的用户數量與額度情況查詢分析。
  • 看板類:打造業務實時駕駛艙與 T+1 業務看板,提供內部管理層與運營部門使用,更好輔助管理進行決策。
  • 建模類:支持多維模型變量的建模,通過算法模型回溯用户的金融表現,提升審批、授信、支用等環節的模型能力。

數據架構 1.0

為滿足以上需求,我們採用 Greenplum + CDH 融合的架構體系創建了大數據平台 1.0 ,如下圖所示,大數據平台的數據源均來自於業務系統,我們可以從數據源的 3 個流向出發,瞭解大數據平台的組成及分工:

  • 業務系統的核心繫統數據通過 CloudCanal 實時同步進入 Greenplum 數倉進行數據實時分析,為 BI 報表,數據大屏等應用提供服務,部分數據進入風控集市 Hive 中,提供查詢分析和建模服務。
  • 業務系統的實時數據推送到 Kafka 消息隊列,經 Flink 實時消費寫入 ES,通過風控變量提供數據服務,而 ES 中的部分數據也可以流入 Hive 中,進行相關分析處理。
  • 業務系統的風控數據會落在 MongoDB,經過離線同步進入風控集市 Hive,Hive 數倉支撐了查詢平台和建模平台,提供風控分析和建模服務。

我們將 ES 和 Hive 共同組成了風控數據集市,從上述介紹也可知,四大類的業務需求基本都是由風控數據集市來滿足的,因此我們後續的改造升級主要基於風控數據集市來進行。在這之前,我們先了解一下風控數據集市 1.0 是如何來運轉的。

風控數據集市 1.0

風控數據集市原有架構是基於 CDH 搭建的,由實時寫入和離線統計分析兩部分組成,整個架構包含了 ES、Hive、Greenplum 等核心組件,風控數據集市的數據源主要有三種:通過 Greenplum 數倉同步的業務系統數據、通過 MongoDB 同步的風控決策數據,以及通過 ES 寫入的實時風控變量數據。

實時流數據: 採用了 Kafka + Flink + ES 的實時流處理方式,利用 Flink 對 Kafka 的實時數據進行清洗,實時寫入ES,並對部分結果進行彙總計算,通過接口提供給風控決策使用。

離線風控數據: 採用基於 CDH 的方案實現,通過 Sqoop 離線同步核心數倉 GP 上的數據,結合實時數據與落在 MongoDB 上的三方數據,經數據清洗後統一彙總到 Hive 數倉進行日常的跑批與查詢分析。

需求滿足情況:

在大數據平台 1.0 的的支持下,我們的業務需求得到了初步的實現:

  • 預警類:基於 ES + Hive 的外表查詢,實現了實時業務流量監控;
  • 分析類:基於 Hive 實現數據查詢分析和臨時取數;
  • 看板類:基於 Tableau +Hive 搭建了業務管理駕駛艙以及T+1 業務看板;
  • 建模類:基於 Spark+Hive 實現了多維模型變量的建模分析;

受限於 Hive 的執行效率,以上需求均在分鐘級別返回結果,僅可以滿足我們最基本的訴求,而面對秒級甚至毫秒級的分析場景,Hive 則稍顯吃力。

存在的問題:

  • 單表寬度過大,影響查詢性能。風控數據集市的下游業務主要以規則引擎與實時風控服務為主,因規則引擎的特殊性,公司在數據變量衍生方面資源投入較多,某些維度上的衍生變量會達到幾千甚至上萬的規模,這將導致 Hive 中存儲的數據表字段非常多,部分經常使用的大寬表字段數量甚至超過上千,過寬的大寬表非常影響實際使用中查詢性能。
  • 數據規模龐大,維護成本高。 目前 Hive 上的風控數據集市已經有存量數據在百 T 以上,面對如此龐大的數據規模,使用外表的方式進行維護成本非常高,數據的接入也成為一大難題。
  • 接口服務不穩定。 由風控數據集市離線跑批產生的變量指標還兼顧為其他業務應用提供數據服務的職責,目前 Hive 離線跑批後的結果會定時推送到 ES 集羣(每天更新的數據集比較龐大,接口調用具有時效性),推送時會因為 IO 過高觸發 ES 集羣的 GC 抖動,導致接口服務不穩定。

除此之外,風控分析師與建模人員一般通過 Hive & Spark 方式進行數據分析建模,這導致隨着業務規模的進一步增大,T+1 跑批與日常分析的效率越來越低,風控數據集市改造升級的需求越發強烈。

技術選型

基於業務對架構提出的更高要求,我們期望引入一款強勁的 OLAP 引擎來改善架構,因此我們於 2022 年 9 月份對 ClickHouse 和 Apache Doris 進行了調研,調研中發現 Apache Doris 具有高性能、簡單易用、實現成本低等諸多優勢,而且 Apache Doris 1.2 版本非常符合我們的訴求,原因如下:

寬表查詢性能優異:從官方公佈的測試結果來看,1.2 Preview 版本在 SSB-Flat 寬表場景上相對 1.1.3 版本整體性能提升了近 4 倍、相對於 0.15.0 版本性能提升了近 10 倍,在 TPC-H 多表關聯場景上較 1.1.3 版本上有近 3 倍的提升、較 0.15.0 版本性能提升了 11 倍以上,多個場景性能得到飛躍性提升。

便捷的數據接入框架以及聯邦數據分析能力: Apache Doris 1.2 版本推出的 Multi Catalog 功能可以構建完善可擴展的數據源連接框架,便於快速接入多類數據源,提供基於各種異構數據源的聯邦查詢和寫入能力。 目前 Multi-Catalog 已經支持了 Hive、Iceberg、Hudi 等數據湖以及 MySQL、Elasticsearch、Greenplum 等數據庫,全面覆蓋了我們現有的組件棧,基於此能力有希望通過 Apache Doris 來打造統一數據查詢網關。

生態豐富: 支持 Spark Doris Connector、Flink Doris Connector,方便離線與實時數據的處理,縮短了數據處理鏈路耗費的時間。

社區活躍: Apache Doris 社區非常活躍,響應迅速,並且 SelectDB 為社區提供了一支專職的工程師團隊,為用户提供技術支持服務。

數據架構 2.0

風控數據集市 2.0

基於對 Apache Doris 的初步的瞭解與驗證,22 年 10 月在社區的支持下我們正式引入 Apache Doris 1.2.0 Preview 版本作為風控數據集市的核心組件,Apache Doris 的 Multi Catalog 功能助力大數據平台統一了 ES、Hive、Greenplum 等數據源出口,通過 Hive Catalog 和 ES Catalog 實現了對 Hive & ES 等多數據源的聯邦查詢,並且支持 Spark-Doris-Connector,可以實現數據 Hive 與 Doris 的雙向流動,與現有建模分析體系完美集成,在短期內實現了性能的快速提升。

大數據平台 2.0

風控數據集市調整優化之後,大數據平台架構也相應的發生了變化,如下圖所示,僅通過 Doris 一個組件即可為數據服務、分析平台、建模平台提供數據服務。

在最初進行聯調適配的時候,Doris 社區和 SelectDB 支持團隊針對我們提出的問題和疑惑一直保持高效的反饋效率,給於積極的幫助和支持,快速幫助我們解決在生產上遇到的問題。

需求實現情況:

在大數據平台 2.0 的加持下,業務需求實現的方式也發生了變更,主要變化如下所示

  • 預警類:基於 ES Catalog+ Doris 實現了對實時數據的查詢分析。在架構 1.0 中,實時數據落在 ES 集羣上,通過 Hive 外表進行查詢分析,查詢結果以分鐘級別返回;而在 Doris 1.2 集成之後, 使用 ES Catalog 訪問 ES,可以實現對 ES 數據秒級統計分析。
  • 分析類:基於 Hive Catalog + Doris 實現了對現有風控數據集市的快速查詢。目前 Hive 數據集市存量表在兩萬張左右,如果通過直接創建 Hive 外部表的方式,表結構映射關係的維護難度與數據同步成本使這一方式幾乎不可能實現。而 Doris 1.2 的 Multi Catalog 功能則完美解決了這個問題,只需要創建一個 Hive Catalog,就能對現有風控數據集市進行查詢分析,既能提升查詢性能,還減少了日常查詢分析對跑批任務的資源影響。
  • 看板類:基於 Tableau + Doris 聚合展示業務實時駕駛艙和 T+1 業務看板,最初使用 Hive 時,報表查詢需要幾分鐘才能返回結果,而 Apache Doris 則是秒級甚至是毫秒級的響應速度。
  • 建模類:基於 Spark+Doris 進行聚合建模。利用 Doris1.2 的 Spark-Doris-Connector功 能,實現了 Hive 與 Doris 數據雙向同步,滿足了 Spark 建模平台的功能複用。同時增加了 Doris 數據源,基礎數據查詢分析的效率得到了明顯提升,建模分析能力的也得到了增強。

在 Apache Doris 引入之後,以上四個業務場景的查詢耗時基本都實現了從分鐘級到秒級響應的跨越,性能提升十分巨大。

生產環境集羣監控

為了快速驗證新版本的效果,我們在生產環境上搭建了兩個集羣,目前生產集羣的配置是 4 個 FE + 8個 BE,單個節點是配置為 64 核+ 256G+4T,備用集羣為 4 個 FE + 4 個 BE 的配置,單個節點配置保持一致。

集羣監控如下圖所示:

可以看出,Apache Doris 1.2 的查詢效率非常高,原計劃至少要上 10 個節點,而在實際使用下來,我們發現當前主要使用的場景均是以 Catalog 的方式查詢,因此集羣規模可以相對較小就可以快速上線,也不會破壞當前的系統架構,兼容性非常好。

數據集成方案

前段時間,Apache Doris 1.2.2 版本已經發布,為了更好的支撐應用服務,我們使用 Apache Doris 1.2.2 與 DolphinScheduler 3.1.4 調度器、SeaTunnel 2.1.3 數據同步平台等開源軟件實現了集成,以便於數據定時從 Hive 抽取到 Doris 中。整體的數據集成方案如下:

在當前的硬件配置下,數據同步採用的是 DolphinScheduler 的 Shell 腳本模式,定時調起 SeaTunnel 的腳本,數據同步任務的配置文件如下:

 env{
  spark.app.name = "hive2doris-template"
  spark.executor.instances = 10
  spark.executor.cores = 5
  spark.executor.memory = "20g"
}
spark {
  spark.sql.catalogImplementation = "hive"
}
source {
  hive {
    pre_sql = "select * from ods.demo_tbl where dt='2023-03-09'"
    result_table_name = "ods_demo_tbl"
  }
}
 
transform {
}
 
sink {
  doris {
      fenodes = "192.168.0.10:8030,192.168.0.11:8030,192.168.0.12:8030,192.168.0.13:8030"
      user = root
      password = "XXX"
      database = ods
      table = ods_demo_tbl
      batch_size = 500000
      max_retries = 1
      interval = 10000
      doris.column_separator = "\t"
    }
}

該方案成功實施後,資源佔用、計算內存佔用有了明顯的降低,查詢性能、導入性能有了大幅提升:

  1. 存儲成本降低

使用前:Hive 原始表包含 500 個字段,單個分區數據量為 1.5 億/天,在 HDFS 上佔用約 810G 的空間。

使用後:我們通過 SeaTunnel 調起 Spark on YARN 的方式進行數據同步,可以在 40 分鐘左右完成數據同步,同步後數據佔用 270G 空間,存儲資源僅佔之前的 1/3

  1. 計算內存佔用降低,性能提升顯著

使用前:上述表在 Hive 上進行 Group By 時,佔用 YARN 資源 720 核 1.44T 內存,需要 162 秒才可返回結果;

使用後:

  • 通過 Doris 調用 Hive Catalog 進行聚合查詢,在設置 set exec_mem_limit=16G 情況下用時 58.531 秒,查詢耗時較之前減少了近 2/3;
  • 在同等條件下,在 Doris 中執行相同的的操作可以在 0.828 秒就能返回查詢結果,性能增幅巨大。

具體效果如下:

(1)Hive 查詢語句,用時 162 秒。

select count(*),product_no   FROM ods.demo_tbl where dt='2023-03-09'
group by product_no;

(2)Doris 上 Hive Catalog 查詢語句,用時 58.531 秒。

set exec_mem_limit=16G;
select count(*),product_no   FROM hive.ods.demo_tbl where dt='2023-03-09'
group by product_no;

(3)Doris 上本地表查詢語句,僅用時0.828秒

select count(*),product_no   FROM ods.demo_tbl where dt='2023-03-09'
group by product_no;
  1. 導入性能提升

使用前:Hive 原始表包含 40 個字段,單個分區數據量 11 億/天,在 HDFS 上佔用約 806G 的空間

使用後:通過 SeaTunnel 調起 Spark on YARN 方式進行數據同步,可以在 11 分鐘左右完成數據同步,即 1 分鐘同步約一億條數據,同步後佔用 378G 空間。

可以看出,在數據導入性能的提升的同時,資源也有了較大的節省,主要得益於對以下幾個參數進行了調整:

push_write_mbytes_per_sec:BE 磁盤寫入限速,300M

push_worker_count_high_priority: 同時執行的 push 任務個數,15

push_worker_count_normal_priority: 同時執行的 push 任務個數,15

架構收益

(1)統一數據源出口,查詢效率顯著提升

風控數據集市採用的是異構存儲的方式來存儲數據,Apache Doris 的 Multi Catalog 功能成功統一了 ES、Hive、GP 等數據源出口,實現了聯邦查詢。 同時,Doris 本身具有存儲能力,可支持其他數據源中的數據通過外表插入內容的方式快速進行數據同步,真正實現了數據門户。此外,Apache Doris 可支持聚合查詢,在向量化引擎的加持下,查詢效率得到顯著提升。

(2) Hive 任務拆分,提升集羣資源利用率

我們將原有的 Hive 跑批任務跟日常的查詢統計進行了隔離,以提升集羣資源的利用效率。目前 YARN 集羣上的任務數量是幾千的規模,跑批任務佔比約 60%,臨時查詢分析佔比 40%,由於資源限制導致日常跑批任務經常會因為資源等待而延誤,臨時分析也因資源未及時分配而導致任務無法完成。當部署了 Doris 1.2 之後,對資源進行了劃分,完全擺脱 YARN 集羣的資源限制,跑批與日常的查詢統計均有了明顯的改善,基本可以在秒級得到分析結果,同時也減輕了數據分析師的工作壓力,提升了用户對平台的滿意度。

(3)提升了數據接口的穩定性,數據寫入性能大幅提升

之前數據接口是基於 ES 集羣的,當進行大批量離線數據推送時會導致 ES 集羣的 GC 抖動,影響了接口穩定性,經過調整之後,我們將接口服務的數據集存儲在 Doris 上,Doris 節點並未出現抖動,實現數據快速寫入,成功提升了接口的穩定性,同時 Doris 查詢在數據寫入時影響較小,數據寫入性能較之前也有了非常大的提升,千萬級別的數據可在十分鐘內推送成功

(4)Doris 生態豐富,遷移方便成本較低。

Spark-Doris-Connector 在過渡期為我們減輕了不少的壓力,當數據在 Hive 與 Doris 共存時,部分 Doris 分析結果通過 Spark 回寫到 Hive 非常方便,當 Spark 調用 Doris 時只需要進行簡單改造就能完成原有腳本的複用,遷移方便、成本較低。

(5)支持橫向熱部署,集羣擴容、運維簡單。

Apache Doris 支持橫向熱部署,集羣擴容方便,節點重啟可以在在秒級實現,可實現無縫對接,減少了該過程對業務的影響; 在架構 1.0 中,當 Hive 集羣與 GP 集羣需要擴容更新時,配置修改後一般需要較長時間集羣才可恢復,用户感知比較明顯。而 Doris 很好的解決了這個問題,實現用户無感知擴容,也降低了集羣運維的投入。

未來與展望

當前在架構 2.0 中的 Doris 集羣在大數據平台中的角色更傾向於查詢優化,大部分數據還集中維護在 Hive 集羣上,未來我們計劃在升級架構 3.0 的時候,完成以下改造:

  • 實時全量數據接入:利用 Flink 將所有的實時數據直接接入 Doris,不再經過 ES 存儲;
  • 數據集數據完整性:利用 Doris 構建實時數據集市的原始層,利用 FlinkCDC 等同步工具將業務庫 MySQL與決策過程中產生的 MongoDB 數據實時同步到 Doris,最大限度將現有數據都接入 Doris 的統一平台,保證數據集數據完整性。
  • 離線跑批任務遷移:將現有 Hive&Spark 中大部分跑批任務遷移至 Doris,提升跑批效率;
  • 統一查詢分析出口:將所有的查詢分析統一集中到 Doris,完全統一數據出口,實現統一數據查詢網關,使數據的管理更加規範化;
  • 強化集羣穩定擴容:引入可視化運維管理工具對集羣進行維護和管理,使 Doris 集羣能夠更加穩定支撐業務擴展。

總結與致謝

Apache Doris1.2 是社區在版本迭代中的重大升級,藉助 Multi Catalog 等優異功能能讓 Doris 在 Hadoop 相關的大數據體系中快速落地,實現聯邦查詢;同時可以將日常跑批與統計分析進行解耦,有效提升大數據平台的的查詢性能。

作為第一批 Apache Doris1.2 的用户,我們深感榮幸,同時也十分感謝 Doris 團隊的全力配合和付出,可以讓 Apache Doris 快速落地、上線生產,併為後續的迭代優化提供了可能。

Apache Doris 1.2 值得大力推薦,希望大家都能從中受益,祝願 Apache Doris 生態越來越繁榮,越來越好!

「其他文章」