ByteHouse 實時匯入技術演進

語言: CN / TW / HK

更多技術交流、求職機會,歡迎關注位元組跳動資料平臺微信公眾號,回覆【1】進入官方交流群

ByteHouse 是火山引擎上的一款雲原生資料倉庫,為使用者帶來極速分析體驗,能夠支撐實時資料分析和海量離線資料分析;便捷的彈性擴縮容能力,極致的分析效能和豐富的企業級特性,助力客戶數字化轉型。

本文將從需求動機、技術實現及實際應用等角度,介紹基於不同架構的 ByteHouse 實時匯入技術演進。

內部業務的實時匯入需求

ByteHouse 實時匯入技術的演進動機,起初於位元組跳動內部業務的需求。

在位元組內部,ByteHouse 主要還是以 Kafka 為實時匯入的主要資料來源(本文都以 Kafka 匯入為例展開描述,下文不再贅述)。對於大部分內部使用者而言,其資料體量偏大;所以使用者更看重資料匯入的效能、服務的穩定性以及匯入能力的可擴充套件性。而對於資料延時性,大多數使用者只要是秒級可見就能滿足其需求。基於這樣的場景,ByteHouse 進行了定製性的優化。

分散式架構下的高可用

社群原生分散式架構

ByteHouse 首先沿用了 Clickhouse 社群的分散式架構,但分散式架構有一些天然性架構層面的缺陷,這些痛點主要表現在三個方面:

  • 節點故障:當叢集機器數量到達一定規模以後,基本每週都需要人工處理節點故障。對於單副本叢集在某些極端 case 下,節點故障甚至會導致資料丟失。

  • 讀寫衝突:由於分散式架構的讀寫耦合,當叢集負載達到一定程度以後,使用者查詢和實時匯入就會出現資源衝突——尤其是 CPU 和 IO,匯入就會受到影響,出現消費 lag。

  • 擴容成本:由於分散式架構資料基本都是本地儲存,在擴容以後,資料無法做 Reshuffle,新擴容的機器幾乎沒有資料,而舊的機器上磁碟可能已經快寫滿,造成叢集負載不均的狀態,導致擴容並不能起到有效的效果。

這些是分散式架構天然的痛點,但是由於其天然的併發特性,以及本地磁碟資料讀寫的極致效能優化,可以說有利有弊。

社群實時匯入設計

  • High-Level 消費模式:依託 Kafka 自身的 rebalance 機制做消費負載均衡。

  • 兩級併發

基於分散式架構的實時匯入核心設計其實就是兩級併發:

一個 CH 叢集通常有多個 Shard,每個 Shard 都會併發做消費匯入,這就是第一級 Shard 間的多程序併發;

每個 Shard 內部還可以使用多個執行緒併發消費,從而達到很高的效能吞吐。

  • 攢批寫入

就單個執行緒來說,基本消費模式是攢批寫入——消費一定的資料量,或者一定時間之後,再一次性寫入。攢批寫入可以更好地實現效能優化,查詢效能提升,並降低後臺 Merge 執行緒的壓力。

無法滿足的需求

上述社群的設計與實現,還是無法滿足使用者的一些高階需求:

  • 首先部分高階使用者對資料的分佈有著比較嚴格的要求,比如他們對於一些特定的資料有特定的 Key,希望相同 key 的資料落盤到同一個 Shard(比如唯一鍵需求)。這種情況下,社群 High Level 的消費模式是無法滿足的。

  • 其次是 High level 的消費形式 rebalance 不可控,可能最終會導致 Clickhouse 叢集中匯入的資料在各個 Shard 之間分配不均。

  • 當然,消費任務的分配不可知,在一些消費異常情景下,想要排查問題也變得非常困難;對於一個企業級應用,這是難以接受的。

自研分散式架構消費引擎 HaKafka

為了解決上述需求,ByteHouse 團隊基於分散式架構自研了一種消費引擎——HaKafka。

高可用(Ha)

HaKafka 繼承了社群原有 Kafka 表引擎的消費優點,再重點做了高可用的 Ha 優化。

就分散式架構來談,其實每個 Shard 內可能都會有多個副本,在每個副本上都可以做 HaKafka 表的建立。但是 ByteHouse 只會通過 ZK 選一個 Leader,讓 Leader 來真正地執行消費流程,其他節點位於 Stand by 狀態。當 Leader 節點不可用了,ZK 可以在秒級將 Leader 切到 Stand by 節點繼續消費,從而實現一種高可用。

Low—Level 消費模式

HaKafka 的消費模式從 High Level 調整到了 Low Level 模式。Low Level 模式可以保證 Topic Partition 有序和均勻地分配到叢集內各個 shard;與此同時,Shard 內部可以再一次用多執行緒,讓每個執行緒來消費不同 Partition。從而完全繼承了社群 Kafka 表引擎兩級併發的優點。

在 Low-Level 消費模式下,上游使用者只要在寫入 Topic 的時候,保證沒有資料傾斜,那麼通過 HaKafka 匯入到 Clickhouse 裡的資料肯定也是均勻分佈在各個 shard 的。

同時,對於有特殊資料分佈需求——將相同 Key 的資料寫到相同 Shard——的高階使用者,只要在上游保證相同 Key 的資料寫入相同 Partition,那麼匯入 ByteHouse 也就能完全滿足使用者需求,很好地支援唯一鍵等場景。

 

場景一:

基於上圖可見,假設有一個雙副本的 Shard,每個副本都會有一張相同的 HaKafka 表處於 Ready 的狀態。但是隻有通過 ZK 選主成功的 leader 節點上,HaKafka 才會執行對應的消費流程。當這個 leader 節點宕機以後, 副本 Replica 2 會自動再被選為一個新的 Leader,繼續消費,從而保證高可用。

場景二:

在節點故障場景下,一般需要執行替換節點流程。對於分散式節點替換有一個很繁重的操作——拷貝資料。

如果是一個多副本的叢集,一個副本故障,另一個副本是完好的。我們很自然希望在節點替換階段,Kafka 消費放在完好的副本 Replica 2 上,因為其上舊資料是完備的。這樣 Replica 2 就始終是一個完備的資料集,可以正常對外提供服務。這一點 HaKafka 是可以保證的。HaKafka 選主的時候,如果確定有某一個節點在替換節點流程當中,會避免將其選為 Leader。

匯入效能優化:Memory Table

HaKafka 還做到了 Memory Table 的優化。

考慮這樣一個場景:業務有一個大寬表,可能有上百列的欄位 或者上千的 Map-Key。由於 ClickHouse 每一個列都會對應落盤為一個具體的檔案,列越多,每次匯入寫的檔案也就越多。那麼,相同消費時間內,就會頻繁地寫很多的碎檔案,對於機器的 IO 是很沉重的負擔,同時給 MERGE 帶來很大壓力;嚴重時甚至導致叢集不可用。為了解決這種場景,我們設計了 Memory Table 實現匯入效能優化。

Memory Table 的做法就是每一次匯入資料不直接刷盤,而是存在記憶體中;當資料達到一定量以後,再集中刷盤,減少 IO 操作。Memory Table 可以提供對外查詢服務的,查詢會路由到消費節點所在的副本去讀 memory table 裡邊的資料,這樣保證了不影響資料匯入的延時性。從內部使用經驗來看,Memory Table 不僅很好地解決了部分大寬表業務匯入需求,而且匯入效能最高可以提升 3 倍左右。

雲原生新架構

鑑於上文描述的分散式架構的天然缺陷,ByteHouse 團隊一直致力於對架構進行升級。我們選擇了業務主流的雲原生架構,新的架構在 2021 年初開始服務位元組內部業務,並於 2023 年初進行了程式碼開源(ByConity)。

雲原生架構本身有著很天然的自動容錯能力以及輕量級的擴縮容能力。同時,因為它的資料是雲端儲存的,既實現了儲存計算分離,資料的安全性和穩定性也得到了提高。當然,雲原生架構也不是沒有缺點,將原來的本地讀寫改為遠端讀寫,必然會帶來一定的讀寫效能損耗。但是,以一定的效能損耗來換取架構的合理性,降低運維成本,其實是利大於弊的。

上圖是 ByteHouse 雲原生架構的架構圖,本文針對實時匯入這塊介紹幾個重要的相關元件。

  • Cloud Service

首先,總架構分為三層,第一層是 Cloud Service,主要包含 Server 和 Catlog 兩個元件。 這一層是服務入口,使用者的所有請求包括查詢匯入都從 Server 進入。 Server 只對請求做預處理,不具體執行;在 Catlog 查詢元資訊後,把預處理的請求和元資訊下發到 Virtual Warehouse 執行。

  • Virtual Warehouse

Virtual Warehouse 是執行層。不同的業務,可以有獨立的 Virtual Warehouse,從而做到資源隔離。現在 Virtual Warehouse 主要分為兩類,一類是 Default,一類是 Write,Default 主要做查詢,Write 做匯入,實現讀寫分離。

  • VFS

最底層是 VFS(資料儲存),支援 HDFS、S3、aws 等雲端儲存元件。

基於雲原生架構的實時匯入設計

在雲原生架構下,Server 端不做具體的匯入執行,只做任務管理。因此在 Server 端,每個消費表會有一個 Manager,用來管理所有的消費執行任務,並將其排程到 Virtual Warehouse 上執行。

因為繼承了 HaKafka 的 Low Level 消費模式,Manager 會根據配置的消費任務數量,將 Topic Partition 均勻分配給各個任務;消費任務的數量是可配置的,上限是 Topic Partition 數目。

基於上圖,大家可以看到左邊是 Manager ,從 catalog 拿到對應的 Offset,然後根據指定的消費任務數目,來分配對應的消費 Partition、並排程到 Virtual Warehouse 的不同節點來執行。

新的消費執行流程

因為雲原生新架構下是有事務 Transaction 保證的,所有操作都希望在一個事務內完成,也更加的合理化。

依託雲原生新架構下的 Transaction 實現,每個消費任務的消費流程主要包括以下步驟:

  • 消費開始前,Worker 端的任務會先通過 RPC 請求,向 Server 端請求建立一個事務;

  • 執行 rdkafka::poll(),消費一定時間(預設 8s)或者足夠大的 block;

  • 將 block 轉化為 Part 並 Dump 到 VFS(此時資料不可見);

  • 通過 RPC 請求向 Server 發起事務 Commit 請求

    (事務中 Commit 的資料包括:dump 完成的 part 元資料以及對應 Kafka offset)

  • 事務提交成功(資料可見

容錯保證

從上述消費流程裡可以看到,雲原生新架構下的消費,容錯保證主要是基於 Manager 和 Task 的雙向心跳以及快速失敗策略:

  • Manager 本身會有一個定期的探活,通過 RPC 檢查排程的 Task 是否在正常執行;

  • 同時每個 Task 會在消費中藉助事務 RPC 請求來校驗自己的有效性,一旦校驗失敗,它可以自動 kill;

  • 而 Manager 一旦探活失敗,則會立即拉起一個新的消費任務,實現秒級的容錯保證。

消費能力

關於消費能力的話,上文提到它是一個可擴充套件性的,消費任務數量可以由使用者來配置,最高可以達到 Topic 的 Partition 數目。如果 Virtual Warehouse 中節點負載高的話,也可以很輕量地擴節點。

當然,Manager 排程任務實現了基本的負載均衡保證——用 Resource Manager 來做任務的管理和排程。

語義增強:Exactly—Once

最後,雲原生新架構下的消費語義也有一個增強——從分佈書架構的 At-Least-Once 升級到 Exactly—Once。

因為分散式架構是沒有事務的,只能做到一個 At-Least-Once,就是任何情況下,保證不丟資料,但是一些極端情況可能會有重複消費發生。到了雲原生架構,得益於 Transaction 的實現,每一次消費都可以通過事務讓 Part 和 Offset 實現原子性提交,從而達到 Exactly—Once 的語義增強。

Memory buffer

對應 HaKafka 的 memory table,雲原生架構同樣實現了匯入記憶體快取 Memory Buffer。

與 Memory Table 不同的是,Memory Buffer 不再繫結到 Kafka 的消費任務上,而是實現為儲存表的一層快取。這樣 Memory Buffer 就更具有通用性,不僅是 Kafka 匯入可以使用,像 Flink 小批量匯入的時候也可以使用。

同時,我們引入了一個新的元件 WAL 。資料匯入的時候先寫 WAL,只要寫成功了,就可以認為資料匯入成功了——當服務啟動後,可以先從 WAL 恢復未刷盤的資料;之後再寫 Memory buffer,寫成功資料就可見了——因為 Memory Buffer 是可以由使用者來查詢的。Memory Buffer 的資料同樣定期刷盤,刷盤後即可從 WAL 中清除。

業務應用及未來思考

最後簡單介紹實時匯入在位元組內部的使用現狀,以及下一代實時匯入技術的可能優化方向。

ByteHouse 的實時匯入技術是以 Kafka 為主,每天的資料吞吐是在 PB 級,匯入的單個執行緒或者說單個消費者吞吐的經驗值在 10-20MiB/s。(這裡之所以強調是經驗值,因為這個值不是一個固定值,也不是一個峰值;消費吞吐很大程度上取決於使用者表的複雜程度,隨著表列數增加,匯入效能可能會顯著降低,無法使用一個準確的計算公式。因此,這裡的經驗值更多的是位元組內部大部分表的匯入效能經驗值。)

除了 Kafka,位元組內部其實還支援一些其他資料來源的實時匯入,包括 RocketMQ、Pulsar、MySQL(MaterializedMySQL)、 Flink 直寫等。

關於下一代實時匯入技術的簡單思考:

  • 更通用的實時匯入技術,能夠讓使用者支援更多的匯入資料來源。

  • 資料可見延時和效能的一個折衷。

 

點選跳轉 ByteHouse雲原生資料倉庫 瞭解更多