Flink 有狀態運算元和應用Demo詳解

語言: CN / TW / HK

一.引言

入門Demo講到了Flink的一個處理特性就是通過時間視窗對一段時間的資料進行處理,這次有狀態運算元則是另一種基於時間的處理,有狀態運算元根據自身狀態的過期時間,可以根據一定時間內的狀態改變做出相對應的變化,相比於傳統流式處理,狀態的引入豐富了事件的處理方式。本章同樣採用之前的溫度感測器作為資料來源,與之前的區別是統計指標由一段時間內一個感測器的平均溫度變為了檢測一個感測器一段時間內是否發生較大變化,如果超過預期閾值則發生報警,否則無事發生。

 

二.依賴支援與輔助類

依賴支援與輔助類同上一篇博文相同,需要的話可以直接去這裡貼上!

 

三.有狀態運算元和應用

1.簡介

有狀態運算元和應用是Flink的一大特性,其作用於 KeyedStream,一般通過 keyBy() 方法指定需要聚合的 key 即可將原始的 DataStream 轉換為 KeyedStream。對於每一個鍵值key,Flink都會維護一個例項狀態,函式的鍵值分割槽狀態會分佈在函式所在的運算元的所有並行任務上,這意味著每個函式的並行例項都會負責一部分鍵值域並維護相應的狀態例項。KeyedStream 會根據指定鍵值進行分割槽並記住鍵值的定義,作用在 KeyedStream 上的運算元可以訪問他的鍵值定義的上下文資訊。

 

2.狀態的初始化-ValueState

相關鍵值分割槽狀態的宣告需要在 RuntimeContext中定義,ValueState[T] 用於儲存型別為T的單個值,常用的方法有通過 .value() 獲取該狀態對應的例項T,.update(Value: T)來更新當前狀態的例項以及 .clear 清除當前狀態例項。相關的狀態原語還包括:ListState[T],MapState[K, V],ReducingState[T] 以及 AggreatingState[I, O],有興趣的同學可以查詢相關資訊,這裡暫時不多展開。

 

3.狀態的處理-ProcessFunction

通常應用ProcessFunction作用在KeyedStream的應用上並對相應的狀態進行,ProcessFunction一般需要覆蓋三個函式: open,processElement以及onTimer。

open

open函式起到初始化的作用,一般我們會在open中通過StateDescriptor以及RuntimeContext獲取狀態例項。

processElement

該函式內通過open獲得的狀態例項與上下文實現程式碼需求,且支援通過Collector產出下一步資料。

onTimer

一般而言一個狀態不會一直存在,通過timeService().registerEventTimeTimer()可以定義當前狀態的事件事件容忍度,可以理解為過期時間,當一個狀態到期還未發生任何改變或者被清除,則會強行執行onTimer進行操作,一般沒有操作的狀態都會在這一步執行 .clear() 減少儲存的壓力。

 

4.狀態的儲存-Statebackend

運算元和狀態的互動會對應用的魯棒性以及效能產生影響,這其中狀態後端起到一定作用。狀態後端負責儲存每個狀態例項的本地狀態,並在生成檢查點時將他們寫入遠端持久化儲存。Flink提供了三種狀態後端: MemoryStateBackend,FsStateBackend以及 RocksDBStateBackend。

MemoryStateBackend

MemoryStateBackend 將狀態以常規物件的方式儲存在 TaskManager 程序的 JVM 堆裡,如果某個任務的例項狀態變得很大,則所在JVM可能由於OOM而終止,此外由於堆中防止了過多常駐記憶體物件而引起垃圾回收停頓的問題也會發生。其次在生成CheckPoint時,MemoryStateBackend會將狀態全部發送至JobManager並儲存到它的堆記憶體中,因此需要JobManager具有足夠的記憶體,否則一旦JobManager出現故障,狀態則會丟失。一般而言,MemoryStateBackend用於開發和除錯,大型線上任務不會選擇。

FsStateBackend

FsStateBackend和MemoryStateBackend一樣,將本地狀態儲存在TaskManager的JVM中,區別是建立檢查點是不會講狀態放入JobManager的堆記憶體而是放入遠端持久化檔案系統,這樣既能在本地享有訪問記憶體的速度,又可以支援容錯,但TaskManager的記憶體問題並沒有得到解決。

RocksDBStateBackend

RocksDBStateBackend會把全部狀態存到本地RocksDB例項中。RocksDB是一個嵌入式鍵值儲存,可以將資料儲存到本地磁碟上。為了從RocksDB上讀寫資料,系統需要對資料進行序列化與反序列化。RocksDBStateBackend同樣會將狀態以檢查點的形式寫入遠端持久化檔案系統,因為它能夠將資料寫入磁碟且支援增量檢查點,所以對於狀態非常大、多的應用是很好地選擇。

 

四.Demo詳解

資料來源是 SensorReading類,其中包含了三個變數,感測器id,感測器生成溫度的時間 timeStamp 以及 當時檢測到的溫度 temperature,相關程式碼參考依賴支援和輔助類中的連結。

1.溫度報警處理類

class SelfCleaningTemperatureAlertFunction(val threshold: Double)
    extends KeyedProcessFunction[String, SensorReading, TemperatureRecord] {

  // 定義上一次的時間戳與溫度
  private var lastTempState: ValueState[Double] = _
  private var lastTimerState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // 為溫度狀態註冊
    val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
    // 為時間戳狀態註冊
    val timestampDescriptor: ValueStateDescriptor[Long] =
      new ValueStateDescriptor[Long]("timestampState", classOf[Long])
    lastTimerState = getRuntimeContext.getState(timestampDescriptor)
  }

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, TemperatureRecord]#Context,
      out: Collector[TemperatureRecord]): Unit = {

    // 更新當前狀態值
    val newTimer = ctx.timestamp() + (3600 * 1000)
    // get timestamp of current timer
    val curTimer = lastTimerState.value()
    // delete previous timer and register new timer
    ctx.timerService().deleteEventTimeTimer(curTimer)
    ctx.timerService().registerEventTimeTimer(newTimer)
    // update timer timestamp state
    lastTimerState.update(newTimer)

    // 獲取上一次溫度值
    val lastTemp = lastTempState.value()
    // check if we need to emit an alert
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      // temperature increased by more than the thresholdTimer
      out.collect(TemperatureRecord(reading.id, reading.temperature, tempDiff))
    }

    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, TemperatureRecord]#OnTimerContext,
      out: Collector[TemperatureRecord]): Unit = {

    // clear all state for the key
    lastTempState.clear()
    lastTimerState.clear()
  }
}

根據上面介紹的處理函式大致介紹一下函式內的分工:

open:

open函式註冊並初始化了溫度狀態與時間戳狀態,這裡預設為Double 0.0 與 Long 0L。註冊方式通過ValueStateDescriptor獲取狀態例項。

processElement:

這一部分主要獲取當前時間為當前狀態定義新的過期時間並重置 LastTime。

    // 更新當前狀態值
    val newTimer = ctx.timestamp() + (3600 * 1000)
    // get timestamp of current timer
    val curTimer = lastTimerState.value()
    // delete previous timer and register new timer
    ctx.timerService().deleteEventTimeTimer(curTimer)
    ctx.timerService().registerEventTimeTimer(newTimer)
    // update timer timestamp state
    lastTimerState.update(newTimer)

 這一步則是業務邏輯,判斷前後兩次的溫度差是否超過預定閾值,超過則發出報警,這裡發出報警通過Collector的 TemperatureRecord類,TemperatureRecord類構成十分簡單,只包含了感測器id,當前溫度與當前溫度差三個變數。  case class TemperatureRecord(id: String, tmd: Double, diff: Double)

    // 獲取上一次溫度值
    val lastTemp = lastTempState.value()
    // check if we need to emit an alert
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      // temperature increased by more than the thresholdTimer
      out.collect(TemperatureRecord(reading.id, reading.temperature, tempDiff))
    }

    // update lastTemp state
    this.lastTempState.update(reading.temperature)

onTimer:

到期處理並沒有過多的邏輯,只是清除了相應的狀態。

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, TemperatureRecord]#OnTimerContext,
      out: Collector[TemperatureRecord]): Unit = {

    // clear all state for the key
    lastTempState.clear()
    lastTimerState.clear()
  }

 

2.溫度報警主類

Main類主要是FlinkExecutionEnvirnment的初始化、引數配置,資料來源來源與處理函式的定義,以及最後的程式觸發。與之前TimeWindow處理大同小異,唯一區別是通過KeyedStream處理完成了狀態運算元的處理。

  def main(args: Array[String]) {
    
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // checkpoint every 10 seconds
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // configure watermark interval
    env.getConfig.setAutoWatermarkInterval(1000L)

    println("Parallelism: " + env.getParallelism)
    println("StateBackend: " + env.getStateBackend)

    // ingest sensor stream
    val sensorData: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      .addSource(new SensorSource)
      // assign timestamps and watermarks which are required for event time
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val keyedSensorData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

    val alerts: DataStream[TemperatureRecord] = keyedSensorData
      .process(new SelfCleaningTemperatureAlertFunction(1.5))

    // print result stream to standard out
    alerts.print()

    // execute application
    env.execute("Generate Temperature Alerts")
  }

 

五.一些收穫

1.狀態初始化 ValueState-Initialization

第一次初始化的ValueState存放為預設值,例如Double的0.0與Long的0L,如果ValueState[T]儲存了自定義或其他例項,則初始化會為null,需要注意在processElement內部進行判斷。

 

2.並行度 parallelism

本地模式下資料來源生成並行度會根據機器core數決定,本地4核的情況下,會產生4倍的資料,叢集模式下則根據自定義的並行度或者叢集本身的core數決定。可以通過下述方法獲得當前執行環境並行度:

    env.getParallelism

 

3.狀態後端 StateBackend

本地執行下預設是 MemoryStateBackend,即放在JVM中,叢集模式下一般都採用RocksDBStateBackend模式,初始化方法:

    val checkpointPath: String = ""
    val backend = new RocksDBStateBackend(checkpointPath)
    env.setStateBackend(backend)

獲取當前狀態後端方法:

    env.getStateBackend