自適應批作業排程器:為 Flink 批作業自動推導並行度

語言: CN / TW / HK

▼ 關注「 Apache Flink 」,獲取更多技術乾貨 

01

引言

對大部分使用者來說,為 Flink 運算元配置合適的並行度並不是一件容易的事。對於批作業,小的並行度會導致作業執行時間長,故障恢復慢,而不必要的大並行度會導致資源浪費,任務部署和資料 shuffle 開銷也會變大。

為了控制批作業的執行時長,運算元的並行度應該和其需要處理的資料量成正比。使用者需要通過預估運算元需要處理的資料量來配置並行度。但準確預估運算元需要處理的資料量是一件很困難的事情:需要處理的資料量可能每天都在變化,作業中可能會存在大量的 UDF 和複雜運算元導致難以判斷其產出的資料量。

為了解決這個問題,我們在 Flink 1.15 中引入了一種新的排程器:自適應批作業排程器(Adaptive Batch Scheduler)。自適應批作業排程器會在作業執行時根據每個運算元需要處理的實際資料量來自動推導並行度。它會帶來以下好處:

  1. 大大降低批處理作業併發度調優的繁瑣程度;

  2. 可以根據處理的資料量為不同的運算元配置不同的並行度,這對於之前只能配置全域性並行度的 SQL 作業尤其有益;

  3. 可以更好的適應每日變化的資料量。

02

用法

使 Flink 自動推導運算元的並行度,需要進行以下配置:

  1. 啟用自適應批作業排程器;

  2. 配置運算元的並行度為 -1。

2.1 啟用自適應批作業排程器

啟用自適應批作業排程器,需要進行以下配置:

  1. 配置 jobmanager.sche duler: AdaptiveBatch;

  2. execution.batch-shuffle-mode 配置為 ALL-EXCHANGES-BLOCKING (預設值)。因為目前自適應批作業排程器只支援 shuffle mode 為 ALL-EXCHANGES-BLOCKING 的作業。

此外,還有一些相關配置來指定自動推導的運算元並行度的上下限、預期每個運算元處理的資料量以及 source 運算元的預設並行度,詳情請參閱 Flink 文件 [1]

2.2 配置運算元的並行度為 -1

自適應批作業排程器只會為使用者未指定並行度的運算元(即並行度為預設值 -1)推導並行度。所以需要進行以下配置:

  1. 配置 parallelism.default: -1

  2. 對於 SQL 作業,需要配置 table.exec.resource.default-parallelism: -1

  3. 對於 DataStream/DataSet 作業,避免在作業中通過運算元的 setParallelism() 方法來指定並行度;

  4. 對於 DataStream/DataSet 作業,避免在作業中通過 StreamExecutionEnvironment/ExecutionEnvironment 的 setParallelism() 方法來指定並行度。

03

實現細節

接下來我們將介紹自適應批作業排程器的實現細節。在此之前,我們簡要介紹一下涉及到的一些術語概念:

  1. 邏輯節點( JobVertex [2]  和邏輯拓撲( JobGraph [3] :邏輯節點是為了更優的效能而將幾個運算元連結到一起形成的運算元鏈,邏輯拓撲則是多個邏輯節點連線組成的資料流圖。

  2. 執行節點( ExecutionVertex [4]  和執行拓撲( ExecutionGraph [5] :執行節點對應一個可部署物理任務,是邏輯節點根據並行度進行展開生成的。例如,如果一個邏輯節點的並行度為 100,就會生成 100 個對應的執行節點。執行拓撲則是所有執行節點連線組成的物理執行圖。

以上概念的介紹可以參見 Flink 文件 [6] 。需要注意的是,自適應批作業排程器是通過推導邏輯節點的並行度來決定該節點包含的運算元的並行度的。

實現細節主要包括以下幾部分:

  1. 使排程器能夠收集執行節點產出資料的大小;

  2. 引入一個新元件 VertexParallelismDecider   [7]  來負責根據邏輯節點需要處理的資料量計算其並行度;

  3. 支援動態構建執行拓撲,即執行拓撲從一個空的執行拓撲開始,然後隨著作業排程逐漸新增執行節點;

  4. 引入自適應批作業排程器來更新和排程執行拓撲。

後續章節會對以上內容進行詳細介紹。

圖 1 - 自動推導並行度的整體結構

3.1 收集執行節點產出的資料量

自適應批作業排程器是根據邏輯節點需要處理的資料量來決定其並行度的,因此需要收集上游節點產出的資料量。為此,我們引入了一個 numBytesProduced 計數器來記錄每個執行節點產出的資料分割槽(ResultPartition)的資料量,並在執行節點執行完成時將累計值傳送給排程器。

3.2 為邏輯節點決定合適的並行度

我們引入了一個新元件 VertexParallelismDecider 來負責為邏輯節點計算並行度。計算演算法如下:

假設

  1. V 是使用者配置的期望每個執行節點處理的資料量;

  2. totalB yte non-broadcast   是邏輯節點需要處理的非廣播資料的總量;

  3. totalBytes broadcast   邏輯節點需要處理的廣播資料的總量;

  4. maxBroadcastRatio 是每個執行 節點處理的廣播資料的比例上限;

  5. normalize(x) 是一個輸出與 x 最接近 的 2 的冪的函式。

計算並行度的公式如下:

值得注意的是,我們在這個公式中引入了兩個特殊處理:

  1. 限制每個執行節點處理的廣播資料的比例;

  2. 將並行度調整為 2 的冪。

此外,上述公式不能直接用來決定 source 節點的並行度,因為 source 節點不會消費資料。為了解決這個問題,我們引入了配置選項 jobmanager.adaptive-batch-scheduler.default-source-parallelism ,允許使用者手動配置 source 節點的並行度。請注意,並非所有 source 都需要此選項,因為某些 source 可以自己推導並行度(例如,HiveTableSource,詳情請參閱 HiveParallelismInference),對於這些source,更推薦由它們自己推導並行度。

■ 3.2.1 限制每個執行節點處理的廣播資料的比例

我們在公式限制每個執行節點處理的廣播資料上限比例為 maxBroadcastRatio。 即每個執行節點處理的非廣播資料至少為 (1-maxBroadcastRatio) * V 。如果不這樣做,當廣播資料的資料量接近 V 時,即使非廣播資料的量非常小,也可能會被計算出很大的並行度,這是不必要的,會導致資源浪費和任務部署的開銷變大。

通常情況下,一個執行節點需要處理的廣播資料量會小於要處理的非廣播資料。 因此,我們將 maxBroadcastRatio 預設設定為 0.5。目前,這個值是硬編碼在程式碼中的,我們後續會考慮將其改為可配置的。

■  3.2.2 將並行度調整為 2 的冪

normalize 函式會將並行度調整為最近的 2 的冪,這樣做是為了避免引入資料傾斜。為了更好的理解本節,我們建議您先閱讀 子分割槽動態對映 部分。

以圖 4(b)為例,A1/A2 產生 4 個子分割槽,B 最終被決定的並行度為 3。這種情況下,B1 將消費 1 個子分割槽,B2 將消費 1 個子分割槽,B3 將消費 2 個子分割槽。我們假設不同子分割槽的資料量都相同,這樣 B3 需要消費的資料量是 B1/B2 的 2 倍,從而導致了資料傾斜。

為了解決這個問題,我們需要讓所有下游執行節點消費的子分割槽數量都一樣,也就是說上游產出的子分割槽數量應該是下游邏輯節點並行度的整數倍。為簡單起見,我們希望使用者指定的最大並行度為 2^N(如果不是則會被自動調整到不超過配置值的 2^N),然後將下游邏輯節點的並行度調整到最接近的 2^M(M <= N),這樣就可以保證子分割槽被下游均勻消費。

不過這只是一個臨時的解決方案,最終應該通過 自動負載均衡 來解決,我們將在後續版本中實現。

3.3 動 態構建執行拓撲

在引入自適應批作業排程器之前,執行拓撲是以靜態方式構建的,也就是在排程開始前執行拓撲就被完全創建出來了。為了使邏輯節點並行度可以在執行時決定,執行拓撲需要支援動態構建。

■ 3.3.1 向執行拓撲動態新增節點和邊

動態構建執行拓撲是指一個 Flink 作業從一個空的執行拓撲開始,然後隨著排程逐步附加執行節點,如圖 2 所示。

執行拓撲由執行節點和執行邊(ExecutionEdge)組成。只有在以下情況下,才會將邏輯節點展開建立執行節點並將其新增到執行拓撲:

  1. 對應邏輯節點的並行度已經被確定(以便 Flink 知道應該建立多少個執行節點);

  2. 所有上游邏輯節點都已經被展開(以便 Flink 通過執行邊將新建立的執行節點和上游執行節點連線起來)。

圖 2 - 動態構建執行拓撲

■ 3.3.2 子分割槽動態對映

在引入自適應批作業排程器之前,在部署執行節點時,Flink 需要知道其下游邏輯節點的並行度。因為下游邏輯節點的並行度決定了上游執行節點需要產出的子分割槽數量。以圖 3 為例,下游 B 的並行度為 2,因此上游的 A1/A2 需要產生 2 個子分割槽,索引為 0 的子分割槽被 B1 消費,索引為 1 的子分割槽被 B2 消費。

圖 3 - 靜態執行拓撲消費子分割槽的方式

但顯然,這不適用於動態圖,因為當部署上游執行節點時,下游邏輯節點的並行度可能尚未確定(即部署 A1/A2 時,B 的並行度還未確定)。為了解決這個問題,我們需要使上游執行節點產生的子分割槽數量與下游邏輯節點的並行度解耦。

我們通過以下方法實現解耦:將上游執行節點產生子分割槽的數量設定為下游邏輯節點的最大並行度(最大並行度是一個可配置的固定值),然後在下游邏輯節點並行度被確定後,將這些子分割槽均分給不同的下游執行節點進行消費。也就是說,部署下游執行節點時,每個下游執行節點都會被分配到一個子分割槽範圍來消費。假設 N 是下游邏輯節點並行度,P 是子分割槽的數量。對於第 k 個下游執行節點,消費的子分割槽範圍應該是:

以圖 4 為例,B 的最大並行度為 4,因此 A1/A2 有 4 個子分割槽。然後如果B的確定並行度為 2,則子分割槽對映將為圖 4(a),如果B的確定並行度為 3,則子分割槽對映將為圖 4(b)。

圖 4 - 動態執行拓撲消費子分割槽的方式

3.4 動態更新並排程執行拓撲

自適應批作業排程器排程作業的方式和預設排程器基本相同,唯一的區別是:自適應批作業排程器是從一個空的執行拓撲開始排程,在處理任何排程事件之前,都會嘗試決定所有邏輯節點的並行度,然後嘗試為邏輯節點生成對應的執行節點,並通過執行邊連線上游節點,更新執行拓撲。

排程器會在每次排程之前嘗試按照拓撲順序決定所有邏輯節點的並行度:

  1. 對於 source 節點,其並行度會在開始排程之前就進行確定;

  2. 對於非 source 節點,需要在其所有上游節點資料產出完成後才能確定其並行度。

然後,排程程式將嘗試按照拓撲順序將邏輯節點展開生成執行節點。一個可以被展開的邏輯節點應該滿足以下條件:

  1. 該邏輯節點並行度已確定;

  2. 所有上游邏輯節點都已經被展開。

04

未來展望 - 自動負載均衡

執行批作業時,可能會出現資料傾斜(某個執行節點需要處理的資料遠多於其他執行節點),這會導作業出現長尾現象,拖慢作業的完成速度。如果 Flink 可以自動改善或者解決這個問題,可以給使用者很大的幫助。

一種典型的資料傾斜情況是某些子分割槽的資料量明顯大於其他子分割槽。這種情況可以通過劃分更細粒度的子分割槽,並根據子分割槽大小來平衡工作負載來解決(如圖 5)。自適應批作業排程器的工作可以被認為是邁向它的第一步,因為自動重新平衡的要求類似於自適應批作業排程器,它們都需要動態圖的支援和結果分割槽大小的採集。

基於自適應批作業排程器的實現,我們可以通過增加最大並行度(為了更細粒度的子分割槽)和簡單地更改子分割槽範圍劃分演算法(為了平衡工作負載)來解決上述問題。在目前的設計中,子分割槽範圍是按照子分割槽的個數來劃分的,我們可以改成按照子分割槽中的資料量來劃分,這樣每個子分割槽範圍內的資料量可以大致相同,從而平衡下游執行節點的工作量。

圖 5 - 自動負載均衡

註釋

[1]  https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/elastic_scaling/#adaptive-batch-scheduler

[2]  https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java

[3]  https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java

[4]  https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java

[5]  https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

[6]  https://nightlies.apache.org/flink/flink-docs-master/zh/docs/internals/job_scheduling/#jobmanager-資料結構

[7]  https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java

往期精選

▼ 關注「 Apache Flink 」,獲取更多技術乾貨 

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群~

點選「閱 讀原文 」,進入 Flink 中文學習網~