Flink OLAP 在資源管理和執行時的優化

語言: CN / TW / HK

本文整理自位元組跳動基礎架構工程師曹帝胄在 Flink Forward Asia 2022 核心技術專場的分享。Flink OLAP 作業 QPS 和資源隔離是 Flink OLAP 計算面臨的最大難題,也是位元組跳動內部業務使用 Flink 執行 OLAP 計算需要解決的最大痛點。本次分享將圍繞 Flink OLAP 難點和瓶頸分析、作業排程、Runtime 執行、收益以及未來規劃五個方面展開介紹。

 

Flink OLAP in ByteDance

 

針對內部許多混合計算的需求場景,位元組跳動提出了整合 AP 和 TP 計算的 ByteHTAP 系統,同時將 Flink OLAP 作為ByteHTAP 的 AP 計算引擎。在位元組跳動一年多的發展中, Flink OLAP 已經部署支援了 20+的 ByteHTAP 線上叢集,叢集規模達到 16000+Cores,每天承擔 50w Query 的AP流量。

上圖是 Flink OLAP 在位元組跳動的服務架構,Flink OLAP 通過 SQL Gateway 提供 Restfull 介面,使用者可以通過 Client 向 SQL Gateway 叢集提交 Query,SQL Gateway 負責 SQL 解析並生成執行計劃後提交給 Flink 叢集。Flink 叢集接收到請求後,由 Dispatcher 建立 JobMaster,根據叢集內的 TM 按照一定的排程規則將 Task 部署到對應的 TaskManager 上,最後 Task 將結果推回 Dispatcher,並且最終由 Dispatcher 推給 Client。

挑戰

Flink OLAP 在發展期間也遇到了很多挑戰。不同於流式計算任務,OLAP 任務大部分都是秒級、毫秒級的小作業,具有 QPS 高、時延小的特點。以內部業務為例,業務方要求在高峰期支援大於 200 的 QPS,並且 Lantency p99 < 2s,而優化前的 Flink 排程效能還不能滿足業務方需求,因此我們針對 Flink 的排程效能全鏈路進行了瓶頸分析。

首先通過設計針對排程效能的一系列 Benchmark,從業務出發根據複雜度構建 3 組測試作業。每個 Source 節點只會產生一條資料,資料量可以忽略不計。測試環境使用 了5 臺物理機啟動了一個 Flink Serssion 叢集,總共約 500 Cores CPU,大約 1.25w 個 Slot,實現了一個 Benchamrk 的 Client 可以根據不同的併發度批量提交作業。我們在benchmark結果中統計了 10min 內完成的作業數量,並計算作業完成的平均 Latency。

為了更好的分析 Flink 排程階段的效能瓶頸,將排程階段分成了三個階段。第一個階段是叢集 Dispatcher 收到作業請求後直接完成作業並返回結果;第二階段是作業在 JobMaster 中申請完資源後直接完成並返回結果。第三個階段是 JobMaster 將 Task 部署到 TaskManager 後,TaskManager 不執行邏輯直接將 Task 置為完成並返回,jobMaster接收所有Task完成的訊息後,將作業置為結束。在實踐中發現從資源申請到作業部署的過程中 QPS 效能下降明顯。

在 E2E 的測試場景中,可以看到在 WordCount 作業中 Client 併發度從 16 提升到 32 後 Latency 上升明顯,Join 作業更是在 4 併發到 16 併發時 Latency 明顯上漲。

通過上面的 Benchmark 和 Flink 部署的全流程分析可以發現主要有兩個問題,一塊是作業在資源管理和部署上的瓶頸,一塊是任務在執行時延遲瓶頸。

針對OLAP場景,在作業資源管理和部署方面,目前 Flink 資源管理流程和部署互動流程過於複雜。在執行方面,Flink 的作業拉取結果流程存在較多限制,另外大量的小查詢會導致資源頻繁的建立銷燬。針對上面這些問題,我們分別從作業排程和執行時兩個大方向進行優化。

 

作業排程

 

資源管理流程優化

目前位元組 OLAP 的改造是基於 Flink-1.11 版本,因此先介紹下 Flink-1.11 的叢集資源申請和釋放流程。首先TaskManager 在部署完成後向 ResourceMananger 註冊,JobMaster 向 RM 進行資源申請,RM 根據申請的 Slot 對 TM 進行部署。TM 收到部署請求後與 JobMaster 建立連線並提供 Slot 資源。最後由 JobMaster 對 Slot 的資源進行分配並向 TM 進行部署。資源釋放流程同樣,在任務結束後,JobMaster 會釋放對應的 Slot 的資源,並釋放 TaskManager 連線,TM 也會通知 RM Slot 資源釋放。

從以上流程可以看到作業申請和釋放 Slot 資源分別需要 JobMaster、ResourceManager 以及 TaskManager 進行 2 次網路互動。這些耗時在單查詢情況下雖然可能只增加了幾十到幾百毫秒,但在多查詢情況下,會將這部分的耗時放大,甚至使查詢作業的資源申請耗時增加為秒級。

同時在這個流程中可以看到 ResourceManager、JobMaster 以及 TaskManager 三個核心功能模組在資源申請和釋放上的功能劃分不夠清晰,ResourceManager 管理計算資源存活,另一方面又管理作業的資源分配,造成查詢資源申請的單點問題;另一方面,TaskManager 不僅執行計算任務,同時還參與計算資源的申請和分配流程,申請和釋放流程過長。

此外資源分配中 SlotPool 處理 Slot 申請和分配比較複雜,每個 Task 需要獲取上游 Task 的分配位置,同時 Share Group 分配資源有多次排序和遍歷,增加了 Slot 分配的耗時,這個隨著作業複雜度上升,耗時也會增加。

在原流程中 ResourceManager 分配 Slot 時需要確保 TaskManager 中指定的 Slot 是空閒可用的,這會增加申請和釋放流程的複雜度。同時 TaskManager 通過在資源申請流程中根據 Slot 初始化對應的 TaskSlot 以及 MemoryManager,確保每個 Slot 只被一個作業的多個計算任務使用。通過分析可以發現,多個計算任務在共享 Slot 過程中,主要是共享 MemoryManager 管理 Batch 運算元的 Aggregate、Join、Sort 等運算元的臨時狀態以及流計算任務中的 Rocksdb 堆外記憶體申請和釋放,這部分記憶體共享的實現跟作業沒有強繫結關係,所以多個作業的多個計算任務也可以共享 MemoryManager。因此在為了簡化資源申請流程,及作業的資源共享上,通過去除 Slot 的感念,在 TaskMananger 中使用全域性共享的 MemoryManager。

在優化後的流程中,TaskManager 啟動後會向 ResourceManager 進行註冊,ResourceManager 向 Dispatcher 同步 TM 資訊。這裡的 Dispatcher 會同時維護一份叢集 TM 的列表,並在作業提交時提供給 Jobmasger。JobMaster 根據叢集的 TM 根據指定的部署策略選擇部署的 TM 並向 TM 傳送部署請求。優化後的各元件分工如下:

  • ResourceManager:管理計算節點的存活以及節點的資源資訊彙總,不再執行 OLAP 型別查詢計算的資源分配;
  • TaskManager:支援計算任務的執行,在將資源資訊彙報給 ResourceManager 後,不參與作業資源申請和釋放流程。
  • JobMaster:支援和實現查詢計算的資源分配。

在資源流程改造中,因為去除了 Slot 限制,因此在作業部署上可以以 TM 為粒度批量進行作業部署,通過對部署請求進行打包,大大減少了 JobMaster 與 TaskManager 之間的請求次數。

任務部署結構優化

在完成上面的優化後,通過對 Source、WordCount 以及 Join 等三類不同複雜度的作業計算任務部署效能測試,我們發現不同作業複雜度對於計算任務部署的效能影響非常大。

  1. Source 作業,只包含 Source 節點,共 1 個節點,128 併發,共 128 個計算任務,按照 TM 部署策略會使用 10 個 TM,每個 TM 部署 13 個計算任務。
  2. WordCount 作業,包含 Source 節點和 Aggregate 節點,共 2 個節點,128 併發,共 256 個計算任務,使用 10 個 TM,每個 TM 部署 26 計算任務。
  3. Join 作業,包含 3 個 Source 節點,2 個 Join 節點以及 1 個 Aggregate 節點,共 6 個節點,128 併發,共 768 個計算任務,使用 25 個 TM,每個 TM 部署 30 計算任務。

從上面的資料可以看到,隨著任務複雜度的提升,序列化的總耗時增加明顯,WordCount 的序列化總耗時約 122s,而 Source 作業的耗時在 5s 左右。Join 作業的序列化耗時更是在 200s 以上。針對這一現象,可以從兩個維度進行優化:

  1. 資料量大小:通過分析作業的部署結構發現每個 Task 的部署結構包括作業資訊、作業配置等資訊,同時包含該 Task 的資訊,包含 Task 名稱,上下游資訊,上下游的位置資訊等。這其中同一個作業不同 Task 的作業維度資訊都是相同的,同時如果作業是 All To All 的連線方式,他們的上下游資訊也是可以共享。因此可以對部署結構的冗餘資訊進行提取,比如將作業維度資訊、相同 Task 資訊、上下游位置資訊等。
  2. 序列化流程:原有的序列化是由 TM 的 Actor 負責的,高併發下存在單執行緒瓶頸,所以將部署請求抽出為單獨的鏈路,通過配置多 Netty Thread,併發處理,再將序列完的結構交由 TM 的 Acotr 處理。

經過以上兩個維度的優化,Source 作業的序列化大小由 63kb 降為 5.6kb,耗時由 5462kb 降為 644kb,複雜作業的優化更為明顯,WordCount 作業的序列化大小由 317.3kb 降為 11.1kb,耗時由 122546ms 降為 940ms,Join 作業的序列化大小由 557.5ms 降為 28.3ms,耗時由 219189ms 降為 2830ms。

模組互動優化

在完成資源流程申請和釋放優化後,剩下的模組互動主要是 JobMaster 和 TaskManager 的互動。一部分主要是在 JobMaster 初始化完成後,會與所有的 TM 建立網路連線,同時在作業執行時兩者之間會維持心跳連線;另一部分主要的互動是 TM 會上報 Task 的執行狀態,包括部署完成進入 running 的狀態以及任務結束的終態上報。這裡的互動在高併發情況下規模也是比較可觀的,以 Task 數為 128 的測試作業在 QPS 100、TM 100 的環境下為例,JM 每秒建立 10000 左右的連結,收到 Task 的更新請求 256w 次。

在之前的資源管理優化中,Dispatcher 已經維護了叢集內 TaskManager 的所有節點資訊,因此可以在 TaskMananger 初始化完成後與 Dispatcher 建立連線和心跳。所有的 JobMaster 複用該連線,而不單獨維護連線及心跳。同時之前的心跳連線中 TM 需要上報 Slot 使用快照等資訊,這一部分在資源管理優化後也不需要。這裡有個問題是之前 JobMaster 需要通過心跳感知 TM 的狀態,而改造後由 Dispatcher 負責維護與 TM 的心跳,因此當 Dispatcher 感知到 TM 異常 後,會通知相關的 JobMaster 進行 Failover 處理。

在 Task 任務更新請求的優化中,在 OLAP 的任務場景下預設採用 Pipeline 模式,在這種模式下,所有 Task 會同時開始排程,因此其實並不關心單個 Task 粒度的狀態變化,同時 Task 直接也沒有狀態的相互依賴,所以我們可以將狀態更新請求進行打包,在 Task 部署完成後 JobMaster 直接將狀態更新為 Running 不進行額外的互動請求。只在作業結束時 TM 以任務粒度進行更新。同時針對一些 Block 連線的場景,比如自 Join,TM 會對這類 Task 進行單獨狀態更新來防止死鎖。

Runtime 執行

 

Pull VS Push

Flink 計算結果鏈路基於 Pull 機制,從 Gateway 向 JobManager 發起 Pull 請求,JobManager 再向 TaskManager 節點 Pull 結果資料。Gateway 到 JobManager 之間存在 Pull 輪詢請求,存在固定的輪詢間隔時間,增加了查詢的 Latency,很難滿足 OLAP 業務對 Letancy 要求比較高的場景。同時為了支援和實現 Pull 機制,會建立一些臨時的網路、執行緒等資源,例如在 Sink 節點會建立 Socket Server,在 Gateway 節點會建立輪詢執行緒等,浪費了計算節點和 Gateway 節點的資源。此外,Dispatcher 節點是一個 Akka Actor 單點,Pull 資料流程會通過 Dispatcher 節點處理和轉發,加大了 Dispatcher 處理訊息負擔。

因此我們考慮將獲取結果鏈改為 Push 機制,一方面可以解決輪詢 Latency 的問題,同時將結果主動 Push 向 Client,也可以避免 Dispatcher 接受請求的單執行緒瓶頸,Gateway 也不需要建立輪詢執行緒進行輪詢。但 Push 機制也存在新的挑戰:

  1. Push 模式下資料返回進行流量控制,避免 Client 端資料堆積產生 OOM;
  2. 在原先的機制下,Client 會通過 Dispatcher 獲取當前任務狀態,在 Push 模式下則需要主動將作業狀態返回。

作業結果 Push 架構

為保證 Flink 現有功能架構的穩定性,通過在 JobManager 中新增一個獨立的 NettyServer 提供 Socket 服務,SocketServer 接收 Gateway 建立連線和提交作業,然後將接收到的 JobGraph 物件通過 DispatcherGateway 提交到 Dispatcher,再由 Dispatcher 將 Task 部署到 TaskMananger 中。其中 Sink 運算元會與 JM 的 NettyServer 建立單獨的 Channel 連線 Push 結果。JM 的 NettyServer 收到結果後會將結果推給 Gateway。

整個 Push 過程利用 Netty 的 Watermark 機制進行流量控制,傳送節點前判斷 Channel 是否可寫,不可寫則進行阻塞操作,形成反壓。只有在接收節點消費資料完成並且 Netty Watermark 恢復到正常值後,傳送節點才會恢復結果推送。同時 Sink 節點發送的資料會帶上當前的結果的狀態,用於判斷該 Sink 節點是否完成。之後會在 NettyServer 中彙總這些 Task 的結果狀態,當所有資料傳送完成後,NettyServer 向 Gateway 傳送任務結束的狀態資訊。通知 Gateway 資料已經全部發送完成,該 Query 已經結束。在 50 TM Client 128 併發下空資料測試下,Push 模式將 QPS 從 Pull 模式的 850 提升到 4096,提升了 5 倍左右。

執行緒優化

通過對 TM CPU 使用的分析發現在高併發情況下,執行緒的建立能佔用 30% 以上的 CPU。因此通過池化執行緒池減少執行緒的建立銷燬,但是池化執行緒操作會帶來的問題是由於原來的 Cancel 執行緒只需關心執行執行緒是否存活,但執行緒池中執行緒資源是複用的,因此需要對執行執行緒進行封裝,維護執行緒當前執行的 Task,Interrupt 執行緒在進行 Interrupt 操作時也會判斷中斷的 Task 是否正確,線上程結束後更新狀態用於其他 Cancel 執行緒判斷當前執行執行緒的狀態。

鎖優化

在對 Thread 的 Profier 分析中發現存在高併發下 Task 頻繁的搶鎖操作。其中 NetworkBufferPool 用於提供資料傳輸的記憶體,MemoryPool 為運算元計算時提供堆外記憶體。以 NetworkBufferPool 為例,Task 初始化完成後,LocalBufferPool 為空,所有的網路記憶體都需要向 NetworkBufferPool 申請,而 NetworkBufferPool 是整個 TM 唯一的,為了保證一致性,所有的記憶體申請和釋放都需要申請鎖。以 100 併發, Task 100,TM 為 1 為例,每秒會產生 1w 次的鎖請求。而在 NetworkBufferPool 記憶體足夠的情況下,可以通過將記憶體打包進行申請以減少記憶體申請的次數從而減少鎖搶佔的開銷。

Task 在向記憶體池申請記憶體會以 BatchSegment 為單位申請,一個 BatchSegment 封裝了 10 個 Segment。通過批量記憶體申請將鎖搶佔的次數減少為原來的十分之一。在 10 併發,Task 100,TM 為 1,BatchSegment size 為 10 的情況下,申請 Segment 的鎖佔用請求由 1000/s 變為 100/s,模擬申請 10w 個 Segment 的總耗時由 4353ms,縮短為 793 ms。

記憶體優化

另外在高 QPS 測試中發現當 JM 執行的作業數量比較多時,JM 會頻繁的觸發 Fullgc,導致作業 Latency 上漲。這種情況通過對 JM 的記憶體分析可以發現大部分的記憶體都是用在儲存作業的 Metric 資訊中,包括作業維度以及 Task 維度的資訊。而在 OLAP 的場景下位元組內部有單獨的 MetricrePorter,很多 Metric 資訊並不需要同步到 JM 端。因此通過 TM 端的 Metric 中加入白名單對 Metric 上報進行過濾 ,只保留需要使用的 Metric,比如TM資源資訊、Task io 資訊等。在對 Metric 進行過濾後,JM 的 Full GC 頻率降低了 88%,對作業 Latency 的影響顯著減少。

收益

 

完成作業排程和執行優化後,通過對優化後的 Flink 叢集進行 Benchmark 測試。測試結果顯示,QPS 提升顯著。單節點作業 QPS,從原先的 17 提升到現在的 33。WordCount 兩節點作業最高 QPS,從 7.5 提升到 20 左右。Join 作業的 QPS 從 2 提升至 11,Latency 在 32 個執行緒下,下降明顯。單節點作業的 Latency,從 1.8 秒降低到 200 毫秒左右。WordCount 兩節點作業從 4 秒降低到 1 秒多。Join 作業的 QPS 從原先的 15 秒降為 2.5 秒左右。

在業務收益方面,ByteHTAP 受到了公司核心業務的廣泛認可。目前,已經接入 User Growth、電商、幸福裡、飛書等 12 個核心業務。線上汲取能支援業務複雜查詢高峰期 200 左右的 QPS,Latency P99 控制在 5 秒以下。

未來規劃

 

在公司內部,需要支援多租戶的業務場景,該業務場景包含了少量重要租戶,冷租戶多,租戶數量大的特點,存在 Noisy Neghbor,資源利用率低的問題。

針對這類業務會主要在下面幾個維度提高 Serverless 能力:

  1. 通過支援 TaskManager 資源隔離和單節點維度支援計算任務優先順序排程解決 Noisy Neghbor 的問題;
  2. 通過支援彈性擴縮容解決資源利用率問題。

在效能提升方面,主要分為三個部分:

  1. 叢集負載效能:JobManager 支援水平擴充套件、叢集資源利用率優化、Task 排程效能優化;
  2. Flink OLAP 執行時優化:執行時網路訊息優化、資源申請流程優化;
  3. 冷啟動優化:Gateway 冷啟動優化、網路初始化優化、記憶體申請流程優化。