火山引擎在行為分析場景下的ClickHouse JOIN優化

語言: CN / TW / HK

更多技術交流、求職機會,歡迎關注位元組跳動資料平臺微信公眾號,回覆【1】進入官方交流群

 

背景

火山引擎增長分析DataFinder基於ClickHouse來進行行為日誌的分析,ClickHouse的主要版本是基於社群版改進開發的位元組內部版本。主要的表結構:

事件表:儲存使用者行為資料,以使用者ID分shard儲存。

--列出了主要的欄位資訊
CREATE TABLE tob_apps_all
(
    `tea_app_id`                UInt32,  --應用ID
    `device_id`             String DEFAULT '', --裝置ID
    `time`                  UInt64,--事件日誌接受時間
    `event`                 String,--事件名稱
    `user_unique_id`        String,--使用者ID
    `event_date`            Date,--事件日誌日期,由time轉換而來
    `hash_uid`              UInt64 --使用者ID hash過後的id,用來join降低記憶體消耗
)│

使用者表:儲存使用者的屬性資料,以使用者ID分shard儲存。

--列出了主要的欄位資訊
CREATE TABLE users_unique_all
(
    `tea_app_id`            UInt32,            --應用ID
    `user_unique_id`        String DEFAULT '', -- 使用者ID
    `device_id`             String DEFAULT '', -- 使用者最近的裝置ID
    `hash_uid`              UInt64,--使用者ID hash過後的id,用來join降低記憶體消耗
    `update_time`           UInt64,--最近一次更新時間
    `last_active_date`      Date   --使用者最後活躍日期
)

裝置表:儲存裝置相關的資料,以裝置ID分shard儲存。

--列出了主要的欄位資訊
CREATE TABLE devices_all
(
    `tea_app_id`            UInt32,            --應用ID
    `device_id`             String DEFAULT '', --裝置ID    
    `update_time`           UInt64,            --最近一次更新時間
    `last_active_date`      Date               --使用者最後活躍日期
)

業務物件表:儲存業務物件相關的資料,每個shard儲存全量的資料

--列出了主要的欄位資訊
CREATE TABLE rangers.items_all
(
    `tea_app_id`            UInt32,
    `hash_item_id`          Int64,
    `item_name`             String, --業務物件名稱。比如商品
    `item_id`               String, --業務物件ID。比如商品id 1000001
    `last_active_date`      Date
) 

業務挑戰

隨著接入應用以及應用的DAU日益增加,ClickHouse表的事件量增長迅速;並且基於行為資料需要分析的業務指標越來越複雜,需要JOIN的表增多;我們遇到有一些涉及到JOIN的複雜SQL執行效率低,記憶體和CPU資源佔用高,導致分析介面響應時延和錯誤率增加。

關於Clickhouse的JOIN

在介紹優化之前,先介紹一下基本的ClickHouse JOIN的型別和實現方式

分散式JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

基本執行過程:

  1. 一個Clickhouse節點作為Coordinator節點,給每個節點分發子查詢,子查詢sql(tob_apps_all替換成本地表,users_unique_all保持不變依然是分散式表)

  2. 每個節點執行Coordinator分發的sql時,發現users_unique_all是分散式表,就會去所有節點上去查詢以下SQL(一共有N*N。N為shard數量)

    SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')

  3. 每個節點從其他N-1個節點拉取2中子查詢的全部資料,全量儲存(記憶體or檔案),進行本地JOIN

  4. Coordinator節點從每個節點拉取3中的結果集,然後做處理返回給client

存在的問題:

  1. 子查詢數量放大

  2. 每個節點都全量儲存全量的資料

分散式Global JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

基本執行過程:

  1. 一個Clickhouse節點作為Coordinator節點,分發查詢。在每個節點上執行sql(tob_apps_all替換成本地表,右表子查詢替換成別名ut)

  2. Coordinator節點去其他節點拉取users_unique_all的全部資料,然後分發到全部節點(作為1中別名表ut的資料)

  3. 每個節點都會儲存全量的2中分發的資料(記憶體or檔案),進行本地local join

  4. Coordinator節點從每個節點拉取3中的結果集,然後做處理返回給client

存在的問題:

  1. 每個節點都全量儲存資料

  2. 如果右表較大,分發的資料較大,會佔用網路頻寬資源

本地JOIN

SQL裡面只有本地表的JOIN,只會在當前節點執行

SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
    (SELECT device_id,
         hash_uid
    FROM rangers.users_unique
    WHERE tea_app_id = 268411
            AND last_active_date>='2022-08-06') ut
    ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
        AND event='app_launch'
        AND event_date='2022-08-06' 

Hash join

  • 右表全部資料載入到記憶體,再在記憶體構建hash table。key為joinkey

  • 從左表分批讀取資料,從右表hash table匹配資料

  • 優點是:速度快 缺點是:右表資料量大的情況下佔用記憶體

Merge join

  • 對右表排序,內部 block 切分,超出記憶體部分 flush 到磁碟上,記憶體大小通過引數設定

  • 左表基於 block 排序,按照每個 block 依次與右表 merge

  • 優點是:能有效控制記憶體 缺點是:大資料情況下速度會慢

優先使用hash join當記憶體達到一定閾值後再使用merge join,優先滿足效能要求

解決方案

避免JOIN

資料預生成

資料預生成(由Spark/Flink或者Clickhouse物化檢視產出資料),形成大寬表,基於單表的查詢是ClickHouse最為擅長的場景

我們有個指標,實現的SQL比較複雜(如下),每次實時查詢很耗時,我們單獨建了一個表table,由Spark每日構建出這個指標,查詢時直接基於table查詢

SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
    (SELECT event_date,hash_uid AS uc1,sum(et.float_params{'amount'}) AS value, count(1) AS cnt, value*cnt AS multiple
    FROM tob_apps_all et GLOBAL ANY LEFT JOIN
        (SELECT hash_uid AS join_key,int_profiles{'$ab_time_34'}*1000 AS first_time
        FROM users_unique_all
        WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt
            ON et.hash_uid=upt.join_key
        WHERE (查詢條件)
        GROUP BY  uc1,event_date)
GROUP BY event_date;

資料量2300W,查詢時間由7秒->0.008秒。當然這種方式,需要維護額外的資料構建任務。總的思路就是不要讓ClickHouse實時去JOIN

使用IN代替JOIN

JOIN需要基於記憶體構建hash table且需要儲存右表全部的資料,然後再去匹配左表的資料。而IN查詢會對右表的全部資料構建hash set,但是不需要匹配左表的資料,且不需要回寫資料到block

比如

SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
    (SELECT hash_uid AS join_key
    FROM users_unique_all
    WHERE app_id = 10000000
            AND last_active_date >= '2022-01-01') upt
    ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
GROUP BY  event_date 

可以改成如下形式:

SELECT event_date,
         count()
FROM tob_apps_all
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
        AND hash_uid global IN 
    (SELECT hash_uid
    FROM users_unique_all
    WHERE (tea_app_id = 10000000)
            AND (last_active_date >= '2022-01-01') )
 GROUP BY event_date

如果需要從右表提取出屬性到外層進行計算,則不能使用IN來代替JOIN

相同的條件下,上面的測試SQL,由JOIN時的16秒優化到了IN查詢時的11秒

更快的JOIN

優先本地JOIN

資料預先相同規則分割槽

也就是Colocate JOIN。優先將需要關聯的表按照相同的規則進行分佈,查詢時就不需要分散式的JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

比如事件表tob_apps_all和使用者表users_unique_all都是按照使用者ID來分shard儲存的,相同的使用者的兩個表的資料都在同一個shard上,因此這兩個表的JOIN就不需要分散式JOIN了

distributed_perfect_shard這個settings key是位元組內部ClickHouse支援的,設定過這個引數,指定執行計劃時就不會再執行分散式JOIN了

基本執行過程:

  1. 一個ClickHouse節點作為Coordinator節點,分發查詢。在每個節點上執行sql(tob_apps_all、users_unique_all替換成本地表)

  2. 每個節點都執行1中分發的本地表join的SQL(這一步不再分發右表全量的資料)

  3. 資料再回傳到coordinator節點,然後返回給client

資料冗餘儲存

如果一個表的資料量比較小,可以不分shard儲存,每個shard都儲存全量的資料,例如我們的業務物件表。查詢時,不需要分散式JOIN,直接在本地進行JOIN即可

SELECT count()
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT item_id
    FROM items_all 
    WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

例如這個SQL,items_all表每個shard都儲存同樣的資料,這樣也可以避免分散式JOIN帶來的查詢放大和全表資料分發問題

更少的資料

不論是分散式JOIN還是本地JOIN,都需要儘量讓少的資料參與JOIN,既能提升查詢速度也能減少資源消耗

SQL下推

ClickHouse對SQL的下推做的不太好,有些複雜的SQL下推會失效。因此,我們手動對SQL做了下推,目前正在測試基於查詢優化器來幫助實現下推優化,以便讓SQL更加簡潔

下推的SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06'
        AND 使用者屬性條件1  OR 使用者屬性條件2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

對應的不下推的SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM rangers.users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
AND (ut.使用者屬性條件1  OR ut.使用者屬性條件2)
settings distributed_perfect_shard=1

可以看到,不下推的SQL更加簡潔,直接基於JOIN過後的寬表進行過濾。但是ClickHouse可能會將不滿足條件的users_unique_all資料也進行JOIN

我們使用中有一個複雜的case,使用者表過濾條件不下推有1千萬+,SQL執行了3000秒依然執行超時,而做了下推之後60秒內就執行成功了

Clickhouse引擎層優化

一個SQL實際在Clickhouse如何執行,對SQL的執行時間和資源消耗至關重要。社群版的Clickhouse在執行模型和SQL優化器上還要改進的空間,尤其是複雜SQL以及多JOIN的場景下

執行模型優化

社群版的Clickhouse目前還是一個兩階段執行的執行模型。第一階段,Coordinator在收到查詢後,將請求傳送給對應的Worker節點。第二階段,Worker節點完成計算,Coordinator在收到各Worker節點的資料後進行匯聚和處理,並將處理後的結果返回。

有以下幾個問題:

  1. 第二階段的計算比較複雜時,Coordinator的節點計算壓力大,容易成為瓶頸

  2. 不支援shuffle join,hash join時右表為大表時構建慢,容易OOM

  3. 對複雜查詢的支援不友好

位元組跳動ClickHouse團隊為了解決上述問題,改進了執行模型,參考其他的分散式資料庫引擎(例如Presto等),將一個複雜的Query按資料交換情況切分成多個 Stage,各Stage之間則通過Exchange完成資料交換。根據Stage依賴關係定義拓撲結構,產生DAG圖,並根據DAG圖排程Stage。例如兩表Join,會先排程左右表讀取Stage,之後再排程Join這個Stage,Join的Stage依賴於左右表的Stage。

舉個例子

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id, 
    dt.hash_did AS device_hashid
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_did
    FROM devices_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10

Stage執行模型基本過程(可能的):

  1. 讀取tob_apps_all資料,按照join key(hash_uid)進行shuffle,資料分發到每個節點。這是一個Stage

  2. 讀取users_unique_all資料,按照join key(hash_uid)進行shuffle,資料分發到每個節點。這是一個Stage

  3. 上述兩個表的資料,在每個節點上的資料進行本地join,然後再按照join key(device_id)進行shuffle。這是一個Stage

  4. 讀取devices_all資料,按照join key(device_id)進行shuffle,這是一個Stage

  5. 第3步、第4步的資料,相同join key(device_id)的資料都在同一個節點上,然後進行本地JOIN,這是一個Stage

  6. 彙總資料,返回limit 10的資料。這是一個Stage

統計效果如下:

查詢優化器

有了上面的stage的執行模型,可以靈活調整SQL的執行順序,位元組跳動Clickhouse團隊自研了查詢優化器,根據優化規則(基於規則和代價預估)對SQL的執行計劃進行轉換,一個執行計劃經過優化規則後會變成另外一個執行計劃,能夠準確的選擇出一條效率最高的執行路徑,然後構建Stage的DAG圖,大幅度降低查詢時間

下圖描述了整個查詢的執行流程,從 SQL parse 到執行期間所有內容全部進行了重新實現(其中紫色模組),構建了一套完整的且規範的查詢優化器。

還是上面的三表JOIN的例子,可能的一個執行過程是:

  1. 查詢優化器發現users_unique_all表與tob_apps_all表的分shard規則一樣(基於使用者ID),所以就不會先對錶按 join key 進行 shuffle,users_unique與tob_apps直接基於本地表JOIN,然後再按照join key(device_id)進行shuffle。這是一個Stage

  2. 查詢優化器根據規則或者代價預估決定裝置表devices_all是需要broadcast join還是shuffle join

    如果broadcast join:在一個節點查到全部的device資料,然後分發到其他節點。這是一個Stage

    如果shuffle join:在每個節點對device資料按照join key(device_id)進行shuffle。這是一個Stage

  3. 彙總資料,返回limit 10的資料。這是一個Stage

效果:

可以看到,查詢優化器能優化典型的複雜的SQL的執行效率,縮短執行時間

總結

ClickHouse最為擅長的領域是一個大寬表來進行查詢,多表JOIN時Clickhouse效能表現不佳。作為業內領先的使用者分析與運營平臺,火山引擎增長分析DataFinder基於海量資料做到了複雜指標能夠秒級查詢。本文介紹了我們是如何優化Clickhouse JOIN查詢的。

主要有以下幾個方面:

  1. 減少參與JOIN的表以及資料量

  2. 優先使用本地JOIN,避免分散式JOIN帶來的效能損耗

  3. 優化本地JOIN,優先使用記憶體進行JOIN

  4. 優化分散式JOIN的執行邏輯,依託於位元組跳動對ClickHouse的深度定製化

立即跳轉火山引擎DataFinder官網瞭解詳情