大資料Hadoop之——Flink的狀態管理和容錯機制(checkpoint)

語言: CN / TW / HK

一、Flink中的狀態

官方文件

有狀態的計算是流處理框架要實現的重要功能,因為稍複雜的流處理場景都需要記錄狀態,然後在新流入資料的基礎上不斷更新狀態。下面的幾個場景都需要使用流處理的狀態功能:

  • 資料流中的資料有重複,想對重複資料去重,需要記錄哪些資料已經流入過應用,當新資料流入時,根據已流入過的資料來判斷去重。
  • 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式快取下來。比如,判斷一個溫度感測器資料流中的溫度是否在持續上升。
  • 對一個時間視窗內的資料進行聚合分析,分析一個小時內某項指標的75分位或99分位的數值。

個狀態更新和獲取的流程如下圖所示,一個運算元子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。 在這裡插入圖片描述

1)鍵控狀態(Keyed State)

在這裡插入圖片描述

keyed state 介面提供不同型別狀態的訪問介面,這些狀態都作用於當前輸入資料的 key 下。換句話說,這些狀態僅可在 KeyedStream 上使用,在Java/Scala API上可以通過 stream.keyBy(...) 得到 KeyedStream,在Python API上可以通過 stream.key_by(...) 得到 KeyedStream。

1、控制元件狀態特點

  • 鍵控狀態是根據輸入資料流中定義的鍵(key)來維護和訪問的
  • Flink 為每個 key 維護一個狀態例項,並將具有相同鍵的所有資料,都分割槽到同一個運算元任務中,這個任務會維護和處理這個 key 對應的狀態
  • 當任務處理一條資料時,它會自動將狀態的訪問範圍限定為當前資料的 key

2、鍵控狀態型別

| 鍵控狀態型別 | 說明 | 方法 | | ------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | | ValueState[T] | 值狀態,儲存一個可以更新和檢索的值 | ValueState.update(value: T)
ValueState.value() | | ListState[T] | 列表狀態,儲存一個元素的列表可以往這個列表中追加資料,並在當前的列表上進行檢索。 | ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.update(values: java.util.List[T])
ListState.get()(注意:返回的是Iterable[T]) | | ReducingState | 聚合狀態,儲存一個單值,表示新增到狀態的所有值的聚合,介面與 ListState 類似,但使用 add(T) 增加元素,會使用提供的 ReduceFunction 進行聚合。 | ReducingState.add(value: T)
ReducingState.get() | | AggregatingState | 聚合狀態,保留一個單值,表示新增到狀態的所有值的聚合。和 ReducingState 相反的是, 聚合型別可能與 新增到狀態的元素的型別不同。 介面與 ListState 類似,但使用 add(IN) 新增的元素會用指定的 AggregateFunction 進行聚合。 | AggregatingState.add(value: T)
AggregatingState.get() | | MapState | 對映狀態,維護了一個對映列表,儲存Key-Value對。 | MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K) |

【溫馨提示】所有型別的狀態還有一個clear() 方法,清除當前 key 下的狀態資料,也就是當前輸入元素的 key。

3、狀態有效期 (TTL)

任何型別的 keyed state 都可以有 有效期 (TTL)。所有狀態型別都支援單元素的 TTL。 這意味著列表元素和對映元素將獨立到期

【官網示例】

```java package com import org.apache.flink.api.common.state.StateTtlConfig import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.common.time.Time

object StateTest001 { def main(args: Array[String]): Unit = { val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build

val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

} }

``` TTL 配置有以下幾個選項: - newBuilder 的第一個引數表示資料的有效期,是【必選項】。 - TTL 的更新策略(預設是 OnCreateAndWrite) 1. StateTtlConfig.UpdateType.OnCreateAndWrite - 僅在建立和寫入時更新 2. StateTtlConfig.UpdateType.OnReadAndWrite - 讀取時也更新

1)過期資料的清理

預設情況下,過期資料會在讀取的時候被刪除,例如 ValueState#value,同時會有後臺執行緒定期清理(如果 StateBackend 支援的話)。可以通過 StateTtlConfig 配置關閉後臺清理:

```java import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .disableCleanupInBackground .build ```

2)全量快照時進行清理

可以啟用全量快照時進行清理的策略,這可以減少整個快照的大小。當前實現中不會清理本地的狀態,但從上次快照恢復時,不會恢復那些已經刪除的過期資料。 該策略可以通過 StateTtlConfig 配置進行配置:

```java import org.apache.flink.api.common.state.StateTtlConfig import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupFullSnapshot .build ```

【溫馨提示】這種策略在 RocksDBStateBackend 的增量 checkpoint 模式下無效。

3)增量資料清理

另外可以選擇增量式清理狀態資料,在狀態訪問或/和處理時進行。如果某個狀態開啟了該清理策略,則會在儲存後端保留一個所有狀態的惰性全域性迭代器。 每次觸發增量清理時從迭代器中選擇已經過期的數進行清理

4)在 RocksDB 壓縮時清理

如果使用 RocksDB state backend,則會啟用 Flink 為 RocksDB 定製的壓縮過濾器。RocksDB 會週期性的對資料進行合併壓縮從而減少儲存空間。 Flink 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經過期的狀態資料。該特性可以通過 StateTtlConfig 進行配置:

```java import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupInRocksdbCompactFilter(1000) .build ```

【注意】 - 如果沒有 state 訪問,也沒有處理資料,則不會清理過期資料。 - 增量清理會增加資料處理的耗時。 - 現在僅 Heap state backend 支援增量清除機制。在 RocksDB state backend 上啟用該特性無效。 - 如果 Heap state backend 使用同步快照方式,則會儲存一份所有 key 的拷貝,從而防止併發修改問題,因此會增加記憶體的使用。但非同步快照則沒有這個問題。 - 對已有的作業,這個清理方式可以在任何時候通過 StateTtlConfig 啟用或禁用該特性,比如從 savepoint 重啟後。

4、鍵控狀態的使用

除了上面描述的介面之外,Scala API 還在 KeyedStream 上對 map() 和 flatMap() 訪問 ValueState 提供了一個更便捷的介面mapWithState。 使用者函式能夠通過 Option 獲取當前 ValueState 的值,並且返回即將儲存到狀態的值。

```java val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream .keyBy(_._1) .mapWithState((in: (String, Int), count: Option[Int]) => count match { case Some(c) => ( (in._1, c), Some(c + in._2) ) case None => ( (in._1, 0), Some(in._2) ) }) ```

2)運算元狀態(Operatior State)

在這裡插入圖片描述

運算元狀態(或者非 keyed 狀態)是繫結到一個並行運算元例項的狀態。Kafka Connector 是 Flink 中使用運算元狀態一個很具有啟發性的例子。Kafka consumer 每個並行例項維護了 topic partitions 和偏移量的 map 作為它的運算元狀態。

【溫馨提示】 Python DataStream API 仍無法支援運算元狀態。

1、運算元狀態特點

  • 運算元狀態的作用範圍限定為運算元任務,由同一並行任務所處理的所有資料都可以訪問到相同的狀態
  • 狀態對於同一子任務而言是共享
  • 運算元狀態不能由相同或不同運算元的另一個子任務訪問

2、運算元狀態型別

| 鍵控狀態型別 | 說明 | | ------------------------------ | :----------------------------------------------------------- | | 列表狀態(ListState) | 將狀態表示為一組資料的列表 | | 聯合列表狀態(UnionListState) | 也將狀態表示為資料的列表。它與常規列表狀態的區別在於,在發生故障時,或者從儲存點(savepoint)啟動應用程式時如何恢復 | | 廣播狀態(BroadcastState) | 如果一個運算元有多項任務,而它的每項任務狀態又都相同,那麼這種特殊情況最適合應用廣播狀態。 |

3)廣播狀態 (Broadcast State)

廣播狀態是一種特殊的運算元狀態。引入它的目的在於支援一個流中的元素需要廣播到所有下游任務的使用情形。在這些任務中廣播狀態用於保持所有子任務狀態相同。 該狀態接下來可在第二個處理記錄的資料流中訪問。廣播狀態和其他運算元狀態的不同之處在於:

  • 它具有 map 格式,
  • 它僅在一些特殊的運算元中可用。這些運算元的輸入為一個廣播資料流和非廣播資料流,
  • 這類運算元可以擁有不同命名的多個廣播狀態 。

【溫馨提示】 Python DataStream API 仍無法支援運算元狀態。

二、狀態後端(State Backends)

狀態的儲存、訪問以及維護,由一個可插入的元件決定,這個元件就 叫做狀態後端(state backend) ,狀態後端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠端儲存

1)三種狀態儲存方式

| 儲存方式 | 說明 | | ------------------- | :----------------------------------------------------------- | | MemoryStateBackend | 【預設模式】狀將鍵控狀態作為記憶體中的物件進行管理,將它們儲存在TaskManager的JVM堆上,將checkpoint儲存在JobManager的記憶體中。主要適用於本地開發和除錯。 | | FsStateBackend | 基於檔案系統進行儲存,可以是本地檔案系統,也可以是 HDFS 等分散式檔案系統。 需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的資料仍然是儲存在 TaskManager 的記憶體中的,只有在 checkpoint 時,才會將狀態快照寫入到指定檔案系統上。 | | RocksDBStateBackend | 將所有狀態序列化後,存入本地的RocksDB中儲存。 |

【溫馨提示】特別在 MemoryStateBackend 內使用HeapKeyedStateBackend時,Checkpoint 序列化資料階段預設有最大 5 MB資料的限制。

對於HeapKeyedStateBackend,有兩種實現: - 支援非同步 Checkpoint(預設):儲存格式 CopyOnWriteStateMap - 僅支援同步 Checkpoint:儲存格式 NestedStateMap

2)配置方式

Flink 支援使用兩種方式來配置後端管理器:

1、【第一種方式】基於程式碼方式進行配置

【溫馨提示】只對當前作業生效 java // 配置 FsStateBackend env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink/checkpoints")); // 配置 RocksDBStateBackend env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/flink/checkpoints")); 配置 RocksDBStateBackend 時,需要額外匯入下面的依賴:

xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.9.0</version> </dependency>

2、【第二種方式】基於 flink-conf.yaml 配置檔案的方式進行配置

【溫馨提示】對所有部署在該叢集上的作業都生效

xml state.backend: filesystem state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints

三、容錯機制(checkpoint)

checkpoint是Flink容錯的核心機制。它可以定期地將各個Operator處理的資料進行快照儲存( Snapshot )。如果Flink程式出現宕機,可以重新從這些快照中恢復資料。

1)一致性

談到容錯性,就沒法避免一致性這個概念。所謂一致性就是:成功處理故障並恢復之後得到的結果與沒有發生任何故障是得到的結果相比,前者的正確性。換句大白話,就是故障的發生是否影響得到的結果。在流處理過程,一致性分為3個級別

  • at-most-once:至多一次。故障發生之後,計算結果可能丟失,就是無法保證結果的正確性;
  • at-least-once:至少一次。計算結果可能大於正確值,但絕不會小於正確值,就是計算程式發生故障後可能多算,但是絕不可能少算;
  • exactly-once:精確一次。系統保證發生故障後得到的計算結果的值和正確值一致;

Flink的容錯機制保證了exactly-once,也可以選擇at-least-once。Flink的容錯機制是通過對資料流不停的做快照(snapshot)實現的。

2)檢查點(checkpoint)

Flink 中的每個方法或運算元都能夠是有狀態的,為了讓狀態容錯,Flink 需要為狀態新增 checkpoint(檢查點)。Checkpoint 使得 Flink 能夠恢復狀態和在流中的位置,從而嚮應用提供和無故障執行時一樣的語義。官方文件

1、開啟與配置 Checkpoint

預設情況下 checkpoint 是禁用的。通過呼叫 StreamExecutionEnvironmentenableCheckpointing(n)來啟用 checkpoint,裡面的 n 是進行 checkpoint 的間隔,單位毫秒

2、Checkpoint 屬性

| 屬性 | 說明 | | ----------------------------- | :----------------------------------------------------------- | | 精確一次(exactly-once) | 你可以選擇向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中傳入一個模式來選擇保證等級級別。 | | checkpoint 超時 | 如果 checkpoint 執行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄。 | | checkpoints 之間的最小時間 | 該屬性定義在 checkpoint 之間需要多久的時間,以確保流應用在 checkpoint 之間有足夠的進展。如果值設定為了 5000, 無論 checkpoint 持續時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒後會才開始下一個 checkpoint。 | | checkpoint 可容忍連續失敗次數 | 該屬性定義可容忍多少次連續的 checkpoint 失敗。超過這個閾值之後會觸發作業錯誤 fail over。 預設次數為“0”,這意味著不容忍 checkpoint 失敗,作業將在第一次 checkpoint 失敗時fail over。 | | 併發 checkpoint 的數目 | 預設情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統不會觸發另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。 不過允許多個 checkpoint 並行進行是可行的,對於有確定的處理延遲(例如某方法所呼叫比較耗時的外部服務),但是仍然想進行頻繁的 checkpoint 去最小化故障後重跑的 pipelines 來說,是有意義的。 | | externalized checkpoints | 你可以配置週期儲存 checkpoint 到外部系統中。Externalized checkpoints 將他們的元資料寫到持久化儲存上並且在 job 失敗的時候不會被自動刪除。 這種方式下,如果你的 job 失敗,你將會有一個現有的 checkpoint 去恢復。更多的細節請看 Externalized checkpoints 的部署文件。 |

【官網示例】

```java val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 每 1000ms 開始一次 checkpoint env.enableCheckpointing(1000)

// 高階選項:

// 設定模式為精確一次 (這是預設值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 確認 checkpoints 之間的時間會進行 500 ms env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// Checkpoint 必須在一分鐘內完成,否則就會被拋棄 env.getCheckpointConfig.setCheckpointTimeout(60000)

// 允許兩個連續的 checkpoint 錯誤 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)

// 同一時間只允許一個 checkpoint 進行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// 使用 externalized checkpoints,這樣 checkpoint 在作業取消後仍就會被保留 env.getCheckpointConfig().enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// 開啟實驗性的 unaligned checkpoints env.getCheckpointConfig.enableUnalignedCheckpoints() ```

3)從檢查點恢復狀態

  • 【第一步】遇到故障之後,第一步就是重啟應用
  • 【第二步】是從 checkpoint 中讀取狀態,將狀態重置,從檢查點重新啟動應用程式後,其內部狀態與檢查點完成時的狀態完全相同
  • 【第三步】開始消費並處理檢查點到發生故障之間的所有資料,這種檢查點的儲存和恢復機制可以為應用程式狀態提供“精確一次”(exactly- once)的一致性,因為所有運算元都會儲存檢查點並恢復其所有狀態,這樣一來所有的輸入流就都會被重置到檢查點完成時的位置

4)檢查點的實現演算法

  • 【一種簡單的想法】:暫停應用,儲存狀態到檢查點,再重新恢復應用
  • 【Flink 的改進實現】:
    1. 基於 Chandy-Lamport 演算法的分散式快照
    2. 將檢查點的儲存和資料處理分離開,不暫停整個應用

5)檢查點演算法

基於Chandy-Lamport演算法實現的分散式快照

1、檢查點分界線(Checkpoint Barrier)

在這裡插入圖片描述

  • 將barrier插入到資料流中,作為資料流的一部分和資料一起向下流動。Barrier不會干擾正常資料,資料流嚴格有序。

  • 一個barrier把資料流分割成兩部分:一部分進入到當前快照,另一部分進入到下一個快照。

  • 每一個barrier都帶有快照ID,並且barrier之前的資料都進入了此快照。Barrier不會干擾資料流處理,所以非常輕量。

  • 多個不同快照的多個barrier會在流中同時出現,即多個快照可能同時建立。

2、Barrier對齊

在這裡插入圖片描述

當一個opeator有多個輸入流的時候,checkpoint barrier n 會進行對齊,就是已到達的會先快取到buffer裡等待其他未到達的,一旦所有流都到達,則會向下遊廣播,exactly-once 就是利用這一特性實現的,at least once 因為不會進行對齊,就會導致有的資料被重複處理。

3、執行一次檢查點步驟

  1. jobManager會向每個source任務傳送一條帶有新檢查點ID的訊息,通過這種方式來啟動檢查點。
  2. 資料來源將他們各自的狀態寫入檢查點後,並向下遊所有分割槽發出一個檢查點barrier。狀態後端在狀態存入檢查點之後,會返回通知給source任務,source任務再向jobmanager確認檢查點完成。
  3. barrier向下遊傳遞,下游任務會等待所有輸入分割槽的barrier的到達後再做狀態儲存通知jobmanager狀態儲存完成,並再向下遊所有分割槽傳送收到的檢查點barrier。

【溫馨提示】對於barrier已經到達的分割槽,繼續到達的資料會被快取;對於barrier未到達的分割槽,資料會被正常處理所有barrier都到達後,做完狀態儲存且向下遊傳送檢查點barrier後,當前任務繼續處理快取的資料和後面到來的資料。

  1. sink任務向jobmanager確認狀態儲存到checkpoint完成。即所有任務都確認已成功將狀態儲存到檢查點時,檢查點就真正完成了。

6)儲存點(savepoint)

1、概述

  • Flink 還提供了可以自定義的映象儲存功能,就是儲存點(savepoints)
  • 原則上,建立儲存點使用的演算法與檢查點完全相同,因此儲存點可以認為就是具有一些額外元資料的檢查點;
  • Flink不會自動建立儲存點,因此使用者(或者外部排程程式)必須明確地觸發建立操作;
  • 儲存點是一個強大的功能。除了故障恢復外,儲存點可以用於:有計劃的手動備份,更新應用程式,版本遷移,暫停和重啟應用,等等。

2、savepoint觸發的三種方式

  1. 使用 flink savepoint 命令觸發 Savepoint,其是在程式執行期間觸發 savepoint。

  2. 使用 flink cancel -s 命令,取消作業時,並觸發 Savepoint。

  3. 使用 Rest API 觸發 Savepoint,格式為:/jobs/:jobid /savepoints

7)檢查點(checkpoint)與 儲存點(savepoint)的區別與聯絡

  • checkpoint的側重點是“容錯”,即Flink作業意外失敗並重啟之後,能夠直接從早先打下的checkpoint恢復執行,且不影響作業邏輯的準確性。而savepoint的側重點是“維護”,即Flink作業需要在人工干預下手動重啟、升級、遷移或A/B測試時,先將狀態整體寫入可靠儲存,維護完畢之後再從savepoint恢復現場。
  • savepoint是“通過checkpoint機制”建立的,所以savepoint本質上是特殊的checkpoint。
  • checkpoint面向Flink Runtime本身,由Flink的各個TaskManager定時觸發快照並自動清理,一般不需要使用者干預savepoint面向使用者完全根據使用者的需要觸發與清理
  • checkpoint的頻率往往比較高(因為需要儘可能保證作業恢復的準確度),所以checkpoint的儲存格式非常輕量級,但作為trade-off犧牲了一切可移植(portable)的東西,比如不保證改變並行度和升級的相容性。savepoint則以二進位制形式儲存所有狀態資料和元資料,執行起來比較慢而且“貴”,但是能夠保證portability,如並行度改變或程式碼升級之後,仍然能正常恢復。

未完待續,請耐心等待~