火山引擎在行為分析場景下的ClickHouse JOIN優化
更多技術交流、求職機會,歡迎關注位元組跳動資料平臺微信公眾號,回覆【1】進入官方交流群
背景
火山引擎增長分析DataFinder基於ClickHouse來進行行為日誌的分析,ClickHouse的主要版本是基於社群版改進開發的位元組內部版本。主要的表結構:
事件表:儲存使用者行為資料,以使用者ID分shard儲存。
--列出了主要的欄位資訊
CREATE TABLE tob_apps_all
(
`tea_app_id` UInt32, --應用ID
`device_id` String DEFAULT '', --裝置ID
`time` UInt64,--事件日誌接受時間
`event` String,--事件名稱
`user_unique_id` String,--使用者ID
`event_date` Date,--事件日誌日期,由time轉換而來
`hash_uid` UInt64 --使用者ID hash過後的id,用來join降低記憶體消耗
)│
使用者表:儲存使用者的屬性資料,以使用者ID分shard儲存。
--列出了主要的欄位資訊
CREATE TABLE users_unique_all
(
`tea_app_id` UInt32, --應用ID
`user_unique_id` String DEFAULT '', -- 使用者ID
`device_id` String DEFAULT '', -- 使用者最近的裝置ID
`hash_uid` UInt64,--使用者ID hash過後的id,用來join降低記憶體消耗
`update_time` UInt64,--最近一次更新時間
`last_active_date` Date --使用者最後活躍日期
)
裝置表:儲存裝置相關的資料,以裝置ID分shard儲存。
--列出了主要的欄位資訊
CREATE TABLE devices_all
(
`tea_app_id` UInt32, --應用ID
`device_id` String DEFAULT '', --裝置ID
`update_time` UInt64, --最近一次更新時間
`last_active_date` Date --使用者最後活躍日期
)
業務物件表:儲存業務物件相關的資料,每個shard儲存全量的資料
--列出了主要的欄位資訊
CREATE TABLE rangers.items_all
(
`tea_app_id` UInt32,
`hash_item_id` Int64,
`item_name` String, --業務物件名稱。比如商品
`item_id` String, --業務物件ID。比如商品id 1000001
`last_active_date` Date
)
業務挑戰
隨著接入應用以及應用的DAU日益增加,ClickHouse表的事件量增長迅速;並且基於行為資料需要分析的業務指標越來越複雜,需要JOIN的表增多;我們遇到有一些涉及到JOIN的複雜SQL執行效率低,記憶體和CPU資源佔用高,導致分析介面響應時延和錯誤率增加。
關於Clickhouse的JOIN
在介紹優化之前,先介紹一下基本的ClickHouse JOIN的型別和實現方式
分散式JOIN
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
基本執行過程:
-
一個Clickhouse節點作為Coordinator節點,給每個節點分發子查詢,子查詢sql(tob_apps_all替換成本地表,users_unique_all保持不變依然是分散式表)
-
每個節點執行Coordinator分發的sql時,發現users_unique_all是分散式表,就會去所有節點上去查詢以下SQL(一共有N*N。N為shard數量)
SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
-
每個節點從其他N-1個節點拉取2中子查詢的全部資料,全量儲存(記憶體or檔案),進行本地JOIN
-
Coordinator節點從每個節點拉取3中的結果集,然後做處理返回給client
存在的問題:
-
子查詢數量放大
-
每個節點都全量儲存全量的資料
分散式Global JOIN
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
基本執行過程:
-
一個Clickhouse節點作為Coordinator節點,分發查詢。在每個節點上執行sql(tob_apps_all替換成本地表,右表子查詢替換成別名ut)
-
Coordinator節點去其他節點拉取users_unique_all的全部資料,然後分發到全部節點(作為1中別名表ut的資料)
-
每個節點都會儲存全量的2中分發的資料(記憶體or檔案),進行本地local join
-
Coordinator節點從每個節點拉取3中的結果集,然後做處理返回給client
存在的問題:
-
每個節點都全量儲存資料
-
如果右表較大,分發的資料較大,會佔用網路頻寬資源
本地JOIN
SQL裡面只有本地表的JOIN,只會在當前節點執行
SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
(SELECT device_id,
hash_uid
FROM rangers.users_unique
WHERE tea_app_id = 268411
AND last_active_date>='2022-08-06') ut
ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
AND event='app_launch'
AND event_date='2022-08-06'
Hash join
-
右表全部資料載入到記憶體,再在記憶體構建hash table。key為joinkey
-
從左表分批讀取資料,從右表hash table匹配資料
-
優點是:速度快 缺點是:右表資料量大的情況下佔用記憶體
Merge join
-
對右表排序,內部 block 切分,超出記憶體部分 flush 到磁碟上,記憶體大小通過引數設定
-
左表基於 block 排序,按照每個 block 依次與右表 merge
-
優點是:能有效控制記憶體 缺點是:大資料情況下速度會慢
優先使用hash join當記憶體達到一定閾值後再使用merge join,優先滿足效能要求
解決方案
避免JOIN
資料預生成
資料預生成(由Spark/Flink或者Clickhouse物化檢視產出資料),形成大寬表,基於單表的查詢是ClickHouse最為擅長的場景
我們有個指標,實現的SQL比較複雜(如下),每次實時查詢很耗時,我們單獨建了一個表table,由Spark每日構建出這個指標,查詢時直接基於table查詢
SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
(SELECT event_date,hash_uid AS uc1,sum(et.float_params{'amount'}) AS value, count(1) AS cnt, value*cnt AS multiple
FROM tob_apps_all et GLOBAL ANY LEFT JOIN
(SELECT hash_uid AS join_key,int_profiles{'$ab_time_34'}*1000 AS first_time
FROM users_unique_all
WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt
ON et.hash_uid=upt.join_key
WHERE (查詢條件)
GROUP BY uc1,event_date)
GROUP BY event_date;
資料量2300W,查詢時間由7秒->0.008秒。當然這種方式,需要維護額外的資料構建任務。總的思路就是不要讓ClickHouse實時去JOIN
使用IN代替JOIN
JOIN需要基於記憶體構建hash table且需要儲存右表全部的資料,然後再去匹配左表的資料。而IN查詢會對右表的全部資料構建hash set,但是不需要匹配左表的資料,且不需要回寫資料到block
比如
SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
(SELECT hash_uid AS join_key
FROM users_unique_all
WHERE app_id = 10000000
AND last_active_date >= '2022-01-01') upt
ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
AND event_date >= '2022-01-01'
AND event_date <= '2022-08-02'
GROUP BY event_date
可以改成如下形式:
SELECT event_date,
count()
FROM tob_apps_all
WHERE app_id = 10000000
AND event_date >= '2022-01-01'
AND event_date <= '2022-08-02'
AND hash_uid global IN
(SELECT hash_uid
FROM users_unique_all
WHERE (tea_app_id = 10000000)
AND (last_active_date >= '2022-01-01') )
GROUP BY event_date
如果需要從右表提取出屬性到外層進行計算,則不能使用IN來代替JOIN
相同的條件下,上面的測試SQL,由JOIN時的16秒優化到了IN查詢時的11秒
更快的JOIN
優先本地JOIN
資料預先相同規則分割槽
也就是Colocate JOIN。優先將需要關聯的表按照相同的規則進行分佈,查詢時就不需要分散式的JOIN
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
比如事件表tob_apps_all和使用者表users_unique_all都是按照使用者ID來分shard儲存的,相同的使用者的兩個表的資料都在同一個shard上,因此這兩個表的JOIN就不需要分散式JOIN了
distributed_perfect_shard這個settings key是位元組內部ClickHouse支援的,設定過這個引數,指定執行計劃時就不會再執行分散式JOIN了
基本執行過程:
-
一個ClickHouse節點作為Coordinator節點,分發查詢。在每個節點上執行sql(tob_apps_all、users_unique_all替換成本地表)
-
每個節點都執行1中分發的本地表join的SQL(這一步不再分發右表全量的資料)
-
資料再回傳到coordinator節點,然後返回給client
資料冗餘儲存
如果一個表的資料量比較小,可以不分shard儲存,每個shard都儲存全量的資料,例如我們的業務物件表。查詢時,不需要分散式JOIN,直接在本地進行JOIN即可
SELECT count()
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT item_id
FROM items_all
WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
例如這個SQL,items_all表每個shard都儲存同樣的資料,這樣也可以避免分散式JOIN帶來的查詢放大和全表資料分發問題
更少的資料
不論是分散式JOIN還是本地JOIN,都需要儘量讓少的資料參與JOIN,既能提升查詢速度也能減少資源消耗
SQL下推
ClickHouse對SQL的下推做的不太好,有些複雜的SQL下推會失效。因此,我們手動對SQL做了下推,目前正在測試基於查詢優化器來幫助實現下推優化,以便讓SQL更加簡潔
下推的SQL:
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411)
AND (last_active_date >= '2022-08-06'
AND 使用者屬性條件1 OR 使用者屬性條件2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
對應的不下推的SQL:
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM rangers.users_unique_all
WHERE (tea_app_id = 268411)
AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
AND (ut.使用者屬性條件1 OR ut.使用者屬性條件2)
settings distributed_perfect_shard=1
可以看到,不下推的SQL更加簡潔,直接基於JOIN過後的寬表進行過濾。但是ClickHouse可能會將不滿足條件的users_unique_all資料也進行JOIN
我們使用中有一個複雜的case,使用者表過濾條件不下推有1千萬+,SQL執行了3000秒依然執行超時,而做了下推之後60秒內就執行成功了
Clickhouse引擎層優化
一個SQL實際在Clickhouse如何執行,對SQL的執行時間和資源消耗至關重要。社群版的Clickhouse在執行模型和SQL優化器上還要改進的空間,尤其是複雜SQL以及多JOIN的場景下
執行模型優化
社群版的Clickhouse目前還是一個兩階段執行的執行模型。第一階段,Coordinator在收到查詢後,將請求傳送給對應的Worker節點。第二階段,Worker節點完成計算,Coordinator在收到各Worker節點的資料後進行匯聚和處理,並將處理後的結果返回。
有以下幾個問題:
-
第二階段的計算比較複雜時,Coordinator的節點計算壓力大,容易成為瓶頸
-
不支援shuffle join,hash join時右表為大表時構建慢,容易OOM
-
對複雜查詢的支援不友好
位元組跳動ClickHouse團隊為了解決上述問題,改進了執行模型,參考其他的分散式資料庫引擎(例如Presto等),將一個複雜的Query按資料交換情況切分成多個 Stage,各Stage之間則通過Exchange完成資料交換。根據Stage依賴關係定義拓撲結構,產生DAG圖,並根據DAG圖排程Stage。例如兩表Join,會先排程左右表讀取Stage,之後再排程Join這個Stage,Join的Stage依賴於左右表的Stage。
舉個例子
SELECT
et.os_name,
ut.device_id AS user_device_id,
dt.hash_did AS device_hashid
FROM tob_apps_all AS et
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_did
FROM devices_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10
Stage執行模型基本過程(可能的):
-
讀取tob_apps_all資料,按照join key(hash_uid)進行shuffle,資料分發到每個節點。這是一個Stage
-
讀取users_unique_all資料,按照join key(hash_uid)進行shuffle,資料分發到每個節點。這是一個Stage
-
上述兩個表的資料,在每個節點上的資料進行本地join,然後再按照join key(device_id)進行shuffle。這是一個Stage
-
讀取devices_all資料,按照join key(device_id)進行shuffle,這是一個Stage
-
第3步、第4步的資料,相同join key(device_id)的資料都在同一個節點上,然後進行本地JOIN,這是一個Stage
-
彙總資料,返回limit 10的資料。這是一個Stage
統計效果如下:
查詢優化器
有了上面的stage的執行模型,可以靈活調整SQL的執行順序,位元組跳動Clickhouse團隊自研了查詢優化器,根據優化規則(基於規則和代價預估)對SQL的執行計劃進行轉換,一個執行計劃經過優化規則後會變成另外一個執行計劃,能夠準確的選擇出一條效率最高的執行路徑,然後構建Stage的DAG圖,大幅度降低查詢時間
下圖描述了整個查詢的執行流程,從 SQL parse 到執行期間所有內容全部進行了重新實現(其中紫色模組),構建了一套完整的且規範的查詢優化器。
還是上面的三表JOIN的例子,可能的一個執行過程是:
-
查詢優化器發現users_unique_all表與tob_apps_all表的分shard規則一樣(基於使用者ID),所以就不會先對錶按 join key 進行 shuffle,users_unique與tob_apps直接基於本地表JOIN,然後再按照join key(device_id)進行shuffle。這是一個Stage
-
查詢優化器根據規則或者代價預估決定裝置表devices_all是需要broadcast join還是shuffle join
如果broadcast join:在一個節點查到全部的device資料,然後分發到其他節點。這是一個Stage
如果shuffle join:在每個節點對device資料按照join key(device_id)進行shuffle。這是一個Stage
-
彙總資料,返回limit 10的資料。這是一個Stage
效果:
可以看到,查詢優化器能優化典型的複雜的SQL的執行效率,縮短執行時間
總結
ClickHouse最為擅長的領域是一個大寬表來進行查詢,多表JOIN時Clickhouse效能表現不佳。作為業內領先的使用者分析與運營平臺,火山引擎增長分析DataFinder基於海量資料做到了複雜指標能夠秒級查詢。本文介紹了我們是如何優化Clickhouse JOIN查詢的。
主要有以下幾個方面:
-
減少參與JOIN的表以及資料量
-
優先使用本地JOIN,避免分散式JOIN帶來的效能損耗
-
優化本地JOIN,優先使用記憶體進行JOIN
-
優化分散式JOIN的執行邏輯,依託於位元組跳動對ClickHouse的深度定製化
立即跳轉火山引擎DataFinder官網瞭解詳情
- 從銀行數字化轉型來聊一聊,火山引擎 VeDI 旗下 ByteHouse 的應用場景
- ByteHouse 實時匯入技術演進
- 關於 DataLeap 中的 Notebook,你想知道的都在這
- 火山引擎DataLeap:3個關鍵步驟,複製位元組跳動一站式資料治理經驗
- 如何又快又好實現 Catalog 系統搜尋能力?火山引擎 DataLeap 這樣做
- 什麼是A/B實驗,為什麼要開A/B實驗?
- 計算效能提升百倍 火山引擎數智平臺這款產品助力企業員工更好看數用數
- 火山引擎DataLeap資料排程例項的 DAG 優化方案
- 如何快速構建企業級資料湖倉?
- 位元組跳動基於Doris的湖倉分析探索實踐
- 火山引擎在行為分析場景下的ClickHouse JOIN優化
- 位元組跳動資料血緣圖譜升級方案設計與實現
- 提速 10 倍!深度解讀位元組跳動新型雲原生 Spark History Server
- 位元組跳動基於 ClickHouse 優化實踐之“查詢優化器”
- 位元組跳動基於ClickHouse優化實踐之“多表關聯查詢”
- “今日頭條”名字是 AB 測試定的?位元組跳動用九年時間打造出了怎樣的資料平臺
- 位元組跳動基於ClickHouse優化實踐之Upsert
- 位元組跳動資料質量動態探查及相關前端實現
- 位元組跳動資料平臺技術揭祕:基於 ClickHouse 的複雜查詢實現與優化
- 位元組跳動資料平臺技術揭祕:基於ClickHouse的複雜查詢實現與優化