位元組跳動的 Flink OLAP 作業排程和查詢執行優化實踐

語言: CN / TW / HK

一、背景

位元組跳動內部有很多混合計算的需求,需要一套既支援 TP 計算,也支援 AP 計算的系統。下圖是位元組跳動 HTAP 系統的總體架構。系統使用內部自研的資料庫作為 TP 計算引擎,使用 Flink 作為 AP 的計算引擎。

位元組跳動 HTAP 系統的總體架構

HTAP 系統對外支援 MySQL 協議,MySQLProxy 接收到查詢後根據查詢的複雜度和特點(是否使用索引等),將查詢分發給 TP 或者 AP 計算引擎。Flink SQL Gateway 是 AP 計算引擎的查詢入口,接收到 AP 查詢後生成 Flink 作業執行計劃,並提交到 Flink 叢集排程和執行。AP 計算引擎有一個列式儲存,Flink 叢集通過 Catalog 和 Connector 的介面,分別與儲存層的元資訊和資料查詢介面進行互動。AP 計算引擎完成計算後,Client 端會向 Flink Gateway 發起讀取結果資料請求,Gateway 再向 Flink 叢集讀取結果資料,所有結果資料返回給 Client 後作業就完成了整個 AP 計算流程。

Flink 是流批一體的計算引擎,在業界通常作為流式計算引擎。在 OLAP 計算引擎的選型上,我們主要考慮和對比了 Flink 與 Presto。

首先從架構上看,Flink 支援多種不同的部署模式,Flink 的 Session 叢集是一個非常典型的 MPP 架構,這是 Flink 可以支援 OLAP 計算的前提和基礎。Flink對作業的計算執行總體上可以分為執行計劃、作業 Runtime 管理、計算任務執行管理、叢集部署和 Failover 管理 4 大部分。從上圖 Presto 和 Flink OLAP 的總體架構以及功能模組圖來看,兩套系統在支援這些計算功能的具體實現上有很大的差異,但他們提供的系統能力和模組功能基本上是一致的。所以 Flink 引擎在架構及功能實現上,可以支援完整的 Flink OLAP 的計算需求。

在位元組跳動內部,Flink 最初被用作流式計算。後來由於 Flink 流批一體計算能力的發展,在一些實時數倉場景我們也使用 Flink 作為批式計算引擎。我們最終選用 Flink 作為 AP 計算引擎,主要基於三個方面的考慮:

  • 統一引擎降低運維成本。位元組跳動流式計算團隊對 Flink 有非常豐富的運維和優化開發經驗,在流批一體的基礎上,使用 Flink 作為 AP 計算引擎可以降低開發和運維成本;

  • 生態支援。位元組內部有很多業務方使用 Flink SQL 來開發流式和批式作業,我們對 Flink 的 Connector 也進行了很多功能拓展,對接了很多位元組內部的儲存系統,所以使用者使用 Flink 執行 OLAP 計算非常方便;

  • 效能優勢。位元組 Flink 團隊曾進行過 TCP-DS 相關的基準測試 Benchmark,Flink 計算引擎相比 Presto 和 Spark SQL,在計算效能上並不遜色,在某些查詢方面甚至是佔優的。

 

二、問題和分析

在具體應用中,Flink 引擎如何支援 OLAP 計算?

首先在接入層,我們使用 Flink SQL Gateway 作為接入層,提供 Rest 協議直接接收 SQL 語句查詢;

架構上,在 K8s 上拉起 Flink 的 Session 整合,這是一個非常典型的 MPP 架構;

計算模式上,使用 Batch 模式以及計算全拉起的排程模式,減少了計算節點之間的資料落盤且能提升 OLAP 計算的效能。

在 Flink OLAP 計算過程中,主要存在以下幾個問題:

  • Flink OLAP 計算相比流式和批式計算,最大的特點是 Flink OLAP 計算是一個面向秒級和毫秒級的小作業,作業在啟動過程中會頻繁申請記憶體、網路以及磁碟資源,導致 Flink 叢集內產生大量的資源碎片;

  • OLAP 最大的特點是查詢作業對 Latency 和 QPS 有要求的,需要保證作業在 Latency 的前提下提供比較高的併發排程和執行能力,這就對 Flink 引擎提出了一個新的要求。

為了測試 Flink 執行 OLAP 計算的能力,我們對 Flink 作業排程進行 Benchmark 測試:

  • 測試作業:設計了三組不同複雜度的作業,分別是單節點作業、兩個節點的 Wordcount 作業以及 6 個節點的 Join 作業。每組作業的計算節點併發度均為 128;

  • 測試環境:選取了 5 臺物理機啟動一個 Flink Session 叢集,叢集內有 1 萬多個 Slot;

  • 測試步驟:開發了一個作業提交的Client,Client 多執行緒併發提交作業,統計 10 分鐘之內完成的作業數量及完成作業的平均 Latency。

排程 Benchmark 測試結果

測試結果如上圖所示,分為兩方面:QPS 和 Latency 。

QPS 測試結果分析:作業 QPS 隨著 Client 執行緒數增加而小幅增加,並很快達到瓶頸。

  • 單節點作業,Client 單執行緒提交作業時 QPS 為 7.81;Client 執行緒數為 4 時,達到 QPS 極限,接近 17 ;

  • Wordcount 兩節點作業,Client 單執行緒提交作業時 QPS 為 1.38;Client 執行緒數為 32 時, QPS 為 7.53;

  • Join 的作業表現最差,Client 單執行緒提交作業時 QPS 只有 0.44;Client 執行緒數增加到 32 時,QPS 也只有 2.17。

Latency測試結果分析:作業 Latency 隨著 Client 執行緒數增加而大幅度增加。

  • 單作業的 Latency 從 100 多毫秒增加到 2 秒;

  • Wordcount 作業的從 700 多毫秒增加到 4 秒;

  • Join 的作業從 2 秒增加到了 15 秒多,有數倍的增長。

Flink 引擎這樣的作業排程效能在線上業務使用過程中是不可接受的

 

三、排程執行優化

針對 Flink 併發作業排程的效能問題,我們也曾嘗試針對一些效能的瓶頸點進行簡單的優化,但效果並不理想。所以我們針對 Flink 作業的排程執行全鏈路進行了分析,將 Flink 作業的執行分為作業管理、資源申請、計算任務三個主要階段,隨後針對每個階段進行了相應的效能優化和改進。

作業管理優化

Flink 通過 Dispatcher 模組接收和管理作業,整個作業的執行過程可以分為 4 個步驟:初始化、作業執行準備、啟動作業執行、結束作業執行。

Dispatcher 內部有 3 個執行緒池負責執行作業的 4 個步驟,分別是 Netty/Rest、Dispatcher Actor 以及 Akka 執行緒池。根據測試和分析:

  • Netty/Rest 執行緒池預設執行緒數量太少;

  • Dispatcher Actor 單點處理且執行了一些非常重量級的作業操作;

  • Akka 執行緒池過於繁忙,不僅要負責 Dispatcher 內的作業管理,還負責了很多 每個作業 JobMaster 的具體執行。

針對上述問題,我們分別進行了相應的優化。

  • 加大了 Netty/Rest 執行緒池的大小;

  • 對作業管理流程進行拆解,建立了兩個獨立的執行緒池:IO 執行緒池和 Store 執行緒池,分別負責執行作業管理過程中比較重量級的操作,減輕 Dispatcher Actor 和 Akka 執行緒池的工作壓力。

在 Flink 執行作業計算過程中,會有很多定時任務,包括 Flink 各個模組和元件間的心跳檢查、作業資源申請過程中的超時檢查等。Flink 會將這些定時任務放到 Akka 執行緒池裡定時排程執行,當一個作業已經結束時,這個作業相關的定時任務無法被及時回收和釋放。這會使 Akka 執行緒池裡快取的定時任務過多,導致 JobManager 節點產生大量的 FullGC,根據我們的測試分析,在高 QPS 場景下,JobManager 程序有 90% 左右的記憶體被這些定時任務佔用。

針對這個問題我們也進行相應的優化,在每一個作業啟動時會為它建立一個作業級別的本地執行緒池,作業相關的定時任務會先提交到本地執行緒池,當這些任務需要被真正執行時,本地執行緒池會將它們傳送到 Akka 執行緒池直接執行。作業結束後會直接關閉本地執行緒池,快速釋放定時任務資源。

資源申請優化

位元組跳動內部目前使用的是 Flink 1.11 版本,Flink 資源申請主要是基於 Slot 維度,我們使用全拉起的作業排程模式,所以作業會等待 Slot 資源全部申請完成之後才會進行計算任務排程。比如,Resource Manager 有 4 個 Slot ,現有兩個作業併發申請資源,每個作業都需要三個 Slot,如果它們都只申請到兩個 Slot ,就會導致兩個作業相互等待 Slot 資源而產生死鎖。

針對這個問題,我們選擇將資源申請從 Slot 粒度優化為作業 Batch 粒度,每個作業會將它的資源申請打成 Batch 一次性申請。作業 Batch 粒度的資源申請主要需要解決兩個難點:

  • 和原先 Slot 粒度的資源申請如何相容的問題。因為許多機制是基於 Slot 粒度,如資源申請超時處理等,我們實現了兩個機制的無縫融合;

  • Batch 資源申請的事務性。我們需要保證一個 Batch 內的資源能夠同時申請成功或申請失敗時同時釋放,如果有異常情況,這些資源申請會同時取消。

任務執行優化

  • 作業間連線複用

Flink 上下游計算任務通過 Channel 傳輸資料,在一個 Flink 作業內部,相同計算節點的網路連線是可以複用的,但是不同作業間的網路連線無法複用。一個作業所有的計算任務結束之後,它在 TaskManager 之間的網路連線會被關閉並且釋放。當另外一個作業執行計算時,TaskManager 需要建立新的網路連線,這樣就會出現在Flink引擎支援 OLAP 計算時,頻繁建立和關閉網路連線,最終影響計算任務的執行效能和作業的 Latency 與 QPS。同時這個過程也會導致資源使用的不穩定,增加 CPU 的使用率和 CPU 的波峰波谷抖動。

TaskManager 的多作業網路連線複用,主要存在以下幾個難點:

  • 穩定性問題。Channel 不僅用來做資料傳輸,而且還與計算任務的反壓相關,所以直接複用網路連線可能會導致計算任務餓死以及死鎖等問題;

  • 髒資料問題。不同的作業複用網路連線有可能引起計算任務在執行過程中產生髒資料;

  • 網路連線膨脹和回收問題。對於不再使用的網路連線,我們需要及時探測並且關閉,釋放資源。

我們實現的 Flink 作業間網路連線複用,主要方案是在 TaskManager 內增加一個 Netty 連線池,計算任務需要建立網路連線時,先向連線池發起請求,連線池根據需要建立或複用已經存在的連線;計算任務完成計算後,會向連線池釋放連線。

為了保證系統的穩定性,Flink 現有作業內的網路連線使用機制保持不變,上下游計算任務互動時增加發送連線校驗。每個 Netty 連線有三個狀態,分別是 Idle、Busy 以及 Invalid。網路連線池會管理網路連線的三個狀態,後臺有定時任務會檢查連線狀態,同時根據需要建立和回收網路連線。

  • PartitionRequest 優化

Partition Request 優化主要分為兩個方面:Batch 優化和通知機制優化。

一個作業內上下游計算任務建立連線後,下游的計算任務會向上遊傳送一個 Partition Request 訊息,告訴上游任務需要接收哪些 Partition 資料的資訊。Partition Request 訊息最大的問題是訊息量太大,是上下游計算節點併發度的平方量級。

Batch 優化的主要目的是將相同 TaskManager 內上下游計算任務間的 Partition Request 訊息數量進行打包處理,降低 Partition Request 的量級。優化過後,在一個計算節點 100 併發的情況下,兩個 TaskManager Partition Request 數量可以從原先的 100*100 降低到現在的 2*2,由併發的平方降為 TaskManager 數量的平方,改善非常明顯。

由於上下游的計算任務是並行部署的,所以會存在下游計算任務部署完成之後,上游的計算任務還沒有開始部署的情況。當下遊的計算任務向上遊傳送一個 Partition Request 時,上游的 TaskManager 會返回一個 Partition Not Found 的異常,下游的計算任務根據這個異常會不斷重試和輪詢,直到請求完成。

這個過程存在兩個問題,一個是 Partition Request 數量過多,另外一個是下游的計算任務在輪詢重試的過程中有時間差,導致計算任務的 Latency 加大。所以我們為上下游計算任務互動實現了一個 Listen+Notify 機制。上游的 TaskManager 接受到下游計算任務傳送的 Partition Request 時,如果上游的計算任務還未部署,則會將 Partition Request 放入到一個 Listen 列表裡,計算任務部署完成再從計算佇列裡面獲取 Partition Request ,並且回撥執行完成整個互動。

  • 網路記憶體池優化

TaskManager 啟動後,會預先分配一塊記憶體作為網路記憶體池,計算任務在 TaskManager 部署時會從網路記憶體池裡分配一個本地記憶體池,並加入到網路記憶體池列表。計算任務建立本地記憶體池後,申請記憶體分片以及釋放本地記憶體池等所有操作時,網路記憶體池會遍歷本地記憶體池列表,TaskManager 中並行執行的計算任務很多時,這個遍歷的次數會非常大,是 Slot 的數量乘以上游併發度的數量,甚至會達到千萬量級。

遍歷的主要目的是提前釋放其他本地記憶體池中空閒的記憶體分片,提升記憶體的使用率。我們的主要優化是將遍歷操作刪去,雖然這會造成一部分的記憶體浪費,但能夠極大地提升計算任務的執行性。

除了上述優化,我們還做了很多其他的優化和改造。

  • 在計算排程方面,我們支援實現全拉起和 Block 結合的排程模式;

  • 在執行計劃方面,我們優化和實現了很多計算下推,將計算下推到儲存去執行;

  • 在任務執行方面,我們針對任務的拉起和初始化都做了很多相關的優化和實現。

排程 Benchmark 優化

排程 Benchmark 的優化效果

完成作業排程和執行優化後,我們對優化後的 Flink 叢集進行 Benchmark 測試。

QPS 測試結果:最大QPS提升顯著

  • 單節點作業 QPS 從原先的 17 提升到現在的 33;

  • Wordcount 兩節點作業最高 QPS 從 7.5 提升到 20 左右;

  • Join 作業從原先最高的 2 左右提升到現在 11 左右。

Latency 測試結果:32 個執行緒下 Latency 下降明顯

  • 單節點作業的 Latency 從原先的 1.8 秒降低到 200 毫秒左右;

  • Wordcount 兩節點作業從 4 秒降低到 2 秒不到;

  • Join 作業從原先的 15 秒降為 2.5 秒左右。

 

四、未來計劃

現在 Flink OLAP 雖然在實際業務場景中已經投入使用,但在很多方面還需要繼續打磨和優化,我們將這塊主要分為三大部分:穩定性、效能和功能。

穩定性方面

  • 提升單點的穩定性,包括資源管理單點以及作業管理單點;

  • 優化執行時的資源使用及計算執行緒等管理,優化 OLAP 計算結果管理;

  • 其他更多穩定性相關優化。

效能方面

  • 包括計劃優化、細粒度計算任務執行和管理優化等;

  • 面向行和列的計算優化,包括向量化計算等。

功能方面

  • 持續完善產品化建設,包括 History Server 的持續完善和建設;

  • 完善 Web 分析工具,幫助業務方更好地定位在查詢過程中發現的問題。

 

火山引擎流式計算 Flink 版現已上線。

新增小助手微信,瞭解更多 Flink OLAP 相關資訊👇