揭祕位元組跳動解決ClickHouse複雜查詢問題的技術實踐

語言: CN / TW / HK

分享嘉賓 :董一峰 位元組跳動

編輯整理:胡勝達 蔚來汽車

出品平臺:DataFunTalk

導讀: ClickHouse已經成為行業主流且熱門的開源引擎。 隨著業務資料量擴大,場景覆蓋變廣泛,在複雜query場景下,ClickHouse容易存在查詢異常問題,影響業務正常推進。 本次主要分享位元組跳動如何解決ClickHouse複雜查詢問題,並詳細解讀技術實現細節,目前該能力已經通過火山引擎ByteHouse面向開發者輸出。

全文將圍繞以下幾方面展開:

  • 專案背景

  • 技術方案

  • 優化與診斷

  • 效果及展望

01

專案背景

1. ClickHouse執行模式

ClickHouse 的執行模式相對比較簡單,和Druid、ES 類似,其基本查詢模式分為兩個階段:

第一階段 ,Coordinator 收到查詢後將請求傳送給對應的 worker 節點;

第二階段 ,Coordinator 收到各個 worker 節點的結果後匯聚起來處理後返回。

以下面的SQL為例:

Select name from student_distribute where id = 5

①當 Coordinator 收到請求後,由於student_distribute是一個分散式表,因此需要將SQL 改寫為對local表查詢,並轉發請求給每一個shard的worker;

②Worker收到請求後查詢本地的local表資料,返回結果給coordinator;

③Coordinator彙總每一個shard的資料並把結果返回給client。

Select name from student_local where id = 5

第二階段執行的模式 能夠高效地支援很多常見場景,比如常見的針對大寬表的各類查詢,但是隨著業務場景的複雜化, 也存在以下三點問題:

其一, 第一階段返回的 資料 比較多且第二階段的計算比較複雜時,對於Coordinator的壓力會比較大, 容易成為query的瓶頸,且shard越多可能計算越慢,瓶頸越大 。例如一些重計算的agg運算元count distinct。如果我們使用hash表去重時,第二階段需要在coordinator單機上merge各個worker的hash表,計算量很重且不能並行;又比如說group by基數比較大或者window計算。

其二,join是SQL的重要場景。由於不支援Shuffle操作, 對於Join來說右表必須是全量資料 。無論是普通Join還是Global Join,當Join的右表比較大時都放到記憶體裡容易OOM,而Spill到磁碟雖然解決記憶體問題,可能會因為有磁碟 io和序列化計算的開銷影響效能。特別是當Join為最常見的Hash Join 時,右表如果是大表構建也比較慢。雖然社群最近也做了一些右表構建的優化,通過單機按照 join key split 來達到並行構建hash table。但是額外的代價是左右表都增加了一次 split 操作。

其三, 對於複雜查詢 (如多表 Join、巢狀多個子查詢、window function等) 的支援並不友好 ,由於不能通過shuffle來分散資料,生成的pipeline在一些case下不能充分並行,難以充分發揮叢集的全部資源。

2. 其他MMP資料庫

目前主流的MPP資料庫基本都 支援Stage執行的方式 。以Presto為例,如下圖所示,一個兩表join的agg sql可拆分為5個 Stage。

其中 Stage3、Stage4分別對應左右表資料讀取,Stage2完成兩表Join和partial agg 計算,Stage1完成final agg計算,Stage0收集Stage1的資料後彙總和輸出。在這個過程中,Stage 3、4、2、1可以在多個節點上並行執行,單個複雜的query被拆分成若干Stage,從而實現了Stage之間,不同worker的資料傳輸。

3. 業務背景和目標

隨著業務複雜程度提高,業務並不希望所有的資料都通過etl 產生大寬表;複雜查詢(特別是多輪分散式 Join和比較多的agg)的需求越來越強烈,而整體的資料量又在不斷增長。在叢集資源有限的情況下,我們希望能夠充分利用機器資源,基於ClickHouse 高效地支援複雜查詢。

ByteHouse是位元組跳動研發同學基於開源ClickHouse 進行了深度優化和改造的版本,提供海量資料上更強的查詢服務和資料寫入效能,支援多種應用場景。如圖所示,ByteHouse在內部多個場景如行為分析、畫像分析、智慧營銷分析、APP 日誌分析上得到充分的驗證和使用,並在多個方面進行了增強,具備特有的能力。

02

技術方案

1. 設計思想

基於 ClickHouse 的複雜查詢的實現採用分Stage的方式,替換目前 ClickHouse的兩階段執行方式。類似其他分散式資料庫引擎(如 Presto、Impala 等),將一個複雜的Query按照資料交換情況切分成多個Stage,Stage和Stage之間通過 exchange完成資料的交換,單個Stage內不存在資料交換。 Stage間的資料交換主要有以下三種形式:

①按照單(多)個 key 進行 Shuffle(shuffle)

②由1個或者多個節點匯聚到一個節點(我們稱為 gather)

③同一份資料複製到多個節點(也稱為 broadcast 或者說廣播)

按照不同的功能切分不同的模組, 設計目標如下:

①各個模組約定好介面,儘量減少彼此的依賴和耦合。一旦某個模組有變動不會影響別的模組,例如Stage生成邏輯的調整不影響排程的邏輯。

②模組採用外掛的架構,允許模組根據配置靈活支援不同的策略。

2. 相關術語

  • ExchangeNode 在語法樹中表示資料交換的節點

  • PlanSegment 單個 Stage 對應的執行的計劃片段

  • ExchangeManager 管理資料的 exchange,負責不同 Stage 節點之間的資料交換

  • SegmentScheduler 計劃片段排程器,負責下發計劃片段給 worker,由 Coordinator 節點呼叫

  • InterpreterPlanSegment 計劃片段執行器,執行一個具體的計劃片段

3. 執行流程

①Coordinator 接受複雜查詢後,在目前 ClickHouse 語法樹的基礎上,根據節點型別和資料分佈情況插入 Exchange 節點並生成分散式 Plan。

②Coordinator 根據 Exchange Node 型別,切分分散式 Plan 生成每個 Stage 的執行片段 PlanSegment。

③Coordinator 呼叫 SegmentScheduler 將各階段的 PlanSegment 傳送到 Worker 節點。

④Worker 節點接受 PlanSegment 通過 InterpreterPlanSegment 完成資料的讀取和執行,通過 ExchangeManager 完成資料的互動。

⑤Coordinator 從最後一輪 Stage 對應節點的 ExchangeManager 讀取資料後處理後返回給 client。

4. Plan切分

下面是一個Plan切分的例子,這是1個2表Join的查詢場景,根據Exchange資訊,將整個分散式 Plan切分成4個Stage。

5. 查詢片段排程器(SegmentScheduler)

查詢片段排程器SegmentScheduler 根據上下游依賴關係和資料分佈,以及 Stage 並行度和worker 分佈和狀態資訊,按照一定的排程策略,將 PlanSemgent 發給不同的 Worker 節點。

目前支援的2種策略是:

①依賴排程 :根據 Stage 依賴關係定義拓撲結構,產生 DAG 圖,根據 DAG 圖排程 stage,類似於拓撲排序,等到依賴的 Stage 啟動後再啟動新的 Stage。例如剛才的兩表 join,會先排程左右表讀取 stage,再排程 join stage。

②AllAtOnce :類似於Presto的AllAtOnce策略,會先計算每一個 Stage 的相關資訊,一次性排程所有的Stage。

相比而言,這兩種策略是在容錯、資源使用和延時上做取捨。

第一種排程策略可以實現更好的容錯 ,由於 ClickHouse 可以有多個副本,當前一個 Stage 部分節點連線失敗時可以嘗試切換到副本節點,對後續依賴 stage 無感知。這裡指的是讀資料的 Stage,我們稱為 Source Stage,非 Source Stage 因為沒有資料依賴,容錯能力會更強,只要保證並行度的節點數即可,甚至極端情況下可以降低 stage 並行度來支援更好的容錯。缺點是排程有依賴,不能完全並行,會增加排程時長,對於一些資料量和計算量小,但是 stage 多的節點排程延時可能會佔 SQL 整體時間不小的比例。我們也做了一些針對性的優化,對於無依賴關係的儘可能支援並行。

第二種排程策略通過並行可以極大降低排程延時 ,為防止大量網路 io 執行緒,我們通過非同步化並且控制執行緒數目;這種策略的缺點是容錯性沒有依賴排程好,因為每一個 stage 的 worker 在排程前就已經確定,如果有一個 worker 出現連線異常則整個查詢會直接失敗。並且可能有一些 Stage 上游資料還沒有 Ready 就被排程執行了,需要長時間等資料。例如 final agg stage,需要等 partial agg 完成後才能拿到資料。雖然我們做了一些優化,並不會長時間空跑浪費 cpu 資源,但是畢竟也消耗了一部分資源,比如建立了執行的執行緒。

6. 查詢片段執行器(InterpreterPlanSegment)

下面介紹下計劃片段是如何執行的,原本 ClickHouse的查詢和節點執行主要是 SQL 形式,切分Stag後需要支援執行一個單獨的PlanSemgent。因此 InterpreterPlanSegment 的主要功能就是接受一個序列化後的 PlanSemgent,能夠在 Worker 節點上執行整個 PlanSemgent 的邏輯。 主要的步驟為:

①根據 input 資訊讀取資料,如果 input 是具體的 table,則從本地讀取資料;如果 input 是一個 exchange input,則從對應的 ExchangeManager 讀取資料;

②執行 PlanSemgent 的邏輯;

③輸出處理後的結果資料,如果是 Coordinator 節點,就將資料發給 Client;如果是非Coordinator 節點,就按照資料的exchange方式寫給本例項對應的 ExchangeManager。

Interpreter部分我們儘量複用當前ClickHouse的執行邏輯,例如processor 執行方式,process list管理等等。相比於InterpreterSelect邏輯要更簡單一些,可以認為1 個Stage只有1個階段。當然我們也做了很多功能和效能的增強,例如我們支援1個 stage處理多個join等,這樣可以減少stage數目和不必要的資料傳輸,在一張大表(通常情況下是事實表) join 多個維度表的場景有比較好的幫助。

InterpreterPlan Segment執行完會向coordinator上報對應的狀態資訊。執行異常的時候會將異常資訊報告給查詢片段排程器,取消Query其他worker的執行。

7. 資料交換(ExchangeManager)

ExchangeManager 是PlanSegment資料交換的媒介,更是平衡資料上下游處理能力的重要元件。整體上採用 push 的方式,當上遊資料 ready 時主動推送給下游,並支援反壓。 其架構如下圖所示:

具體的流程如下:

下游 PlanSegment 執行時,當input為exchange input時,根據一定的 token 規則 (通常由 query_id+segment_id+index_id 等組成)和資料 source 資訊,向上遊 ExchangeManager 註冊對應的資料請求;

上游 ExchangeManager 收到請求後,建立上下游資料通道,並將上游的資料推送到下游,如果通道一直建立不了會 block 上游的執行。

在這個過程中,上下游都會通過佇列來優化傳送和讀取,當佇列飽和的時候通過反壓的機制控制上游的執行速度。由於採用了 push 和佇列,這裡我們要考慮一個特殊的場景,在某些 case 下下游的 Stage 並不需要讀取全部的上游資料,一個典型的場景是 limit。例如 limit 100,下游 stage 是需要讀取 100 條資料即可,而上游可能會輸出更大規模的資料,因此在這種情況下,當下遊 stage 讀到足夠的資料後,需要能主動取消上游資料的執行並清空佇列。這是一個特定場景的優化,能夠大大加速查詢時間。

ExchangeManager 需要考慮和優化的點還有:

細粒度的記憶體控制,能夠按照例項、query、segment 多層次進行記憶體控制,避免 OOM,更長期的考慮是支援 spill 到磁碟上,降低對記憶體的使用。為了提升傳輸效率,小資料需要進行 merge,大資料要 split。同時,網路處理在某些場景要保證有序性,比如 sort 時,partial sort 和 merge sort 的網路傳輸必須有序,否則資料可能是有問題的。

②連線複用和網路優化,包括針對上下游在同一個節的場景下選擇走記憶體的交換不走網路,可以減少網路的開銷和減少資料序列化、反序列化的代價。另外,由於 ClickHouse 在計算方面做了非常充足的優化,有些場景下甚至記憶體頻寬成為瓶頸,我們在ExchangeManager的一些場景上也應用zero copy等技術來減少記憶體的拷貝。

③異常處理和監控,相比於單機執行,分散式情況下異常情況更復雜且不好感知。通過重試能避免一些節點的暫時高負載或者異常,以及出問題時能夠快速感知、排查和做針對性解決和優化。這裡的工程實踐更多一些。

03

優化與診斷

1. Join 多種實現

根據資料的規模和分佈,我們支援了多種Join實現,目前已經支援的有:

①Shuffle Join,最通用的 Join;

②Broadcast Join,針對大表Join小表的場景,通過把右表廣播到左表的所有 worker 節點來減少左表的傳輸;

③Colocate Join,針對左右表根據Join key保持相通分佈的場景,減少左右表資料傳輸。

2. 網路連線優化

網路連線的優化的核心本質就是減少連線的使用 。特別是資料需要Shuffle 的時候,下一輪 Stage的每一個節點需要從上一輪Stage的每一個節點拉取資料。當一個叢集的節點比較多的時候,如果存在比較多的複雜 Query(Stage多,並行度(節點數)比較大),叢集的Worker節點會建立非常多的連線,如下圖所示,單節點建立的連線數與叢集節點數、併發stage數成正比。

位元組內部的clickhouse叢集規模非常大,最大的叢集(單叢集幾千臺規模)在目前 ClickHouse 的執行模式下單機最大可能會建立上幾萬個網路連線。因此如果支援複雜 Query 執行,由於stage變多了,需要優化網路連線,特別是支援連線複用。我們通過儘可能複用連線,在不同節點之間只會建立固定數目的連線,不同的查詢會複用這些連線,不隨 query 和 stage 的規模而增長。

3. 網路傳輸優化

在資料中心領域,遠端直接記憶體訪問(RDMA)是一種繞過遠端主機作業系統核心訪問其記憶體中資料的技術,由於不經過作業系統,不僅節省了大量CPU資源,同樣也提高了系統吞吐量、降低了系統的網路通訊延遲,尤其適合在大規模平行計算機叢集中有廣泛應用。

由於ClickHouse在計算層面做了很多優化,而網路頻寬相比於記憶體頻寬要小不少,在一些資料量傳輸特別大的場景,網路傳輸會成為一定的瓶頸。為了提升網路傳輸的效率和提升資料exchange的吞吐,一方面我們引入壓縮來降低傳輸資料量,另一方面我們引入 RDMA 來減少一定的開銷。經過測試,在一些資料傳輸量大的場景,有不小的收益。

4. Runtime Filter

Join運算元通常是OLAP引擎中最耗時的運算元。如果想優化 Join 運算元,可以有兩種思路,一方面可以提升Join運算元的效能,例如更好的Hash Table實現和Hash演算法,以及更好的並行。另一方面可以儘可能減少參與Join計算的資料。

Runtime Filter在一些場景,特別是事實表join維度表的星型模型場景下會有比較大的效果。因為這種情況下通常事實表的規模比較大,而大部分過濾條件都在維度表上,事實表可能要全量join維度表。Runtime Filter的作用是通過在 Join 的 probe 端(就是左表)提前過濾掉那些不會命中Join的輸入資料來大幅減少 Join 中的資料傳輸和計算,從而減少整體的執行時間。以下圖為例:

左表並沒有直接過濾條件,右錶帶有過濾條件item.proce > 1000。當完成右表查詢時,可以確定item.id 的範圍和集合,根據join型別inner join和join條件sales.item_id=item.id可以推斷出sales.item的範圍和集合。我們可以把sales.item 的範圍和集合作為一個過濾條件,在join前過濾sales的資料。

我們在複雜查詢上支援了Runtime Filter,目前主要支援minmax和bloomfilter。

總體執行流程如下:

①build plan segment worker(right table)會將生成的單節點 runtime filter 傳送到coordinator節點;

②coordinator 在等待各個 worker的 runtime filter 都發送完成之後進行一次merge操作,將合併好的 runtime filter 分發到各個 execute plan segment worker(left table)節點中去;

③在 runtime filter 構造期間,execute plan segment(left table) 需要等待一定的時間,在超時之前如果runtime filter已經下發,則通過 runtime filter 執行過濾。

這裡需要思考一個問題, Runtime filter column 是否構建索引(主鍵、skip index等)和命中prewhere? 如果runtime filter的列(join column)構建了索引是需要重新生成 pipeline 的。因為命中索引後,可能會減少資料的讀取,pipeline並行度和對應資料的處理range都可能發生變化。如果runtime filter的列跟索引無關,可以在計劃生成的時候預先帶上過濾條件,只不過一開始作為佔位是空的,runtime filter下發的時候把佔位資訊改成真正的過濾條件即可。這樣即使runtime filter 下發超時了,查詢片段已經開始執行了,只要查詢片段沒有執行完,之後的資料仍然可以進行過濾。

需要注意的是,runtime filter 是一種特殊場景下的優化,其針對的場景是右表資料量不大,且構建的 runtime filter 對左表有比較強的過濾效果。如果右表資料量比較大,構建runtime filter比較慢,或者對左表的資料過濾效果很差甚至沒有,那麼 runtime filter 反而會增加查詢的耗時。因此,要根據資料的特徵和規模來決定是否開啟。

5. 診斷和分析

引入複雜查詢的多Stage 執行模型後,SQL的執行模式變得複雜了。特別是當用戶查詢一些非常複雜的查詢,幾百行的sql生成的stage會非常多,把stage都看一遍並理解sql的含義要花比較長的時間。題外話:我們很早之前就完整的跑通了所有的tpcds query,這裡面就有一些sql可能會產生幾十個 stage。那麼在這種情況下, 如何定位 SQL 的瓶頸並加以優化是一個難題。

我們做了如下兩點優化:

首先,最常見的做法是 增加各類完善的metrics ,包括整個Query的執行時間和不同Stage的執行時間、IO資料量、運算元處理資料和執行情況、運算元 metrics 和profile event等。

其次,我們 記錄了反壓資訊和上下游佇列長度,以此來推斷 stage 執行情況和瓶頸

坦率地說,SQL 場景包括永珍,很多非常複雜的場景目前還是需要對引擎比較熟悉的同學才能診斷和分析SQL才能給出優化建議。在不斷積累經驗的過程中,我們希望通過能夠不斷完善 metrics 和分析路徑,不斷減輕oncall的負擔,並且在某些場景下可以更智慧的給出優化提示,這對於使用同學來說也是有好處的。

04

效果及展望

1. 複雜查詢效果

根據上面的執行模型的三個缺點,分別測試如下三個場景:

①第二階段的計算比較複雜

②Hash Join 右表為大表

③多表 Join

以SSB 1T資料作為資料集,叢集包含8個節點。

2. 第二階段的計算比較複雜

這個case SQL 如下圖所示

uniqExact是count distinct的預設演算法,採用hash table進行資料去重。使用複雜查詢後,query 執行時間從 8.514s=>2.198s,第二階段 agg uniqExact 運算元的合併原本由 coordinator單點合併,現在通過按照group by key shuffle 後可以由多個節點並行完成。因此通過shuffle減輕了coordinator的merge agg 壓力。

3. Hash Join 右表為大表

這個 case 演示了右表是一個大表的場景,由於 ClickHouse 對多表的優化做的還不是很到位。這裡採用子查詢來下推過濾的條件。

在這個case中,採用複雜查詢模式後,query 執行時間從17.210=>1.749s。 lineorder 是一張大表,通過shuffle可以將大表資料按照join key shuffle到每個worker節點,減少了右表構建的壓力。

4. 多表 Join

這個 case 是一個 5 表 join 的 case。

開啟複雜查詢模式後,query 執行時間從8.583s=>4.464s,所有的右表可同時開始資料讀取和構建 。為了和現有模式對比,針對複雜查詢沒有開啟 runtime filter,開啟 runtime filter後效果會更快。

這裡還要重點說一下,今天的分享主要是從執行模式上講解如何支援複雜查詢。實際上, 優化器對於複雜查詢的效能提升也非常大 。通過一些rbo的規則,比如常見的謂詞下推、相關子查詢處理等。實際上這裡的優化規則非常多,可以極大的提升 SQL 的執行效率。上面的 SQL 其實原本比較簡單,5 表 join 和一些維表的過濾條件,這裡寫成子查詢是為了在 ClickHouse 現有模式下右表過濾條件更好下推。其實對於我們來說,在複雜查詢的模式下,由於有優化器的存在,使用者不用寫的這麼複雜,優化器會自動完成下推和rbo優化。

上面是一些規則的優化,實際上在複雜查詢中, cbo 的優化也有很大作用 。舉一個例子,在 ClickHouse 中,相同的兩個表,大表 join 小表的效能比小表 join 大表要好很多。前一個效果 2 中如果把表順序調整一下會快很多;另外,選用哪一種 join 的實現對 join 效能影響比較大,如果滿足 join key 分佈,colcate join 比 shuffle join 來說完全減少了資料的 shuffle。多表 join 中,join 的順序和 join 的實現方式對執行的時長影響會比 2 表 join 影響更大。藉助資料的統計資訊,通過一些 cbo 優化,可以得到一個比較優的執行模式。

有了優化器,業務同學可以按照業務邏輯來寫任何的 SQL,引擎自動計算出相對最優的 SQL 計劃並執行,加速查詢的執行。

5. 展望

CLickHouse 目前的模式其實在很多單表查詢的場景上表現優異。我們主要是針對複雜的查詢場景做優化,主要是實現多stage的執行模式,並實現了stage之間資料傳輸。工程實踐上來說,做了比較多的嘗試和優化來提升執行和網路傳輸的效能,並且希望通過完善metrics和智慧診斷來降低SQL分析和調優的門檻,並減少oncall 的壓力。

目前的實現只是第一步,未來我們還有很多努力的方向。

首先,肯定是繼續提升執行和 Exchange 的效能。這裡不談論引擎執行通用的優化,比如更好的索引或者運算元的優化,主要是跟複雜查詢模式有關。

其次是Metrics 和智慧診斷加強,就如同剛才提到的,SQL 的靈活度太高了,對於一些複雜的查詢沒有 metrics 幾乎難以診斷和調優,這個我們會長期持續的去做。

歡迎大家使用 ByteHouse 產品,可以掃碼或者搜尋ByteHouse進入官網點選立刻試用來免費試用,複雜查詢的能力也已經輸出到ByteHouse中。

歡迎關注二維碼,進入位元組跳動資料平臺官方交流群,群內可以獲取技術乾貨、精彩直播活動,同時針對本場直播問題,也可以在群裡與我交流。除此之外,我們團隊在大量招募研發工程師,歡迎關注位元組跳動資料平臺公眾號,獲取相關資訊。

今天的分享就到這裡,謝謝大家。

在文末分享、點贊、在看,給個3連擊唄~

01 / 分享嘉賓

董一峰

位元組跳動 火山引擎ByteHouse資深研發工程師

位元組跳動資料平臺資深研發工程師,2016 年加入位元組跳動 OLAP 團隊,一直從事大資料查詢引擎的開發和推廣工作,先後負責 Hive、Spark、Druid、ClickHouse 等大資料引擎,目前主要聚焦於 ClickHouse 執行層相關的研發。

02 / 免費下載資料

03 / 報名看直播 免費領PPT

04 / 關於我們

DataFun: 專注於大資料、人工智慧技術應用的分享與交流。發起於2017年,在北京、上海、深圳、杭州等城市舉辦超過100+線下和100+線上沙龍、論壇及峰會,已邀請超過2000位專家和學者參與分享。其公眾號 DataFunTalk 累計生產原創文章700+,百萬+閱讀,14萬+精準粉絲

  分享、點贊、在看 ,給個 3連擊 :point_down: