位元組跳動 Flink 基於 Slot 的資源管理實踐

語言: CN / TW / HK

總體介紹

眾所周知,Flink 在提交和執行 Flink 作業時,需要配置 Flink 資源資訊,包括 TaskManager 的數量,每個 TaskManager 的 CPU 數、記憶體大小以及 Slot 數量。TaskManager 的數量,每個 TaskManager 的 CPU 數、記憶體大小都比較容易理解,主要是配置啟動的計算程序數以及每個程序繫結的物理資源大小。

那麼 Slot 是什麼?為什麼需要在 Flink 作業啟動時配置?

一言以蔽之,Slot 是 Flink 叢集管理資源的最小單位,也是 Flink 作業申請和釋放資源的單位。本文主要分析 Flink 基於 Slot 資源管理、作業資源申請以及釋放流程。

閱讀提示:位元組跳動內部目前主要使用 Flink-1.11 版本,所以本文分析的 Flink Slot 資源管理實現部分內容將基於此版本展開。

資源流程分析

Flink 作業在執行過程中,整個 Flink 叢集其實分為四個角色節點,分別為 Dispatcher、JobMaster、ResourceManager 以及 TaskManager,其中 Dispatcher、JobMaster 以及 ResourceManager 在同一個程序內啟動和執行。

  • Dispatcher 接收各類查詢請求,例如作業的各類 Metrics 等;
  • JobMaster 是作業的 AM,管理作業的執行狀態;
  • ResourceManager 管理 Flink 叢集的資源和資源分配;
  • TaskManager 管理 Flink 計算任務的執行。

Flink 作業被提交到資源管理器 (Yarn/K8s) 後,資源管理器根據作業所需的資源配置(多少個 TaskManager,每個 TaskManager 分配多少 CPU/記憶體)為作業分配資源,並啟動對應數量的 TaskManager 程序。

TaskManager 程序啟動後,向 ResourceManager 節點註冊資訊,其中最關鍵的資訊就是 Slot。

TaskManager 根據配置的每個 TaskManager 的 Slot 數,向 ResourceManager 彙報 Slot,而在 ResourceManager 節點內維護和管理所有的 Slot 列表。我們可以簡單地將 Slot 理解為資源槽,這個資源槽會在 TaskManager 上跟作業具體的計算任務關聯。

Slot 是一個邏輯概念,不會跟具體的 CPU 核數繫結,例如一個 TaskManager 綁定了 N 個 CPU 和配置了 M 個 Slot,N 和 M 之間沒有任何關聯。

但是 Slot 跟記憶體資源相關,我們知道 TaskManager 啟動時會指定程序的總記憶體大小,這塊的記憶體會被分為堆內記憶體、堆外記憶體,其中堆外記憶體又被分為 Managed Memory 和 Direct Memory,對具體記憶體劃分有興趣的小夥伴可以通Flink 記憶體模型詳細瞭解。

這裡我們要說的是 Managed Memory,這部分記憶體不會預先分配,但是會按照 Slot 劃分大小。簡單來講,就是將 TaskManager 中 Managed Memory 總大小除以 Slot 的數量,就是每個 Slot 可以使用的 Managed Memory 大小。

TaskManager 的每個 Slot 關聯多個計算任務,每個計算任務由獨立的 Java 執行緒執行,所以多個計算執行緒會跟一個 Slot 關聯,也就是多個計算執行緒會共享一個 Managed Memory 記憶體。

Slot 申請流程

上文提到,TaskManager 根據配置的 Slot 數量,會向 ResourceManager 彙報它上面的 Slot 資料。ResourceManager 節點在內部維護 TaskManager 列表,每個 TaskManager 分別有哪些 Slot 以及目前空閒的 Slot 集合。

Flink 叢集中的每個 Flink 作業會有一個 JobMaster 節點,JobMaster 節點將 Flink 作業解析成物理執行計劃,向 ResourceManager 申請 Slot 資源,同時管理作業中每個計算任務的執行狀態。當一個作業提交到 Flink 集群后,Slot 資源總體申請流程如下圖所示。

  1. TaskManager 向 ResourceManager 彙報 Slot 資源資料;
  2. JobMaster 向 ResourceManager 發起 Slot 資源申請;
  3. ResourceManager 根據 Free Slots 集合為 Slot 申請分配 Slot,然後向 TaskManager 發起資源確認;
  4. TaskManager 在程序內將指定 Slot 標記為指定作業使用,然後向 JobMaster 彙報 Slot 資源分配資訊;
  5. JobMaster 接收到 Slot 資源分配資訊後,將 Slot 資源跟作業中的計算任務關聯;
  6. JobMaster 將計算任務描述資訊傳送到 Slot 所屬的 TaskManager,部署計算任務。

ResourceManager

ResourceManager 在 Slot Pool Manager 中管理和維護 TaskManager 以及對應的 Slot 集合,每個 Slot 有三種狀態:FREE、PENDING 和 ALLOCATED。

ResourceManager 處理 Slot 申請是一個非同步過程,ResourceManager 接收到 Slot 申請後會先將請求放入到 Pending 列表中,然後給這些請求分配 Slot,最後向 TaskManager 傳送請求確認資源申請,完成確認後會更新分配的 Slot 狀態。主要流程和操作如下圖所示。

JobMaster

JobMaster 申請資源也是一個非同步申請加回調確認的過程,主要通過 Slot Pool 管理和實現資源申請。

Slot Pool 中管理4類 Slot 相關資料結構,分別為 waitingForResourceManager 列表結構、pendingRequests 列表結構、AvailableSlots 結構和 AllocatedSlots 結構。

1、waitingForResourceManager 資料

Flink 作業的 JobMaster 根據每個計算任務,生成一個 Slot 申請請求,並放入到一個 waitingForResourceManager 的請求列表內。這裡需要注意的是會有 Slot 共享的問題,如果多個計算任務共享同一個 Slot,那麼這些計算任務只會生成一個 Slot 申請請求。

每個 Slot 請求會生成唯一的 AllocationID,該 ID 會由 ResourceManager 傳送給 TaskManager,並最終返回給 JobMaster。

當 JobMaster 跟 ResourceManager 建立連線時,從 waitingForResourceManager 中遍歷獲取每個 Slot 請求,然後逐個向 ResourceManager 傳送 Slot 申請,同時將每個 Slot Request 放入到 Pending Request 列表中。

2、PendingRequests 資料

在上文我們提到,JobMaster 向 ResourceManager 傳送申請 Slot 請求後,由於 ResourceManager 的非同步申請機制,ResourceManager 並不會直接返回申請到的 Slot 資料,所以 JobMaster 會將 Slot 請求放入到 Pending Request 等待回撥。

當 JobMaster 接收到 TaskManager 的 offerSlots 請求獲取到申請的 Slot 資訊時,才真正完成 Slot 的申請,在 offerSlots 請求中包含分配給該作業的 Slot 列表。JobMaster 遍歷每個 offer Slot,執行每個 Slot 的分配操作,具體可以分為以下幾個步驟

  1. 根據上文中每個請求的 AllocationID 從 pendingRequests 中移除指定的 Request;

  2. 通過非同步回撥,將 Slot 分配給指定的計算任務(多個計算任務共享,則會分配多個計算任務);

  3. 在 AllocatedSlots 資料結構中增加分配的 Slot 資訊。

3、AllocatedSlots AvailableSlots 資料

AllocatedSlots 存放已經被分配給計算任務的 Slot 資訊列表,AvailableSlots 存放還未被分配給計算任務的 Slot 資訊列表。

由於存在超時重新申請等異常情況,例如 JobMaster 申請 Slot 超時重新發起申請請求,所以存在 TaskManager 向 JobMaster 返回的 offerSlots 根據 AllocationID 在 pendingRequests 中找不到對應請求,或者在 LazyFromSource 過程中上游計算任務執行完成需要釋放 Slot 等情況,所以會將這些未被分配的 Slot 放入到 AvailableSlots 中。

當作業需要新的 Slot 分配給指定的計算任務時,會優先從 AvailableSlots 中查詢 Slot 資源,只有未找到才會向 ResourceManager 發起請求。

在 AvailableSlots 中的每個 Slot 會帶有一個時間戳,後臺執行緒會定時檢查 AvailableSlots 中的每個 Slot,如果時間戳和當前時間超過一定閾值,該 Slot 會被主動釋放掉,避免資源洩漏。

JobMaster 中 Slot 資源申請操作流程如下圖所示。

上述流程主要基於1.11版本分析 Slot 資源申請流程,在最新發布的1.14版本中 Flink 實現了 Declarative 資源申請,總體流程也是走 JM->RM->TM,但是具體實現有比較大的簡化。

TaskManager

TaskManager 中有兩個資料結構跟作業申請資源相關:TaskSlotTable 和 JobTable。TaskSlotTable 管理 TaskManager 中的 Slot 以及跟計算任務之間的關係,主要包含以下幾類資料

  1. TaskManager 中的 Slot 數量;
  2. taskSlots,根據 Slot 索引號管理該 Slot 的狀態(TaskSlot),TaskSlot 裡包含該 Slot 的計算任務列表等資料;
  3. allocatedSlots,根據 AllocationID 管理該 Slot 的狀態(TaskSlot);
  4. taskSlotMappings,根據計算任務的 ID(ExecutionAttemptID) 管理計算任務和 TaskSlot;
  5. slotsPerJob,根據 JobID 管理屬於該 Job 的 AllocationID 集合。

JobTable 管理和 JobMaster 的連線資訊,當 TaskManager 獲取到指定作業的 Slot 申請時,根據 JobMaster 的地址跟 JobMaster 建立連線,向 JobMaster 註冊,並將連線資訊儲存到 JobTable 中。

TaskManager 在接收到 ResourceManager 傳送過來的 Slot 申請後,會對 Slot 申請進行處理並更新 TaskSlotTable,在這個過程中會將 Slot 申請加入到定時檢查中,釋放超時未分配成功的 Slot 資源。具體流程如下圖所示。

這裡比較重要的一個流程是接收到指定作業的 Slot 申請後,會跟作業建立連線,然後將 TaskManager 註冊到 JobMaster,JobMaster 接收到註冊資訊後會跟 TaskManager 建立連線和心跳監控,TM 和 JM 的心跳只監控連通性,相關流程如下圖所示。

計算任務部署流程

JobMaster 將 Slot 資源分配給計算任務後,生成計算任務的部署資訊,部署資訊裡包含作業資訊、計算任務資訊、上下游 Shuffle 資訊以及計算任務所部署的 Slot 索引號資訊等,然後 JobMaster 將部署資訊傳送給指定的 TaskManager。

TaskManager 接收到計算任務部署資訊後,對計算任務進行校驗、部署和執行,這個過程涉及到 TaskSlotTable 以及 JobTable 操作,具體流程如下圖所示。

TaskSlot 有三個狀態

  1. ACTIVE:正在被指定的作業使用;
  2. ALLOCATED:建立時的初始狀態,為某個作業建立,但是還沒被使用;
  3. RELEASING:正在被釋放中。

在 TaskSlot 建立時,會初始化一個 MemoryManager,管理 Slot 中所有計算任務申請和釋放 Managed Memory,共用 TaskSlot 的所有計算任務共享 MemoryManager,TaskSlot 管理了所有在上面執行的 Task 列表。

任務結束和 Slot 釋放

TaskManager 中的計算任務完成計算後,會釋放該計算任務申請和使用的資源,涉及到 Slot 相關的主要以下幾個操作:

  1. 釋放 MemoryManager 中指定計算任務申請的記憶體分片;
  2. 從 TaskSlotTable 中移除管理的計算任務。

完成上述操作後,會向 JobMaster 傳送計算任務更新。JobMaster 收集到所有計算任務的更新訊息後,完成作業執行並跟 ResourceManager 斷開連線,然後遍歷申請到的 Slot 並向指定的 TaskManager 傳送資源釋放請求。

TaskManager 接受到 Slot 釋放請求後,會從 TaskSlotTable 移除指定的 Slot 資訊並向 ResourceManager 釋放 Slot 資訊,如果作業所有 Slot 都被刪除,會關閉跟 JobMaster 的連線。TaskManager 處理的總體流程如下圖所示。

ResourceManager 接收到指定 Slot 釋放請求後,會從資源申請列表中查詢與該 Slot 匹配的申請請求並處理,若申請列表沒有請求則將 Slot 放入到空閒 Slot 列表中。

資源管理優化

開源社群在1.11版本之後,對資源申請和釋放流程具體的程式碼實現做過比較大的重構和優化。在資源管理和流程實現上,主要是支援細粒度資源管理(FLIP-56: Dynamic Slot Allocation - Apache Flink - Apache Software Foundation)和宣告式資源申請(FLIP-138: Declarative Resource management - Apache Flink - Apache Software Foundation)。

細粒度資源管理

在上文我們提到,TaskManager 會將 ManagedMemory 會按照裡面的 Slot 進行等分,這會帶來資源浪費。

作業的每個計算節點都可以設定不同的併發度,所以每個 Slot 內部執行的計算任務型別和數量有可能是不同的。這意味著每個 Slot 的計算任務所需的記憶體資源會存在比較大的不同,比如有些 Slot 有 Join 等計算任務,有些 Slot 沒有。

原先按照 Slot 平均劃分記憶體大小的方式會造成資源浪費,為了提升資源使用率, Flink 1.14 版本開始支援細粒度資源申請。

JobMaster 向 ResourceManager 申請 Slot 時,會向 ResourceManager 指定資源數量,包括 CPU、記憶體等。ResourceManager 根據當前維護的資源列表,為作業分配指定資源的 Slot,同時向指定的 TaskManager 傳送請求。TaskManager 接收到帶有資源資訊的 Slot 申請後,會建立 Slot,並向 JobMaster 確認資源申請。

所以從 Slot 總體申請流程上,新版本跟原先的處理是相同的,這塊主要是支援在 Slot 申請時帶上相應的資源資訊,ResourceManager 會根據管理的 TaskManager 剩餘資源資訊為計算任務分配 TaskManager。

宣告式資源申請

在上面的 Slot 申請流程中,一個 Flink 作業會申請若干個 Slot 資源,但是在申請過程中是按照單個 Slot 獨立申請的。Flink 作業,特別是流式作業,通常需要完成所有所需的 Slot 資源申請後,作業才能正常執行。

所以 Flink 新版本中支援了宣告式資源申請,JobMaster 向 ResourceManager 申請資源時,會將所需的多個 Slot 打包成一個 Batch,向 ResourceManager 發起資源申請。ResourceManager 接收到作業的多個 Slot 申請後,會處理 SlotManager 中管理的資源,然後根據 Slot 逐個向指定的 TaskManager 發起資源請求。

每個作業所需的 Slot 數量,目前是在 JobMaster 資源申請時進行打包處理,後續可能會根據 JobGraph 執行計劃中每個計算節點的併發度直接計算。

總結

總體上來講,Flink 整個資源管理、申請和分配圍繞 Slot 展開,同時每個 TaskManager 中的 Slot 數量決定了作業在該 TaskManager 中執行的併發計算任務數量。本篇文章主要介紹了 Slot 對資源分配、釋放以及計算執行的影響,希望可以幫助大家更好地決策每個 TaskManager 中的 Slot 數量,對 Flink 作業進行調優。

目前,位元組跳動流式計算團隊同步支援的火山引擎流式計算 Flink 版正在公測中,支援雲中立模式,支援公共雲、 混合雲 及多雲部署,全面貼合企業上雲策略,歡迎申請試用:www.volcengine.com/product/flink

獲取更多產品資訊,歡迎關注公眾號【位元組跳動雲原生計算】