Flink OLAP 在資源管理和運行時的優化

語言: CN / TW / HK

本文整理自字節跳動基礎架構工程師曹帝胄在 Flink Forward Asia 2022 核心技術專場的分享。Flink OLAP 作業 QPS 和資源隔離是 Flink OLAP 計算面臨的最大難題,也是字節跳動內部業務使用 Flink 執行 OLAP 計算需要解決的最大痛點。本次分享將圍繞 Flink OLAP 難點和瓶頸分析、作業調度、Runtime 執行、收益以及未來規劃五個方面展開介紹。

 

Flink OLAP in ByteDance

 

針對內部許多混合計算的需求場景,字節跳動提出了整合 AP 和 TP 計算的 ByteHTAP 系統,同時將 Flink OLAP 作為ByteHTAP 的 AP 計算引擎。在字節跳動一年多的發展中, Flink OLAP 已經部署支持了 20+的 ByteHTAP 線上集羣,集羣規模達到 16000+Cores,每天承擔 50w Query 的AP流量。

上圖是 Flink OLAP 在字節跳動的服務架構,Flink OLAP 通過 SQL Gateway 提供 Restfull 接口,用户可以通過 Client 向 SQL Gateway 集羣提交 Query,SQL Gateway 負責 SQL 解析並生成執行計劃後提交給 Flink 集羣。Flink 集羣接收到請求後,由 Dispatcher 創建 JobMaster,根據集羣內的 TM 按照一定的調度規則將 Task 部署到對應的 TaskManager 上,最後 Task 將結果推回 Dispatcher,並且最終由 Dispatcher 推給 Client。

挑戰

Flink OLAP 在發展期間也遇到了很多挑戰。不同於流式計算任務,OLAP 任務大部分都是秒級、毫秒級的小作業,具有 QPS 高、時延小的特點。以內部業務為例,業務方要求在高峯期支持大於 200 的 QPS,並且 Lantency p99 < 2s,而優化前的 Flink 調度性能還不能滿足業務方需求,因此我們針對 Flink 的調度性能全鏈路進行了瓶頸分析。

首先通過設計針對調度性能的一系列 Benchmark,從業務出發根據複雜度構建 3 組測試作業。每個 Source 節點只會產生一條數據,數據量可以忽略不計。測試環境使用 了5 台物理機啟動了一個 Flink Serssion 集羣,總共約 500 Cores CPU,大約 1.25w 個 Slot,實現了一個 Benchamrk 的 Client 可以根據不同的併發度批量提交作業。我們在benchmark結果中統計了 10min 內完成的作業數量,並計算作業完成的平均 Latency。

為了更好的分析 Flink 調度階段的性能瓶頸,將調度階段分成了三個階段。第一個階段是集羣 Dispatcher 收到作業請求後直接完成作業並返回結果;第二階段是作業在 JobMaster 中申請完資源後直接完成並返回結果。第三個階段是 JobMaster 將 Task 部署到 TaskManager 後,TaskManager 不執行邏輯直接將 Task 置為完成並返回,jobMaster接收所有Task完成的消息後,將作業置為結束。在實踐中發現從資源申請到作業部署的過程中 QPS 性能下降明顯。

在 E2E 的測試場景中,可以看到在 WordCount 作業中 Client 併發度從 16 提升到 32 後 Latency 上升明顯,Join 作業更是在 4 併發到 16 併發時 Latency 明顯上漲。

通過上面的 Benchmark 和 Flink 部署的全流程分析可以發現主要有兩個問題,一塊是作業在資源管理和部署上的瓶頸,一塊是任務在運行時延遲瓶頸。

針對OLAP場景,在作業資源管理和部署方面,目前 Flink 資源管理流程和部署交互流程過於複雜。在運行方面,Flink 的作業拉取結果流程存在較多限制,另外大量的小查詢會導致資源頻繁的創建銷燬。針對上面這些問題,我們分別從作業調度和運行時兩個大方向進行優化。

 

作業調度

 

資源管理流程優化

目前字節 OLAP 的改造是基於 Flink-1.11 版本,因此先介紹下 Flink-1.11 的集羣資源申請和釋放流程。首先TaskManager 在部署完成後向 ResourceMananger 註冊,JobMaster 向 RM 進行資源申請,RM 根據申請的 Slot 對 TM 進行部署。TM 收到部署請求後與 JobMaster 建立連接並提供 Slot 資源。最後由 JobMaster 對 Slot 的資源進行分配並向 TM 進行部署。資源釋放流程同樣,在任務結束後,JobMaster 會釋放對應的 Slot 的資源,並釋放 TaskManager 連接,TM 也會通知 RM Slot 資源釋放。

從以上流程可以看到作業申請和釋放 Slot 資源分別需要 JobMaster、ResourceManager 以及 TaskManager 進行 2 次網絡交互。這些耗時在單查詢情況下雖然可能只增加了幾十到幾百毫秒,但在多查詢情況下,會將這部分的耗時放大,甚至使查詢作業的資源申請耗時增加為秒級。

同時在這個流程中可以看到 ResourceManager、JobMaster 以及 TaskManager 三個核心功能模塊在資源申請和釋放上的功能劃分不夠清晰,ResourceManager 管理計算資源存活,另一方面又管理作業的資源分配,造成查詢資源申請的單點問題;另一方面,TaskManager 不僅執行計算任務,同時還參與計算資源的申請和分配流程,申請和釋放流程過長。

此外資源分配中 SlotPool 處理 Slot 申請和分配比較複雜,每個 Task 需要獲取上游 Task 的分配位置,同時 Share Group 分配資源有多次排序和遍歷,增加了 Slot 分配的耗時,這個隨着作業複雜度上升,耗時也會增加。

在原流程中 ResourceManager 分配 Slot 時需要確保 TaskManager 中指定的 Slot 是空閒可用的,這會增加申請和釋放流程的複雜度。同時 TaskManager 通過在資源申請流程中根據 Slot 初始化對應的 TaskSlot 以及 MemoryManager,確保每個 Slot 只被一個作業的多個計算任務使用。通過分析可以發現,多個計算任務在共享 Slot 過程中,主要是共享 MemoryManager 管理 Batch 算子的 Aggregate、Join、Sort 等算子的臨時狀態以及流計算任務中的 Rocksdb 堆外內存申請和釋放,這部分內存共享的實現跟作業沒有強綁定關係,所以多個作業的多個計算任務也可以共享 MemoryManager。因此在為了簡化資源申請流程,及作業的資源共享上,通過去除 Slot 的感念,在 TaskMananger 中使用全局共享的 MemoryManager。

在優化後的流程中,TaskManager 啟動後會向 ResourceManager 進行註冊,ResourceManager 向 Dispatcher 同步 TM 信息。這裏的 Dispatcher 會同時維護一份集羣 TM 的列表,並在作業提交時提供給 Jobmasger。JobMaster 根據集羣的 TM 根據指定的部署策略選擇部署的 TM 並向 TM 發送部署請求。優化後的各組件分工如下:

  • ResourceManager:管理計算節點的存活以及節點的資源信息彙總,不再執行 OLAP 類型查詢計算的資源分配;
  • TaskManager:支持計算任務的執行,在將資源信息彙報給 ResourceManager 後,不參與作業資源申請和釋放流程。
  • JobMaster:支持和實現查詢計算的資源分配。

在資源流程改造中,因為去除了 Slot 限制,因此在作業部署上可以以 TM 為粒度批量進行作業部署,通過對部署請求進行打包,大大減少了 JobMaster 與 TaskManager 之間的請求次數。

任務部署結構優化

在完成上面的優化後,通過對 Source、WordCount 以及 Join 等三類不同複雜度的作業計算任務部署性能測試,我們發現不同作業複雜度對於計算任務部署的性能影響非常大。

  1. Source 作業,只包含 Source 節點,共 1 個節點,128 併發,共 128 個計算任務,按照 TM 部署策略會使用 10 個 TM,每個 TM 部署 13 個計算任務。
  2. WordCount 作業,包含 Source 節點和 Aggregate 節點,共 2 個節點,128 併發,共 256 個計算任務,使用 10 個 TM,每個 TM 部署 26 計算任務。
  3. Join 作業,包含 3 個 Source 節點,2 個 Join 節點以及 1 個 Aggregate 節點,共 6 個節點,128 併發,共 768 個計算任務,使用 25 個 TM,每個 TM 部署 30 計算任務。

從上面的數據可以看到,隨着任務複雜度的提升,序列化的總耗時增加明顯,WordCount 的序列化總耗時約 122s,而 Source 作業的耗時在 5s 左右。Join 作業的序列化耗時更是在 200s 以上。針對這一現象,可以從兩個維度進行優化:

  1. 數據量大小:通過分析作業的部署結構發現每個 Task 的部署結構包括作業信息、作業配置等信息,同時包含該 Task 的信息,包含 Task 名稱,上下游信息,上下游的位置信息等。這其中同一個作業不同 Task 的作業維度信息都是相同的,同時如果作業是 All To All 的連接方式,他們的上下游信息也是可以共享。因此可以對部署結構的宂餘信息進行提取,比如將作業維度信息、相同 Task 信息、上下游位置信息等。
  2. 序列化流程:原有的序列化是由 TM 的 Actor 負責的,高併發下存在單線程瓶頸,所以將部署請求抽出為單獨的鏈路,通過配置多 Netty Thread,併發處理,再將序列完的結構交由 TM 的 Acotr 處理。

經過以上兩個維度的優化,Source 作業的序列化大小由 63kb 降為 5.6kb,耗時由 5462kb 降為 644kb,複雜作業的優化更為明顯,WordCount 作業的序列化大小由 317.3kb 降為 11.1kb,耗時由 122546ms 降為 940ms,Join 作業的序列化大小由 557.5ms 降為 28.3ms,耗時由 219189ms 降為 2830ms。

模塊交互優化

在完成資源流程申請和釋放優化後,剩下的模塊交互主要是 JobMaster 和 TaskManager 的交互。一部分主要是在 JobMaster 初始化完成後,會與所有的 TM 建立網絡連接,同時在作業運行時兩者之間會維持心跳連接;另一部分主要的交互是 TM 會上報 Task 的運行狀態,包括部署完成進入 running 的狀態以及任務結束的終態上報。這裏的交互在高併發情況下規模也是比較可觀的,以 Task 數為 128 的測試作業在 QPS 100、TM 100 的環境下為例,JM 每秒創建 10000 左右的鏈接,收到 Task 的更新請求 256w 次。

在之前的資源管理優化中,Dispatcher 已經維護了集羣內 TaskManager 的所有節點信息,因此可以在 TaskMananger 初始化完成後與 Dispatcher 建立連接和心跳。所有的 JobMaster 複用該連接,而不單獨維護連接及心跳。同時之前的心跳連接中 TM 需要上報 Slot 使用快照等信息,這一部分在資源管理優化後也不需要。這裏有個問題是之前 JobMaster 需要通過心跳感知 TM 的狀態,而改造後由 Dispatcher 負責維護與 TM 的心跳,因此當 Dispatcher 感知到 TM 異常 後,會通知相關的 JobMaster 進行 Failover 處理。

在 Task 任務更新請求的優化中,在 OLAP 的任務場景下默認採用 Pipeline 模式,在這種模式下,所有 Task 會同時開始調度,因此其實並不關心單個 Task 粒度的狀態變化,同時 Task 直接也沒有狀態的相互依賴,所以我們可以將狀態更新請求進行打包,在 Task 部署完成後 JobMaster 直接將狀態更新為 Running 不進行額外的交互請求。只在作業結束時 TM 以任務粒度進行更新。同時針對一些 Block 連接的場景,比如自 Join,TM 會對這類 Task 進行單獨狀態更新來防止死鎖。

Runtime 執行

 

Pull VS Push

Flink 計算結果鏈路基於 Pull 機制,從 Gateway 向 JobManager 發起 Pull 請求,JobManager 再向 TaskManager 節點 Pull 結果數據。Gateway 到 JobManager 之間存在 Pull 輪詢請求,存在固定的輪詢間隔時間,增加了查詢的 Latency,很難滿足 OLAP 業務對 Letancy 要求比較高的場景。同時為了支持和實現 Pull 機制,會創建一些臨時的網絡、線程等資源,例如在 Sink 節點會創建 Socket Server,在 Gateway 節點會創建輪詢線程等,浪費了計算節點和 Gateway 節點的資源。此外,Dispatcher 節點是一個 Akka Actor 單點,Pull 數據流程會通過 Dispatcher 節點處理和轉發,加大了 Dispatcher 處理消息負擔。

因此我們考慮將獲取結果鏈改為 Push 機制,一方面可以解決輪詢 Latency 的問題,同時將結果主動 Push 向 Client,也可以避免 Dispatcher 接受請求的單線程瓶頸,Gateway 也不需要創建輪詢線程進行輪詢。但 Push 機制也存在新的挑戰:

  1. Push 模式下數據返回進行流量控制,避免 Client 端數據堆積產生 OOM;
  2. 在原先的機制下,Client 會通過 Dispatcher 獲取當前任務狀態,在 Push 模式下則需要主動將作業狀態返回。

作業結果 Push 架構

為保證 Flink 現有功能架構的穩定性,通過在 JobManager 中新增一個獨立的 NettyServer 提供 Socket 服務,SocketServer 接收 Gateway 創建連接和提交作業,然後將接收到的 JobGraph 對象通過 DispatcherGateway 提交到 Dispatcher,再由 Dispatcher 將 Task 部署到 TaskMananger 中。其中 Sink 算子會與 JM 的 NettyServer 創建單獨的 Channel 連接 Push 結果。JM 的 NettyServer 收到結果後會將結果推給 Gateway。

整個 Push 過程利用 Netty 的 Watermark 機制進行流量控制,發送節點前判斷 Channel 是否可寫,不可寫則進行阻塞操作,形成反壓。只有在接收節點消費數據完成並且 Netty Watermark 恢復到正常值後,發送節點才會恢復結果推送。同時 Sink 節點發送的數據會帶上當前的結果的狀態,用於判斷該 Sink 節點是否完成。之後會在 NettyServer 中彙總這些 Task 的結果狀態,當所有數據發送完成後,NettyServer 向 Gateway 發送任務結束的狀態信息。通知 Gateway 數據已經全部發送完成,該 Query 已經結束。在 50 TM Client 128 併發下空數據測試下,Push 模式將 QPS 從 Pull 模式的 850 提升到 4096,提升了 5 倍左右。

線程優化

通過對 TM CPU 使用的分析發現在高併發情況下,線程的創建能佔用 30% 以上的 CPU。因此通過池化線程池減少線程的創建銷燬,但是池化線程操作會帶來的問題是由於原來的 Cancel 線程只需關心執行線程是否存活,但線程池中線程資源是複用的,因此需要對執行線程進行封裝,維護線程當前執行的 Task,Interrupt 線程在進行 Interrupt 操作時也會判斷中斷的 Task 是否正確,在線程結束後更新狀態用於其他 Cancel 線程判斷當前執行線程的狀態。

鎖優化

在對 Thread 的 Profier 分析中發現存在高併發下 Task 頻繁的搶鎖操作。其中 NetworkBufferPool 用於提供數據傳輸的內存,MemoryPool 為算子計算時提供堆外內存。以 NetworkBufferPool 為例,Task 初始化完成後,LocalBufferPool 為空,所有的網絡內存都需要向 NetworkBufferPool 申請,而 NetworkBufferPool 是整個 TM 唯一的,為了保證一致性,所有的內存申請和釋放都需要申請鎖。以 100 併發, Task 100,TM 為 1 為例,每秒會產生 1w 次的鎖請求。而在 NetworkBufferPool 內存足夠的情況下,可以通過將內存打包進行申請以減少內存申請的次數從而減少鎖搶佔的開銷。

Task 在向內存池申請內存會以 BatchSegment 為單位申請,一個 BatchSegment 封裝了 10 個 Segment。通過批量內存申請將鎖搶佔的次數減少為原來的十分之一。在 10 併發,Task 100,TM 為 1,BatchSegment size 為 10 的情況下,申請 Segment 的鎖佔用請求由 1000/s 變為 100/s,模擬申請 10w 個 Segment 的總耗時由 4353ms,縮短為 793 ms。

內存優化

另外在高 QPS 測試中發現當 JM 運行的作業數量比較多時,JM 會頻繁的觸發 Fullgc,導致作業 Latency 上漲。這種情況通過對 JM 的內存分析可以發現大部分的內存都是用在保存作業的 Metric 信息中,包括作業維度以及 Task 維度的信息。而在 OLAP 的場景下字節內部有單獨的 MetricrePorter,很多 Metric 信息並不需要同步到 JM 端。因此通過 TM 端的 Metric 中加入白名單對 Metric 上報進行過濾 ,只保留需要使用的 Metric,比如TM資源信息、Task io 信息等。在對 Metric 進行過濾後,JM 的 Full GC 頻率降低了 88%,對作業 Latency 的影響顯著減少。

收益

 

完成作業調度和執行優化後,通過對優化後的 Flink 集羣進行 Benchmark 測試。測試結果顯示,QPS 提升顯著。單節點作業 QPS,從原先的 17 提升到現在的 33。WordCount 兩節點作業最高 QPS,從 7.5 提升到 20 左右。Join 作業的 QPS 從 2 提升至 11,Latency 在 32 個線程下,下降明顯。單節點作業的 Latency,從 1.8 秒降低到 200 毫秒左右。WordCount 兩節點作業從 4 秒降低到 1 秒多。Join 作業的 QPS 從原先的 15 秒降為 2.5 秒左右。

在業務收益方面,ByteHTAP 受到了公司核心業務的廣泛認可。目前,已經接入 User Growth、電商、幸福裏、飛書等 12 個核心業務。線上汲取能支持業務複雜查詢高峯期 200 左右的 QPS,Latency P99 控制在 5 秒以下。

未來規劃

 

在公司內部,需要支持多租户的業務場景,該業務場景包含了少量重要租户,冷租户多,租户數量大的特點,存在 Noisy Neghbor,資源利用率低的問題。

針對這類業務會主要在下面幾個維度提高 Serverless 能力:

  1. 通過支持 TaskManager 資源隔離和單節點維度支持計算任務優先級調度解決 Noisy Neghbor 的問題;
  2. 通過支持彈性擴縮容解決資源利用率問題。

在性能提升方面,主要分為三個部分:

  1. 集羣負載性能:JobManager 支持水平擴展、集羣資源利用率優化、Task 調度性能優化;
  2. Flink OLAP 運行時優化:運行時網絡消息優化、資源申請流程優化;
  3. 冷啟動優化:Gateway 冷啟動優化、網絡初始化優化、內存申請流程優化。