應用實踐 | 海量資料,秒級分析!Flink+Doris 構建實時數倉方案

語言: CN / TW / HK

編者薦語:隨著領創集團的快速發展,為了滿足十億級資料量的實時報表統計與決策分析,領創集團選擇了 Flink + Doris 的實時數倉方案。本篇文章詳盡了介紹了此方案的實踐過程。

以下文章來源於領創集團Advance Group, 作者蘇浩

原文連結:https://mp.weixin.qq.com/s/qg_4nsfo5gxwe8_1OiWVSA

業務背景

Advance Intelligence Group(領創集團)成立於 2016 年,是一家以 AI 技術驅動的科技集團,致力於通過科技創新的本地化應用,改造和重塑金融和零售行業,以多元化的業務佈局打造一個服務於消費者、企業和商戶的生態圈。集團旗下包含企業業務和消費者業務兩大板塊,企業業務包含 ADVANCE.AI 和 Ginee,分別為銀行、金融、金融科技、零售和電商行業客戶提供基於 AI 技術的數字身份驗證、風險管理產品和全渠道電商服務解決方案;消費者業務 Atome Financial 包括亞洲領先的先享後付平臺 Atome 和數字金融服務。

2021 年 9 月,領創集團宣佈完成超 4 億美元 D 輪融資,融資完成後領創集團估值已超 20 億美元,成為新加坡最大的獨立科技創業公司之一。業務覆蓋新加坡、印度尼西亞、中國大陸、印度、越南等 17 個國家與地區,服務了 15 萬以上的商戶和 2000 萬消費者。

隨著集團業務的快速發展,為滿足十億級資料量的實時報表統計與決策分析,我們選擇基於 Apache Flink + Apache Doris 構建了實時數倉的系統方案。

Doris 基本原理

Apache Doris 基本架構非常簡單,只有 FE(Frontend)、BE(Backend) 兩種角色,不依賴任何外部元件,對部署和運維非常友好。架構圖如下:

圖片

FE(Frontend)以 Java 語言為主。

主要功能職責:

  • 接收使用者連線請求(MySQL 協議層)
  • 元資料儲存與管理
  • 查詢語句的解析與執行計劃下發
  • 叢集管控

FE 主要有有兩種角色,一個是 Follower,還有一個 Observer,Leader 是經過選舉推選出的特殊 Follower。Follower 主要是用來達到元資料的高可用,保證單節點宕機的情況下,元資料能夠實時地線上恢復,而不影響整個服務。

BE(Backend) 以 C++ 語言為主。

主要功能職責:

  • 資料儲存與管理
  • 查詢計劃的執行

技術架構

整體資料鏈路如下圖:

圖片

  1. 通過 FlinkCDC 採集 MySQL Binlog 到 Kafka 中的 Topic1
  2. 開發 Flink 任務消費上述 Binlog 生成相關主題的寬表,寫入 Topic2
  3. 配置 Doris Routine Load 任務,將 Topic2 的資料匯入 Doris

應用實踐

關於步驟1和步驟2的實踐,“基於 Flink-CDC 資料同步⽅案” 的文章中已有說明,本文將對步驟3展開詳細的說明。

建表

因業務資料經常伴隨有 UPDATE,DELETE 等操作,為了保持實時數倉的資料粒度與業務庫一致,所以選擇 Doris Unique 模型(資料模型在下文有重點介紹)具體建表語句如下:

CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT '資料處理時間',
data_status INT COMMENT '資料是否刪除,1表示正常,-1表示資料已經刪除'
) 
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
"storage_type"="column",
"replication_num" = "3",
"function_column.sequence_type" = 'DateTime'
);


可以看到,表結構中有兩個欄位分別是 data_deal_datetime,data_status。

  • data_deal_datetime 主要是相同 key 情況下資料覆蓋的判斷依據
  • data_status 用來相容業務庫對資料的刪除操作

資料匯入任務

Doris 提供了主動拉取 Kafka 資料的功能,配置如下:

CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "10",
"max_batch_rows" = "500000",
"max_batch_size" = "209715200",
"format" = "json",
"json_root" = "$.data",
"jsonpaths"="["$.key1","$.key2","$.key3","$.value1","$.value2",
            "$.value3","$.data_deal_datetime","$.data_status"]"
)FROM KAFKA
(
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
"kafka_topic"="topic_name",
"property.group.id"="group_id",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);


匯入語句中:

  • ORDER BY data_deal_datetime 表示根據 data_deal_datetime 欄位去覆蓋 key 相同的資料
  • desired_concurrent_number 表示期望的併發度。

max_batch_interval/max_batch_rows/max_batch_size 這 3 個引數分別表示

  • 每個子任務最大執行時間。
  • 每個子任務最多讀取的行數。
  • 每個子任務最多讀取的位元組數。

任務監控與報警

Doris routine load 如果遇到髒資料會導致任務暫停,所以需要定時監控資料匯入任務的狀態並且自動恢復失敗任務。並且將錯誤資訊發至指定的 lark 群。具體指令碼如下:

import pymysql  #匯入 pymysql
import requests,json
​
​
#開啟資料庫連線
db= pymysql.connect(host="host",user="user",
                    password="passwd",db="database",port=port)
​
# 使用cursor()方法獲取操作遊標
cur = db.cursor()
​
#1.查詢操作
# 編寫sql 查詢語句 
sql = "show routine load"
cur.execute(sql)        #執行sql語句
results = cur.fetchall()        #獲取查詢的所有記錄
for row in results :
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        err_log_urls = row[16]
        reason_state_changed = row[15]
        msg = "doris 資料匯入任務異常:\n name=%s \n state=%s \n reason_state_changed=%s \n err_log_urls=%s \n即將自動恢復,請檢查錯誤資訊" % (name, state,
reason_state_changed, err_log_urls)
        payload_message = {
    "msg_type": "text",
    "content": {
        "text": msg
    }
}
        url = 'lark 報警url'
        s = json.dumps(payload_message)
        r = requests.post(url, data=s)
        cur.execute("resume routine load for " + name)
​
cur.close()
db.close()


現在線上配置的監控 1 分鐘執行一次,如果遇到任務暫停,會自動恢復匯入任務,但是導致任務失敗的髒資料會跳過,此時需要人工排查失敗原因,修復後重新觸發該條資料的匯入。

資料模型

Doris 內部表中,主要有 3 種資料模型,分別是 Aggregate 、Unique 、Duplicate。在介紹資料模型之前,先解釋一下 Column:在 Doris 中,Column 可以分為兩大類:Key 和 Value。從業務角度看,Key 和 Value 分別對應維度列和指標列。

Aggregate

簡單來說,Aggregate 模型就是預聚合模型,類似於 MOLAP,通過提前定義 Key 列及 Value 列的聚合方式,在資料匯入的時候已經將 Key 列相同的資料按照 value 列的聚合方式聚合在一起,即最終表裡 Key 相同的資料只保留一條,Value 按照相應的規則計算。下面舉例說明。

表結構如下:

CREATE TABLE tmp_table_1
    (
        user_id varchar(64) COMMENT "使用者id",
        channel varchar(64) COMMENT "使用者來源渠道",
        city_code varchar(64) COMMENT "使用者所在城市編碼",
        last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "使用者最後一次訪問時間",
        total_cost BIGINT SUM DEFAULT "0" COMMENT "使用者總消費"
    )
ENGINE=OLAP
AGGREGATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
    PROPERTIES("storage_type"="column","replication_num" = "1"):


表結構中,Key 列分別是 user_id、channel、city_code ,Value 列是 last_visit_date、total_cost,他們的聚合方式分別為 REPLACE、SUM。

現在,向該表中插入一批資料:

insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107');


按照我們的理解,現在 tmp_table_1 中雖然我們插入了 3 條資料,但是這 3 條資料的 Key 都是一致的,那麼最終表中應該只有一條資料,並且 last_visit_date 的值應為"2022-03-01 00:00:01",total_cost 的值應為 240。下面我們驗證一下:

圖片

可以看到,結果與我們預期⼀致。

Unique 模型

正如本次建設的實時數倉那樣,我們更加關注的是如何保證主鍵的唯⼀性,即如何獲得 Primary Key 唯⼀性約束。⼤家可以參考上⾯建表的例⼦,在這⾥不再舉例說明。

Duplicate 模型

在某些多維分析場景下,資料既沒有主鍵,也沒有聚合需求。因此引⼊ Duplicate 資料模型來滿⾜這類需求。舉例說明。

表結構如下:

CREATE TABLE tmp_table_2
    (
        user_id varchar(64) COMMENT "使用者id",
        channel varchar(64) COMMENT "使用者來源渠道",
        city_code varchar(64) COMMENT "使用者所在城市編碼",
        visit_date DATETIME COMMENT "使用者登陸時間",
cost BIGINT COMMENT "使用者消費金額"
    )
ENGINE=OLAP
DUPLICATE KEY(user_id, channel, city_code)
DISTRIBUTED BY HASH(user_id) BUCKETS 6
    PROPERTIES("storage_type"="column","replication_num" = "1");


插入資料:

insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57');
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76');
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107');


因為此時資料是 Duplicate 模型,不會進行任何處理,查詢應該能查到 3 條資料

圖片

資料模型的選擇建議

因為資料模型在建表時就已經確定,且無法修改。所以,選擇一個合適的資料模型非常重要。

Aggregate 模型可以通過預聚合,極大地降低聚合查詢時所需掃描的資料量和查詢的計算量,非常適合有固定模式的報表類查詢場景。但是該模型對 count(*) 查詢很不友好。同時因為固定了 Value 列上的聚合方式,在進行其他型別的聚合查詢時,需要考慮語意正確性。

Unique 模型針對需要唯一主鍵約束的場景,可以保證主鍵唯一性約束,但是無法利用 ROLLUP 等預聚合帶來的查詢優勢。

Duplicate 適合任意維度的 Ad-hoc 查詢,雖然同樣無法利用預聚合的特性,但是不受聚合模型的約束,可以發揮列存模型的優勢。

總結

Flink + Doris 構建的實時數倉上線後,報表介面相應速度得到了明顯提高,單表 10 億級聚合查詢響應速度 TP95 為 0.79 秒,TP99 為 5.03 秒。到目前為止,整套數倉體系已平穩執行 8 個多月。

歡迎更多的開源技術愛好者加入 Apache Doris 社群,攜手成長,共建社群生態。

圖片

圖片

圖片

SelectDB 是一家開源技術公司,致力於為 Apache Doris 社群提供一個由全職工程師、產品經理和支援工程師組成的團隊,繁榮開源社群生態,打造實時分析型資料庫領域的國際工業界標準。基於 Apache Doris 研發的新一代雲原生實時數倉 SelectDB,運行於多家雲上,為使用者和客戶提供開箱即用的能力。

相關連結:

SelectDB 官方網站:

https://selectdb.com (We Are Coming Soon)

Apache Doris 官方網站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 開發者郵件組:

[email protected]