Flink 執行引擎:流批一體的融合之路

語言: CN / TW / HK

簡介: 本文由 Apache Flink Committer 馬國維分享,主要介紹 Flink 作為大資料計算引擎的流批一體融合之路。

本文由 Apache Flink Committer 馬國維分享,主要介紹 Flink 作為大資料計算引擎的流批一體融合之路。內容包括:

1、背景
2、流批一體的分層架構
3、流批一體DataStream
4、流批一體DAG Scheduler
5、流批一體的Shuffle架構
6、流批一體的容錯策略
7、未來展望

一、背景

隨著網際網路和移動網際網路的不斷髮展,各行各業都積累海量的業務資料。而企業為了改善使用者體驗,提升產品在市場上的競爭力,都採取了實時化方式來處理大資料。社交媒體的實時大屏、電商的實時推薦、城市大腦的實時交通預測、金融行業的實時反欺詐,這些產品的成功都在說明大資料處理的實時化已經成為一個勢不可擋的潮流。

在實時化的大趨勢下,Flink 已經成為實時計算行業的事實標準。我們看到,不光是阿里巴巴,國內外各個領域的頭部廠商,都把 Flink 做為實時計算的技術底座,國內有位元組跳動、騰訊、華為,國外有 Netflix、Uber 等等。

而業務實時化只是一個起點,Flink 的目標之一就是給使用者提供實時離線一體化的使用者體驗。其實很多使用者不僅需要實時的資料統計,為了確認運營或產品的策略的效果,使用者同時還需要和歷史(昨天,甚至是去年的同期)資料比較。而從使用者的角度來看,原有的流、批獨立方案存在一些痛點:

  • 人力成本比較高。由於流和批是兩套系統,相同的邏輯需要兩個團隊開發兩遍。
  • 資料鏈路冗餘。在很多的場景下,流和批計算內容其實是一致,但是由於是兩套系統,所以相同邏輯還是需要執行兩遍,產生一定的資源浪費。
  • 資料口徑不一致。這個是使用者遇到的最重要的問題。兩套系統、兩套運算元,兩套 UDF,一定會產生不同程度的誤差,這些誤差給業務方帶來了非常大的困擾。這些誤差不是簡單依靠人力或者資源的投入就可以解決的。

2020 年的雙十一,在實時洪峰到達 40 億的歷史新高的同時,Flink 團隊與 DT 團隊一起推出了基於 Flink 的全鏈路流批一體的數倉架構,很好解決了 Lambda 的架構所帶來的一系列問題:流批作業使用同一 SQL,使研發效率提升了 3~4 倍;一套引擎確保了資料口徑天然一致;流批作業在同一叢集執行,削峰填谷大幅提升了資源效率。

Flink 流批一體的成功,離不開 Flink 開源社群的健康蓬勃發展。從 Apache 軟體基金會 2020 年度報告可以看出,在反映開源社群繁榮情況的三個關鍵指標上 Flink 都名列前茅:使用者郵件列表活躍度,Flink 排名第一;開發者提交次數 Flink 排名第二,Github 使用者訪問量排名第二。這些資料並不侷限於大資料領域,而是 Apache 開源基金會下屬的所有專案。

2020 年也是 Blink 反哺社群的第二年,這兩年我們把 Blink 在集團內積累的經驗逐步貢獻回社群,讓 Flink 成為真正意義上的流批一體平臺。我希望通過這篇文章給大家分享下這兩年 Flink 在執行引擎流批融合方面做了哪些工作。同時也希望 Flink 的老使用者和新朋友可以進一步瞭解 Flink 流批一體架構的“前世今生”。

二、流批一體的分層架構

總體來說,Flink 的核心引擎主要分為如下三層:

  • SDK 層。Flink 的 SDK 主要有兩類,第一類是關係型 Relational SDK 也就是 SQL/Table,第二類是物理型 Physical SDK 也就是 DataStream。這兩類 SDK 都是流批統一,即不管是 SQL 還是 DataStream,使用者的業務邏輯只要開發一遍,就可以同時在流和批的兩種場景下使用;
  • 執行引擎層。執行引擎提供了統一的 DAG,用來描述資料處理流程 Data Processing Pipeline(Logical Plan)。不管是流任務還是批任務,使用者的業務邏輯在執行前,都會先轉化為此 DAG 圖。執行引擎通過 Unified DAG Scheduler 把這個邏輯 DAG 轉化成在分散式環境下執行的Task。Task 之間通過 Shuffle 傳輸資料,我們通過 Pluggable Unified Shuffle 架構,同時支援流批兩種 Shuffle 方式;
  • 狀態儲存。狀態儲存層負責儲存運算元的狀態執行狀態。針對流作業有開源 RocksdbStatebackend、MemoryStatebackend,也有商業化的版本的GemniStateBackend;針對批作業我們在社群版本引入了 BatchStateBackend。

本文主要分享如下幾個方面的內容:

  1. 流批一體的 DataStream 介紹瞭如何通過流批一體的 DataStream 來解決 Flink SDK 當前面臨的挑戰;
  2. 流批一體的 DAG Scheduler 介紹瞭如何通過統一的 Pipeline Region 機制充分挖掘流式引擎的效能優勢;如何通過動態調整執行計劃的方式來改善引擎的易用性,提高系統的資源利用率;
  3. 流批一體的 Shuffle 架構介紹如何通過一套統一的 Shuffle 架構既可以滿足不同 Shuffle 在策略上的定製化需求,同時還能避免在共性需求上的重複開發;
  4. 流批一體的容錯策略介紹瞭如何通過統一的容錯策略既滿足批場景下容錯又可以提升流場景下的容錯效果。

三、流批一體 DataStream

SDK 分析以及面臨的挑戰

如上圖所示,目前 Flink 提供的 SDK 主要有三類:

  1. Table/SQL 是一種 Relational 的高階 SDK,主要用在一些資料分析的場景中,既可以支援 Bounded 也可以支援 Unbounded 的輸入。由於 Table/SQL 是 Declarative 的,所以系統可以幫助使用者進行很多優化,例如根據使用者提供的Schema,可以進行 Filter Push Down 謂詞下推、按需反序列二進位制資料等優化。目前 Table/SQL 可以支援 Batch 和 Streaming 兩種執行模式。[1]
  2. DataStream 屬於一種 Physical SDK。Relatinal SDK 功能雖然強大,但也存在一些侷限:不支援對 State、Timer 的操作;由於 Optimizer 的升級,可能導致用相同的 SQL 在兩個版本中出現物理執行計劃不相容的情況。而 DataStream SDK,既可以支援 State、Timer 維度 Low Level 的操作,同時由於 DataStream 是一種 Imperative SDK,所以對物理執行計劃有很好的“掌控力”,從而也不存在版本升級導致的不相容。DataStream 目前在社群仍有很大使用者群,例如目前未 Closed 的 DataStream issue 依然有近 500 個左右。雖然 DataStream 即可以支援 Bounded 又可以支援 Unbounded Input 用 DataStream 寫的 Application,但是在 Flink-1.12 之前只支援 Streaming 的執行模式。
  3. DataSet 是一種僅支援 Bounded 輸入的 Physical SDK,會根據 Bounded 的特性對某些運算元進行做一定的優化,但是不支援 EventTime 和 State 等操作。雖然 DataSet 是 Flink 提供最早的一種 SDK,但是隨著實時化和資料分析場景的不斷髮展,相比於 DataStream 和 SQL,DataSet 在社群的影響力在逐步下降。

目前 Table/SQL 對於流批統一的場景支援已經比較成熟,但是對於 Phyiscal SDK 來說還面臨的一些挑戰,主要有兩個方面:

  1. 利用已有 Physical SDK 無法寫出一個真正生產可以用的流批一體的 Application。例如使用者寫一個程式用來處理 Kafka 中的實時資料,那麼利用相同的程式來處理儲存在 OSS/S3/HDFS 上的歷史資料也是非常自然的事情。但是目前不管是 DataSet 還是 DataStream 都無法滿足使用者這個“簡單”的訴求。大家可能覺得奇怪,DataStream 不是既支援 Bounded 的 Input 又支援 Unbounded 的 Input,為什麼還會有問題呢?其實“魔鬼藏在細節中”,我會在 Unified DataStream 這一節中會做進一的闡述。
  2. 學習和理解的成本比較高。隨著 Flink 不斷壯大,越來越多的新使用者加入 Flink 社群,但是對於這些新使用者來說就要學習兩種 Physical SDK。和其他引擎相比,使用者入門的學習成本是相對比較高的;兩種 SDK 在語義上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 卻沒有,對於使用者來說,理解兩套機制的門檻也不小;由於這兩 SDK 還不相容,一個新使用者一旦選擇錯誤,將會面臨很大的切換成本。

Unified Physical SDK

為了解決上述 Physical SDK 所面臨的挑戰,我們把 Unified DataStream SDK 作為 Flink 統一的 Physical SDK。這個部分主要解決兩個問題:

  1. 為什麼選擇 DataStream 作為 Unified Physical SDK?
  2. Unified DataStream 比“老”的 DataStream 提供了哪些能力讓使用者可以寫出一個真正生產可以用的流批一體 Application?

為什麼不是 Unified DataSet

為了解決學習和理解成本比較高的問題,最自然最簡單的方案就是從 DataStream 和 DataSet 中選擇一個作為 Flink 的唯一的 Physical SDK。那為什麼我們選擇了 DataStream 而不是 DataSet 呢?主要有兩個原因:

  1. 使用者收益。在前邊已經分析過,隨著 Flink 社群的發展,目前 DataSet 在社群的影響力逐漸下降。如果選擇使用 DataSet 作為 Unified Physical SDK,那麼使用者之前在 DataStream 大量“投資”就會作廢。而選擇 DataStream,可以讓許多使用者的已有 DataStream “投資”得到額外的回報;
  2. 開發成本。DataSet 過於古老,缺乏大量對於現代實時計算引擎基本概念的支援,例如 EventTime、Watermark、State、Unbounded Source 等。另外一個更深層的原因是現有 DataSet 運算元的實現,在流的場景完全無法複用,例如 Join 等。而對於 DataStream 則不然,可以進行大量的複用。那麼如何在流批兩種場景下複用 DataStream 的運算元呢?

Unified DataStream

很多對 Flink 有一定了解的使用者可能會問:DataStream 是同時支援 Bounded/Unbounded 的輸入,為什麼我們會說:用 DataStream 無法寫出一個真正生產可以用的流批一體 Application 呢?簡單來說,DataStream 原本主要設計目標是給 Unbounded 場景使用的,所以導致在 Bounded 的場景下在效率、可用性、易用性方面和傳統的批引擎還有一定距離。具體來說體現在如下兩個方面:

- 效率

先給大家看一個例子,下邊是一個跑同樣規模的 WordCount 的作業,DataStream 和 DataSet 的效能對比圖。從這個例子可以看出,DataSet 的效能是 DataStream 將近 5 倍。

很明顯,要讓 DataStream 在生產中既可以支援流的場景又要支援批的場景,就一定要大幅提高 DataStream 在 Bounded 場景下效率。那麼為什麼 DataStream 的效率要比 DataSet 的效率低呢?

前面我們已經提到,DataStream 原本主要設計目標是給 Unbounded 的場景下使用的,而 Unounded 場景下一個主要的特點就是亂序,也就是說任何一個 DataStream 的運算元無法假設處理的 Record 是按照什麼順序進行的,所以許多運算元會用一個 K/V 儲存來快取這些亂序的資料,等到合適的時候再從 K/V 儲存中取出這些資料進行處理並且進行輸出。一般情況下,運算元對 K/V 儲存訪問涉及大量的序列化和反序列化,同時也會引發隨機磁碟 I/O;而在 DataSet 中,假設資料是有界的,也就是可以通過優化來避免隨機的磁碟 I/O 訪問,同時也對序列化和反序列化做了相關優化。這也是為什麼用 DataSet 寫的 WorkerCount 要比用 DataStream 寫的 WordCount 快 5 倍主要原因。

知道到了原因,是不是要把所有的 DataStream 的運算元,都重寫一遍就可以了呢?理論上沒問題,但是 DataStream 有大量的運算元需要重寫,有些運算元還比較複雜,例如與 Window 相關的一系列運算元。可以想象到,如果都全部重寫,工程量是非常之巨大的。所以我們通過單 Key 的 BatchStateBackend 幾乎完全避免了對所有運算元重寫,同時還得到了非常不錯的效果。

- 一致性

對於 Flink 有一定了解的同學應該都知道,原來用 DataStream 寫的 Application 都採用 Streaming 的執行模式,在這種模式下是通過 Checkpoint 的方式保持端到端的 Exactly Once 的語義,具體來說一個作業的 Sink 只有當全圖的所有運算元(包括 Sink 自己)都做完各自的 Snapshot 之後,Sink 才會把資料 Commit 到外部系統中,這是一個典型的依賴 Flink Checkpoint 機制的 2PC 協議。

而在 Bounded 的場景下雖然也可以採用 Streaming 的方式但是對於使用者來說可能會存在一些問題:

  1. 資源消耗大: 使用 Streaming 方式,需要同時拿到所有的資源。在某些情況下,使用者可能沒有這麼多資源;
  2. 容錯成本高: 在 Bounded 場景下,為了效率一些運算元可能無法支援 Snapshot 操作,一旦出錯可能需要重新執行整個作業。

所以在 Bounded 場景下,使用者希望 Application 可以採用 Batch 執行模式,因為利用 Batch 執行的模式可以非常自然的解決上述兩個問題。在 Bounded 場景下支援 Batch 的執行模式是比較簡單的,但是卻引入了一個非常棘手的問題——利用已有的 Sink API 無法保證端到端的 Exactly Once 語義。這是由於 Bounded 場景下是沒有 Checkpoint 的,而原有 Sink 都是依賴 Checkpoint 保證端到端的 ExactlyOnce 的。同時我們不希望開發者針對 Sink 在不同模式下開發兩套不同的實現,因為這樣非常不利用 Flink 和其他生態的對接。

實際上,一個 Transactional 的 Sink 主要解決如下 4 個問題:

  1. What to commit?
  2. How to commit?
  3. Where to commit?
  4. When to commit?

而 Flink 應該讓 Sink 開發者提供 What to commit 和 How to commit,而系統應該根據不同的執行模式,選擇 Where to commit 和 When to commit 來保證端到端的 Exactly Once。最終我們提出了一個全新 Unified Sink API,從而讓開發者只開發一套 Sink 就可以同時執行在 Streaming 和 Batch 執行模式下。這裡介紹的只是主要思路,在有限流的場景下如何保證 End to End 的一致性;如何對接 Hive、Iceberg 等外部生態,實際上還是存在一定挑戰。

四、流批一體 DAG Scheduler

Unified DAG Scheduler 要解決什麼問題

原來 Flink 有兩種排程的模式:

  1. 一種是流的排程模式,在這種模式下,Scheduler 會申請到一個作業所需要的全部資源,然後同時排程這個作業的全部 Task,所有的 Task 之間採取 Pipeline 的方式進行通訊。批作業也可以採取這種方式,並且在效能上也會有很大的提升。但是對於執行比較長的 Batch 作業來說來說,這種模式還是存在一定的問題:規模比較大的情況下,同時消耗的資源比較多,對於某些使用者來說,他可能沒有這麼多的資源;容錯代價比較高,例如一旦發生錯誤,整個作業都需要重新執行。
  2. 一種是批的排程模式。這種模式和傳統的批引擎類似,所有 Task 都是可以獨立申請資源,Task 之間都是通過 Batch Shuffle 進行通訊。這種方式的好處是容錯代價比較小。但是這種執行方式也存在一些短板。例如,Task 之間的資料都是通過磁碟來進行互動,引發了大量的磁碟 IO。

總的來說,有了這兩種排程方式是可以基本滿足流批一體的場景需求,但是也存在著很大的改進空間,具體來說體現在三個方面:

1、架構不一致、維護成本高。排程的本質就是進行資源的分配,換句話說就是要解決 When to deploy which tasks to where 的問題。原有兩種排程模式,在資源分配的時機和粒度上都有一定的差異,最終導致了排程架構上無法完全統一,需要開發人員維護兩套邏輯。例如,流的排程模式,資源分配的粒度是整個物理執行計劃的全部 Task;批的排程模式,資源分配的粒度是單個任務,當 Scheduler 拿到一個資源的時候,就需要根據作業型別走兩套不同的處理邏輯;
2、效能。傳統的批排程方式,雖然容錯代價比較小,但是引入大量的磁碟 I/O,並且效能也不是最佳,無法發揮出 Flink 流式引擎的優勢。實際上在資源相對充足的場景下,可以採取“流”的排程方式來執行 Batch 作業,從而避免額外的磁碟 I/O,提高作業的執行效率。尤其是在夜間,流作業可以釋放出一定資源,這就為批作業按照“Streaming”的方式執行提供了可能。
3、自適應。目前兩種排程方式的物理執行計劃是靜態的,靜態生成物理執行計劃存在調優人力成本高、資源利用率低等問題。

基於 Pipeline Region 的統一排程

為了既能發揮流引擎的優勢,同時避免全圖同時排程存在的一些短板,我們引入 Pipeline Region 的概念。Unified DAG Scheduler 允許在一個 DAG 圖中,Task 之間既可以通過 Pipeline 通訊,也可以通過 Blocking 方式進行通訊。這些由 Pipeline 的資料交換方式連線的 Task 被稱為一個 Pipeline Region。基於以上概念,Flink 引入 Pipeline Region 的概念,不管是流作業還是批作業,都是按照 Pipeline Region 粒度來申請資源和排程任務。細心的讀者可以發現,其實原有的兩種模式都是 Pipeline Region 排程的特例。

即便可以資源上滿足“流”的排程模式,那麼哪些任務可以採取“流”的方式排程呢?

有同學還是會擔心採取“流”的排程方式容錯代價會比較高,因為在“流”的排程方式下,一個 Task 發生錯誤,和他聯通的所有 Task 都會 Fail,然後重新執行。

在 Flink 中,不同 Task 之間有兩種連線方式[2],一種是 All-to-All 的連線方式,上游 Task 會和下游的所有的 Task 進行連線;一種是 PointWise 的連結方式,上游的 Task 只會和下游的部分 Task 進行連線。

如果一個作業的所有 Task 之間都是通過 All-to-All 方式進行連線,一旦採取“流”的排程方式,那麼整個物理拓撲都需要同時被排程,那麼確實存在 FailOver 代價比較高的問題[3]。但是在實際 Batch 作業的拓撲中,Task 之間不都是通過 All-to-All 的邊連線,Batch 作業中存在的大量 Task 通過 PointWise 的邊連線,通過“流”的方式排程由 PointWise 連線的 Task 連通圖,在減少作業的容錯成本的同時,可以提高作業的執行效率,如下圖所示在,在全量的 10T TPC-DS 測試中,開啟所有 PointWise 邊都採用 Pipeline 的連結方式就可以讓整效能有 20% 以上的效能提升。

上述只是 Schduler 提供的劃分 Pipeline Region 的 4 種策略中的一種[4],實際上 Planner 可以根據實際執行場景,定製哪些 Task 之間採取 Pipeline 的傳輸方式,哪些 Task 之間採取 Batch 的傳輸方式方式。

自適應排程

排程的本質是給物理執行計劃進行資源分配的決策過程。Pipeline Region 解決了物理執行計劃確定之後,流作業和批作業可以統一按照 Pipeline Region 的粒度進行排程。對於批作業來說靜態生成物理執行計劃存在一些問題[5]:

  1. 配置人力成本高。對於批作業來說,雖然理論上可以根據統計資訊推斷出物理執行計劃中每個階段的併發度,但是由於存在大量的 UDF 或者統計資訊的缺失等問題,導致靜態決策結果可能會出現嚴重不準確的情況;為了保障業務作業的 SLA,在大促期間,業務的同學需要根據大促的流量估計,手動調整高優批作業的併發度,由於業務變化快,一旦業務邏輯發生變化,又要不斷的重複這個過程。整個調優過程都需要業務的同學手動操作,人力成本比較高,即便這樣也可能會出現誤判的情況導致無法滿足使用者 SLA;
  2. 資源利用率低。由於人工配置併發度成本比較高,所以不可能對所有的作業都手動配置併發度。對於中低優先順序的作業,業務同學會選取一些預設值作為併發度,但是在大多數情況下這些預設值都偏大,造成資源的浪費;而且雖然高優先順序的作業可以進行手工併發配置,由於配置方式比較繁瑣,所以大促過後,雖然流量已經下降但是業務方仍然會使用大促期間的配置,也造成大量的資源浪費現象;
  3. 穩定性差。資源浪費的情況最終導致資源的超額申請現象。目前大多數批作業都是採取和流作業叢集混跑的方式,具體來說申請的資源都是非保障資源,一旦資源緊張或者出現機器熱點,這些非保障資源都是優先被調整的物件。

為了解決靜態生成物理執行存在這些問題,我們為批作業引入了自適應排程功能[6],和原來的靜態物理執行計劃相比,利用這個特性可以大幅提高使用者資源利用率。 Adaptive Scheduler 可以根據一個 JobVertex 的上游 JobVertex 的執行情況,動態決定當前 JobVertex 的併發度。在未來,我們也可以根據上游 JobVertex 產出的資料,動態決定下游採用什麼樣的運算元。

五、流批一體的 Shuffle 架構

Flink 是一個流批一體的平臺,因此引擎對於不同的執行模式要分別提供 Streaming 和Batch 兩種型別的 Shuffle。雖然 Streaming Shuffle 和 Batch Shuffle 在具體的策略上存在一定的差異,但是本質上都是為了對資料進行重新劃分(re-partition),因此不同的 Shuffle 之間還存在一定的共性。所以我們的目標是提供一套統一的 Shuffle 架構,既可以滿足不同 Shuffle 在策略上的定製,同時還能避免在共性需求上進行重複開發。

總體來說,Shuffle 架構可以劃分成如下圖所示的四個層次。流和批的 Shuffle 需求在各層上有一定差異,也有大量的共性,下邊我做了一些簡要分析。

流批 Shuffle 之間的差異

大家都知道,批作業和流作業對 Shuffle 的需求是有差異的,具體可以體現在如下 3 個方面:

1、Shuffle 資料的生命週期。流作業的 Shuffle 資料和 Task 的生命週期基本是一致的;而批作業的 Shuffle 資料和 Task 生命週期是解耦的;
2、Shuffle 資料的儲存介質。因為流作業的 Shuffle 資料生命週期比較短,所以可以把流作業的 Shuffle 資料儲存在記憶體中;而批作業的 Shuffle 資料生命週期有一定的不確定性,所以需要把批作業的 Shuffle 資料儲存在磁碟中;
3、Shuffle 部署方式[7]。把 Shuffle 服務和計算節點部署在一起,對流作業來說這種部署方式是有優勢的,因為這樣會減少不必要網路開銷,從而減少 Latency。但對於批作業來說,這種部署方式在資源利用率、效能、穩定性上都存在一定的問題。[8]

流批 Shuffle 之間的共性

批作業和流作業的 Shuffle 有差異也有共性,共性主要體現在:

1、資料的 Meta 管理。所謂 Shuffle Meta 是指邏輯資料劃分到資料物理位置的對映。不管是流還是批的場景,在正常情況下都需要從 Meta 中找出自己的讀取或者寫入資料的物理位置;在異常情況下,為了減少容錯代價,通常也會對 Shuffle Meta 資料進行持久化;
2、資料傳輸。從邏輯上講,流作業和批作業的 Shuffle 都是為了對資料進行重新劃分(re-partition/re-distribution)。在分散式系統中,對資料的重新劃分都涉及到跨執行緒、程序、機器的資料傳輸。

流批一體的 Shuffle 架構

Unified Shuffle 架構抽象出三個元件[9]: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通過和這三個元件互動完成運算元間的資料的重新劃分。通過這三個元件可以滿足不同Shuffle外掛在具體策略上的差異:

  1. Shuffle Master 資源申請和資源釋放。也就是說外掛需要通知框架 How to request/release resource。而由 Flink 來決定 When to call it;
  2. Shuffle Writer 上游的運算元利用 Writer 把資料寫入 Shuffle Service——Streaming Shuffle 會把資料寫入記憶體;External/Remote Batch Shuffle 可以把資料寫入到外部儲存中;
  3. Shuffle Reader 下游的運算元可以通過 Reader 讀取 Shuffle 資料;

同時,我們也為流批 Shuffle 的共性——Meta 管理、資料傳輸、服務部署[10]——提供了架構層面的支援,從而避免對複雜元件的重複開發。高效穩定的資料傳輸,是分散式系統最複雜的子系統之一,例如在傳輸中都要解決上下游反壓、資料壓縮、記憶體零拷貝等問題,在新的架構中只要開發一遍,就可以同時在流和批兩種場景下共同使用,大大減少了開發和維護的成本。

六、流批一體的容錯策略

Flink 原有容錯策略是以檢查點為前提的,具體來說無論是單個 Task 出現失敗還是JobMaster 失敗,Flink 都會按照最近的檢查點重啟整個作業。雖然這種策略存在一定的優化空間,但是總的來說對於流的場景是基本是接受的。目前,Flink Batch 執行模式下不會開啟檢查點[11],這也意味一旦出現任何錯誤,整個作業都要從頭執行。

雖然原有策略在理論上可以保證最終一定會產出正確的結果,但是明顯大多數客戶都無法接受這種容錯策略所付出的代價。為了解決這些問題,我們分別對 Task 和 JM 的容錯都做了相應的改進。

Pipeline Region Failover

雖然在 Batch 執行模式下沒有定時的 Checkpoint,但是在 Batch 執行模式下,Flink允許 Task 之間通過 Blocking Shuffle 進行通訊。對於讀取 Blocking Shuffle 的 Task 發生失敗之後,由於 Blocking Shuffle 中儲存了這個 Task 所需要的全部資料,所以只需要重啟這個 Task 以及通過 Pipeline Shuffle 與其相連的全部下游任務即可,而不需要重啟整個作業。

總的來說,Pipeline Region Failover 策略和 Scheduler 在進行正常排程的時候一樣,都是把一個 DAG 拆分成由若干 Pipeline shuffle 連線的一些 Pipeline Region,每當一個 Task 發生 FailOver 的時候,只會重啟這個 Task 所在的 Region 即可。

JM Failover

JM 是一個作業的控制中心,包含了作業的各種執行狀態。Flink 利用這些狀態對任務進行排程和部署。一旦 JM 發生錯誤之後,這些狀態將會全部丟失。如果沒有這些資訊,即便所有的工作節點都沒有發生故障,新 JM 仍然無法繼續排程原來的作業。例如,由於任務的結束資訊都已丟失,一個任務結束之後,新 JM 無法判斷現有的狀態是否滿足排程下游任務的條件——所有的輸入資料都已經產生。

從上邊的分析可以看出,JM Failover 的關鍵就是如何讓一個 JM“恢復記憶”。在 VVR[12] 中我們通過基於 Operation Log 機制恢復 JM 的關鍵狀態。

細心的同學可能已經發現了,雖然這兩個改進的出發點是為了批的場景,但是實際上對於流的作業容也同樣有效。上邊只是簡要的介紹了兩種容錯策略的思路,實際上還有很多值得思考的內容。例如 Blocking 上游資料丟失了我們應該如何處理?JM 中有哪些關鍵的狀態需要恢復?

七、未來展望

為了提供比現在更快、更穩的使用者體驗,我們已經開始了下一代流式架構的研發;Flink在流批一體的場景下得到了越來越多使用者的認可,但是我們也知道業界還有很多高水平傳統大資料系統值得我們學習。最後我也希望感興趣的小夥伴可以加入我們,一起打造一個具有完美使用者體驗的流批一體大資料計算引擎。

https://developer.aliyun.com/article/783112?utm_content=g_1000256443

本文為阿里雲原創內容,未經允許不得轉載。

 

分享到: