Flink資源排程模型

語言: CN / TW / HK

作者:王剛,騰訊CSIG高階工程師

Flink 資源模型 / 排程設計

背景知識

首先,我們來簡單回顧一下 Flink 作業的執行時模型,然後再來探討在這種執行模型下,Flink 的資源模型和排程架構的設計和實現。

我們引用官網非常經典的一張圖,來說明一個 Flink 流作業簡化後的執行檢視。

Tasks 和 Operator Chains (部分譯自官網)

我們知道,一個 Flink 作業可以看做是由 Operators 組成的 DAG,一個 Operator 代表對資料流的進行的某個資料變化操作( Sources 和 Sinks 也是代表資料流流入和資料流流出的特殊Operator )。在實際的分散式執行中,Flink 會把符合聚合規則的相鄰 Operator 的 SubTask 聚合成 Tasks,每一個 Task 都會被單獨的執行緒執行。這種把多個 Operator 的 SubTask 聚合成 Tasks 優化通常非常有效:能有效減少執行緒間切換(相比單獨的每個operator的每個subtask佔用一個執行緒)、資料快取的成本,從而在降低資料處理延遲的同時增加系統的吞吐。

下圖代表了資料流在 Operator Chain 之後,會實際產生 5 個 SubTask,相應的需要 5 個併發執行緒來處理該資料流。

在此,我們簡要的區分下 Task 和 SubTask 的異同:

Task Task 是 Flink Runtime 的執行的基本單元,Task 封裝了一個 Operator 或 Operator Chain 的某一併行例項
SubTask 一個 SubTask 是負責處理某一資料流的一部分的 Task,SubTask 術語強調對於同一個 Operator 或 Operator Chain 這裡有多個並行的 Tasks。

所以,一個 Flink 的作業,最終會轉化為一個個 Task 在叢集上執行。我們接下來從 Task 執行維度分析,一層層來看 Flink 的資源模型設計。

資源模型

首先,我們介紹 Flink 基本的幾個執行時概念。

Flink Job:

Flink程式提交到Flink Cluster執行後,會生成一個或者多個Flink jobs。根據上文的介紹,我們知道一個Flink job其實是資料流變換的執行時抽象。具體來講,是由operator或者operator-chain組成的一個個Task進行資料處理的有向圖。

Flink Cluster:

一個 Flink Cluster 一般是由一個 JobManager 和多個 TaskManager 組成的分散式系統。

JobManager:

JobManager 是 Flink Cluster 資源的編排器,負責協調 Flink Application 的分散式執行,具體職責有:  

1) 決定什麼時候排程下一個 Task  

2)處理 Task 執行結束或者失敗的情形

3)協調 Checkpoint 的觸發和執行  

4)協調 Flink Job 在發生失敗時的恢復行為  

5)其它情形。JobMananger 程序主要由三個不同的元件組成:

  • Flink ResourceManager

    ResourceManager 負責 Flink Clutster 資源的分配和回收工作,它管理著 Flink Cluster 的基本資源排程單元 Task Slots。Flink 針對不同的資源環境和執行環境(YARN、Kubernetes 和 standalone 模式等),有不同的 ResourceManager 實現。

  • Flink Dispatcher

    Dispatcher 提供了提交 Flink 應用的 REST 介面,並且負責為每一個提交的 Job 啟動一個新的 JobMaster。另外,Dispatcher還提供了用來查詢 Job 執行狀態的 Flink WebUI。

  • Flink JobMaster

    JobMaster,顧名思義,負責管理一個單獨的 Flink Job的執行。多個 Flink Jobs 可以同時執行在一個 Flink Cluster 中,每一個 Flink Job 都會有一個對應的 JobMaster。

TaskManager:

TaskManager 負責執行組成 Job 的 Tasks,並且會負責資料流之間的資料轉發和快取工作。Flink 執行時,必須有至少一個 TaskManager。一個 Task Manager 可能會被劃分成多個 Slots,Slot 是 TaskManager 資源的一個子集, 也是 TaskManager 中最小的資源排程單位,Slot 的概念貫徹了資源排程過程的始終。

下面引用官網的一段材料來說明 Task Slot 和 Task 執行之間的關係。

Task Slots 和資源(摘自官網)

每個 TaskManager都是一個 JVM 程序 ,可以在單獨的執行緒中執行一個或多個 SubTask。為了控制一個 TaskManager 中接受多少個 Task,就有了所謂的 Task Slots (至少一個)。

每個 Task Slot 代表 TaskManager 中資源的固定子集。例如,具有 3 個 Slot 的 TaskManager,會將其託管記憶體 1/3 用於每個 Slot。分配資源意味著 subtask 不會與其他作業的 subtask 競爭託管記憶體,而是具有一定數量的保留託管記憶體。注意此處沒有 CPU 隔離;當前 Slot 僅分離 Task 的託管記憶體通過調整 Task Slot 的數量,使用者可以定義 subtask 如何互相隔離。每個 TaskManager 有一個 Slot,這意味著每個 Task 組都在單獨的 JVM 中執行(例如,可以在單獨的容器中啟動)。具有多個 Slot 意味著更多 subtask 共享同一 JVM。同一 JVM 中的 Task 共享 TCP 連線(通過多路複用)和心跳資訊。它們還可以共享資料集和資料結構,從而減少了每個 Task 的開銷。

預設情況下,Flink 允許 SubTask 共享 Slot,即便它們是不同的 Task 的 SubTask,只要是來自於同一作業即可。結果就是一個 Slot 可以持有整個作業管道。允許 Slot 共享 有兩個主要優點:

Flink 叢集所需的 Task Slot 和作業中使用的最大並行度恰好一樣。無需計算程式總共包含多少個 Task(具有不同並行度)。

容易獲得更好的資源利用。如果沒有 Slot 共享,非密集 subtask( source/map() ) 將阻塞和密集型 subtask( window ) 一樣多的資源。通過 Slot 共享,我們示例中的基本並行度從 2 增加到 6,可以充分利用分配的資源,同時確保繁重的 subtask 在 TaskManager 之間公平分配。

排程模型

該小結部分內容引自 深入解讀Flink資源管理機制 [4]

概覽

Flink 的資源排程是一個典型的兩層模型。其中從 Cluster 到 Job 的分配過程是由 Slot Manager 來完成,Job 內部分配給 Task 資源的過程則是由 Scheduler 來完成。如下圖,Scheduler 向 Slot Pool 發出 Slot Request(資源請求),Slot Pool 如果不能滿足該資源需求則會進一步請求 Resource Manager,具體來滿足該請求的元件是 Slot Manager。

Flink Cluster 到 Flink Job 資源排程過程

如下圖,Cluster 到 Job 的資源排程過程中主要包含兩個過程。

  • Slot Allocation(下圖紅色箭頭)Scheduler 向 Slot Pool 傳送請求,如果 Slot 資源足夠則直接分配,如果 Slot 資源不夠,則由 Slot Pool 再向 Slot Manager 傳送請求(此時即為 Job 向 Cluster 請求資源),如果 Slot Manager 判斷叢集當中有足夠的資源可以滿足需求,那麼就會向 Task Manager 傳送 Assign 指令,Task Manager 就會提供 Slot 給 Slot Pool,Slot Pool 再去滿足 Scheduler 的資源請求。

  • Starting TaskManagers(下圖藍色箭頭)在 Active Resource Manager 資源部署模式下,當 Resource Manager 判定 Flink Cluster 中沒有足夠的資源去滿足需求時,它會進一步去底層的資源排程系統請求資源,由排程系統把新的 Task Manager 啟動起來,並且 TaskManager 向 Resource Manager 註冊,則完成了新 Slot 的補充。

Flink Job 到 Task 排程過程

JobMaster 中的 Scheduler 元件,會根據 Execution Graph和 Task 的執行狀態,決定接下來要排程的 Task。

我們已經知道 Flink 是通過 Task Slots 來定義執行資源。每個 TaskManager 有一到多個 task slot,每個 task slot 可以執行一條由多個並行 task 組成的流水線。這樣一條流水線由多個連續的 task 組成,比如並行度為 n 的 MapFunction 和 並行度為 n 的 ReduceFunction。需要注意的是 Flink 經常併發執行連續的 task,不僅在流式作業中到處都是,在批量作業中也很常見。

下圖很好的闡釋了這一點,一個由資料來源、 MapFunction ReduceFunction 組成的 Flink 作業,其中資料來源和 MapFunction 的並行度為 4 ,ReduceFunction 的並行度為 3 。流水線由一系列的 Source - Map - Reduce 組成,執行在 2 個 TaskManager 組成的叢集上,每個 TaskManager 包含 3 個 slot,整個作業的執行如下圖所示。

Flink 內部通過 SlotSharingGroup [5] 和 CoLocationGroup [6] 來定義哪些 Task 可以共享一個 Slot, 哪些 Task 必須嚴格放到同一個 Slot。

注:本文主要內容收集整理自Flink官網和公開的技術部落格

參考資料

[1] Flink 官網--Flink Architecture: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/concepts/flink-architecture/  

[2] Flink 官網--Jobs and Scheduling: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/job_scheduling/  

[3] Flink 官網--Task Lifecycle: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/task_lifecycle/  

[4] 深入解讀 Flink 資源管理機制: https://www.infoq.cn/article/tnq4vystluqfkqzczesa  

[5] SlotSharingGroup: https://github.com/apache/flink/blob/release-1.15//flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java  

[6] CoLocationGroup: https://github.com/apache/flink/blob/release-1.15//flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java  

掃碼加入 流計算 Oceanus 產品交流群:point_down:

流計算 Oceanus  限量秒殺專享活動火爆進行中↓↓

掃碼關注 「騰訊雲大資料」 ,瞭解騰訊雲流計算 Oceanus 更多資訊 ~

騰訊雲大資料

長按二維碼
關注我們