State Migration on Flink SQL

語言: CN / TW / HK

本文整理自位元組跳動基礎架構周伊莎的演講內容。Flink SQL 作為實時數倉建設中重要的工具,能夠幫助使用者快速開發流式任務,支援實時資料處理的場景和需求,本文將分享 SQL 作業迭代中狀態的保持——狀態遷移相關的現狀、問題解決及未來規劃。

作者|位元組跳動基礎架構工程師-周伊莎

 

背 景

Flink SQL 作為實時數倉建設中重要的工具,能夠幫助使用者快速開發流式任務,支援實時資料處理的場景和需求。相比 DataStream 作業,SQL 作業在開發成本和維護成本上都具有非常大的優勢,無需掌握複雜的開發語言,程式設計環境等等,無需經歷打包,部署等耗時的流程,簡單地編輯 SQL 語句即可建立擁有複雜邏輯的流式任務。然而,對使用者遮蔽掉底層細節,意味著 SQL 作業會喪失一些程式碼層面的靈活度。

其中一個非常重要的話題就是 SQL 作業迭代中狀態的保持——狀態遷移

現狀

首先,為什麼要遷移舊狀態呢?

除了一些簡單的 ETL 任務,很多流式任務承載著複雜的業務邏輯,例如:計算每分鐘的訂單總額。這些計算邏輯的中間結果在 Flink 內部會作為狀態被儲存,方便在 Failover 或迭代後基於上一個狀態繼續計算。

當前,如果我們無法遷移狀態時,舊的狀態會被丟棄,然後回撥作業 Offset 去重跑任務,以達到計算的連續性(通常會保證 At Least Once)。

那麼這樣做有什麼問題呢?

  • 重跑會帶來計算資源的浪費;
  • 對於時延性要求比較高的作業來說,重跑帶來的資料 Delay 是使用者無法接受的;
  • 如果有一些長週期的任務,譬如說計算月粒度視窗的聚合,而輸入的資料只儲存了 7 天或者更短的時間,那麼這樣的任務就會因為輸入資料的缺失而無法重跑;
  • 在某些場景下可能會導致計算出錯,例如,將 Offset 回撥到某個視窗的起始時間戳,則上一個視窗的遲到資料可能會導致錯誤的輸出。

因此,在流式作業的迭代時,需要儘量遷移舊狀態,來保證計算的連續性和正確性。

SQL 作業與狀態

狀態的恢復有兩個充分必要條件,其一是 OperatorID 的一致性,OperatorID 與運算元的狀態是強繫結的——運算元狀態的 Namespace 以其 OperatorID 命名;其二是運算元 State Serializer 的相容性。當 OperatorID 保持不變且運算元新舊 State Serializer 相互相容時,才能成功從 Checkpoint 中恢復作業的狀態。

在 DataStream 作業中,可以通過為有狀態運算元設定 UID / UID Hash 來保證 OperatorID 的一致性,通過自定義 State Serializer 來解決 Serializer 的相容問題,因此,即使作業進行迭代,邏輯改變,也很容易在作業版本間平滑地遷移狀態。

但是在 SQL 作業中,使用者直觀可見的只有 SQL 這一層,SQL 層往下的 Table 層, Datastream API 層 以及 Runtime 層,使用者都是無法直接控制的。因此 SQL 作業的狀態對使用者來說是完全黑盒的,意味著 SQL 作業的使用者是無法通過 API 來完成與狀態的互動的,同時,迭代中對 SQL 的修改,也很容易使得前文的兩個條件被打破,從而導致狀態無法遷移。

問題分類

由此,我們可以把 SQL 作業狀態遷移的問題分為兩大類:

  • DAG 極易發生變更
  • State serializer 不可相容

首先來看看問題一,SQL 作業的 DAG 是極易隨著使用者的修改發生變更的。包括兩種修改:

  • 第一種是隱式修改:例如,在上圖的 SQL 中,Bigint Field 後面增加了一個加 2000 這樣的邏輯,導致 DAG 圖裡新增一個 Calc 節點;打開了 Mini-batch 優化或者為 Source 新增了Watermark,也會導致作業的 DAG 中新增 Mini-batch Assigner 或者 Watermark Assigner 節點。
  • 另一種是顯式修改:例如,新增維表,輸入的 Source,輸出的 Sink 等等,這些都是比較直觀的導致 DAG 圖新增節點的情況。

DAG 發生變更之後,OperatorID 基本都會發生變化,導致狀態無法恢復。這是本文計劃解決的問題型別。

問題二是 State serializer 不可相容。在 SQL 任務中,Flink 版本不變的情況下,相同的運算元使用的 State 型別是一致的,例如,GroupAggregate 運算元裡會存一個 ValueState,這個 valueState 裡面存的是一個由所有 Accumulator 組成的 Row。但隨著 SQL 中相關邏輯的修改,State 裡實際儲存的資料型別會發生變化,導致新舊 State serializer 無法相容。

例如上圖中,我們在第四行新增了一個 Last value 聚合,GroupAggregate 運算元的儲存的 ValueState 從一個4列 Row 的變成一個5列的 Row,因此導致新舊 Serializer 不相容,狀態無法被正常讀取,從而恢復失敗。
這類問題的解決方案不在本文的探討範圍內,將在未來展望一節中簡要介紹位元組目前的探索方案。

 

Operator DAG 視覺化編輯

解決思路

在 DataStream API 層,Flink 已經提供了一種幫助使用者在 DAG 變更時進行狀態遷移的能力,即為運算元設定 UID 或者 UID Hash 來保證 Operator ID 不變。

那麼在 SQL 作業中怎麼去使用這樣的能力呢?主要分三步:

  1. 先為 SQL 作業提供 DAG 的視覺化預覽。
  2. 允許使用者對 DAG 中運算元的屬性進行編輯。
  3. 將使用者編輯的 UID 和 UID Hash 傳遞到執行時。

DAG 預覽

執行時我們能在 Flink Web UI 上看到一個 Task 粒度的 DAG 圖,它對應的內部抽象是 JobGraph。而在我們的場景下需要一個運算元粒度的 DAG 圖,內部也有一個對應的抽象是 StreamGraph。但為了隔離外部儲存的 DAG 和 StreamGraph 的實現,此處提出一個獨立的抽象,叫 PlanGraph,將 StreamGraph 裡的一些屬性對映上去。

那麼 StreamGraph 中的運算元和 PlanGraph 中的節點如何形成穩定的對映呢?我們複用了Job Graph Generator 中使用的 Stream Graph Hasher V2 來為每個運算元生成確定性的 ID。

上圖右側是 PlanGraph 抽象的一些核心 Field,第一個是上文提到的確定性 ID;第二個是 Generated OperatorID 這個是與 JobGraph 中運算元的 OperatorID 一一對應的。第三個是 User Provided Hash,使用者可以通過這個欄位來為每一個運算元指定他的 UID 和 UID Hash。另外還有一些其他的 StreamNode 的屬性和一些展示相關的屬性。

上圖展示的是一個 SQL 任務初始的視覺化效果,左側是一個簡單的 SQL,它的邏輯是從 Source 讀資料,做一次全域性聚合後寫出到 Sink 。右側是對應的視覺化效果,它展示了所有 Task 粒度的節點,展開每個 Task 節點,可以看到各個 Task 包含的運算元鏈。點選運算元或 Task 節點,下方的屬性 Tab 會展示節點相關屬性,如運算元粒度會展示:運算元 ID、運算元名稱、並行度等等。

注意這裡的一個小 Tip,為了減少使用者的理解複雜度,對外暴露的屬性只有運算元 Hash 一個,而實際上這個值會被同時設定成運算元的 UID 和 UID Hash。

另外,為了減少使用者的配置工作量,位元組內部版本在檢查 Checkpoint 中各運算元 State 的元資訊時,會跳過沒有實際儲存狀態的部分,這意味著使用者無需為無狀態的運算元去配置 UID。

當用戶對任務做了一些迭代修改導致 DAG 發生變更後,會展現出上圖所示的 DAG Diff 面板。圖中這個例子裡,我們為任務打開了 Mini-batch 優化(注意,為了舉例方便,我們暫時關閉了 Local-global 優化),可以看到右側的新 DAG 相較於左側的舊 DAG 新增了一個 Mini-batch Assigner 節點。

顯然,此時 GroupAggregate 運算元的 OperatorID 會發生變化,導致下一次重啟時,它的狀態恢復失敗。

那麼如果要做狀態的遷移該怎麼操作呢?

首先在左右兩張圖上都選中我們需要遷移狀態的 Group Aggregate 運算元,從左側把舊的運算元 ID,複製到右側的 Hash 屬性中即可,至此我們就完成了基本的編輯步驟,只要將這些資訊提交到執行時,我們就能將舊任務的狀態遷移至新任務中了。

整體使用流程

下面介紹一下整體的使用流程:

  1. 對於每一個作業版本,包括它的 SQL 跟 Normal configs ,系統都會為其生成一個 PlanGraph ,然後儲存到外部系統;
  2. 當用戶對作業做了一些迭代和修改之後,會產生新版本的 SQL 跟 Normal configs,和與此對應的 PlanGraph;
  3. 把舊的 PlanGraph 與新的 PlanGraph 進行 diff 對比後,由使用者手工地修改或者採用自動對映來複用舊圖中的運算元 ID;
  4. 修改後的PlanGraph會和SQL及Normal configs一起提交給Flink API, PlanGraph 中的運算元 ID 會被對映到實際生成的 JobGraph 中去,最終,包含這些資訊的 JobGraph 會被提交到執行時。

到此為止,為使用者提供了基礎的 SQL 作業狀態遷移能力。

易用性問題

在實際使用中,上述方案會遇到非常多的易用性問題。使用者的 SQL DAG 遠不止幾個運算元那麼簡單,對於複雜的 DAG,為它所有的節點去手動配置 UID 或者 UIDHash 的成本是非常高的。即使真的要手動地去配置,我們也很難快速地去定位到底哪一些節點是有狀態的。

針對這些易用性問題,我們提供了以下解決方案:

  1. 提供 Best Effort 的自動對映,把舊圖中的運算元 ID 自動地對映到新圖上;
  2. 高亮使用狀態的節點
  3. 除圖形化的 DAG Diff 外,額外提供 DAG 對應的 JSON 的程式碼對比

Best Effort 的自動對映

本功能旨在減少使用者手動配置工作量,自動為在新舊圖中相同的節點完成運算元 UID / UID Hash 的對映。Best Effort 意味著儘量地進行對映,但不保證所有的節點都可以完成對映。

在闡述具體的演算法之前,需要先了解一個前提:運算元的 Description (即 RelDetailed Description,包含對應 RelNode 的 Plan 級別的屬性)是描述運算元的一個強有力的資訊。當兩個運算元 Description 完全相等的時候,它在新舊圖中大概率是相同的節點。

以下是這個演算法的基本流程:

  1. 分別在新圖和舊圖裡去收集具有相同 Description 的運算元;
  2. 為每一對這樣的新舊運算元計算它們的相似度,並放入最大堆。相似度的計算 Tips:主要是去比較它所有的出入節點的屬性, 每有一個相同的出或入節點時,都會被加權後累加到最終的相似度中。
  3. 輪詢這個最大堆,直到新圖或舊圖中的所有節點都完成匹配。每個節點僅會被匹配一次,每發現一對匹配的節點,從舊節點中取出它的 Generated OperatorID 填入到新節點的 User Provided Hash 中。

至此,一次 Best Effort 的自動對映就已經完成。在實際應用中,這種演算法效果良好,對於簡單的 DAG 改動,能夠完成 100% 的對映,無需使用者再進行手動配置

JSON 程式碼比較

第二個提高易用性的功能是提供運算元 JSON 程式碼的比較。當 DAG 圖十分複雜且自動對映功能無法完成全部對映時,仍然需要使用者為剩餘的有狀態節點手動設定 UID / UID Hash,對於這部分節點的定位,我們可以通過 JSON 程式碼的形式來呈現。

按照拓撲排序的順序以 JSON 的形式呈現運算元屬性列表,當新增或者刪除節點時,通過 JSON 程式碼的比較,可以非常快速的定位到兩張圖的 Diff,而在新舊圖中相同的節點,除了 OperatorID 發生變化,其餘屬性是完全一致的,只需簡單的從左側複製 OperatorID 到右側即可。

其他優化

第三個是一些比較小的優化點,包括把有狀態節點打上特殊的標記,來提示使用者去重點關注。

另外還有一個圖搜尋的功能,使用者可以通過它的一些節點屬性例如 Description 等來進行節點的搜尋,這樣可以方便使用者在圖模式下進行手工修改時,快速的定位需要修改的節點。

 

未來展望

在整體的規劃中,我們的目標是構建全面的 SQL 作業狀態恢復能力圖譜——劃分能力邊界, 明確可恢復場景,提供解決方案,對不可恢復場景做到提前發現。

因此計劃從以下兩個方面著手:

  1. 提供完善的狀態不可恢復事前檢查能力
  2. 持續增強 SQL State 恢復能力,覆蓋兩類典型問題的典型場景

對於第一個方向,目前我們正在探索 FLIP-22 中提到的 Eager State Declaration, 來將獲取 State 元資訊的時機提前,允許在執行時之前就獲取相關元資訊來判斷狀態是否可成功遷移。

對於第二個方向,上文介紹的工作為 SQL 作業在 DAG 變更導致的狀態恢復問題提供了一套較為完整的解決方案,但我們仍在思考這個場景是否有更易用的解決方案,例如,利用 SQL Hint 來為各個 SQL 程式碼段來設定更為易讀的 UID。

除此之外,我們還在探索基於列血緣資訊的 State Schema Evolution 方案,為 State 中儲存的RowData 的每一個欄位提供血緣 Digest,以期解決典型場景下 State Serializer 不相容導致的狀態無法遷移的問題。

目前,位元組跳動流式計算團隊同步支援的火山引擎流式計算 Flink 版正在公測中,支援雲中立模式,支援公共雲、混合雲及多雲部署,全面貼合企業上雲策略,歡迎申請試用:流式計算 Flink 版-火山引擎


歡迎關注「位元組跳動雲原生計算」公眾號,後臺回覆加入技術交流群,參與技術交流,瞭解更多資訊!