Flink Watermark 机制及总结
作者 :黄龙,腾讯 CSIG 高级工程师
Flink Watermark
前言
Flink 水印机制,简而言之,就是在 Flink 使用 Event Time 的情况下,窗口处理事件乱序和事件延迟的一种设计方案。本文从基本的概念入手,来看下 Flink 水印机制的原理和使用方式。
Flink 在流应⽤程序中三种 Time 概念
Time 类型 | 备注 |
---|---|
Processing Time | 事件被机器处理的系统时间,提供最好的性能和最低的延迟。分支式异步环境下,容易受到事件到达系统的速度,事件在系统内操作流动速度以及中断的影响。 |
Event Time | 一般指数据本身携带的时间戳,能够满足在特定场景下数据准确性的需求。一般而言与 Processing Time 有时间延迟,需要引入水印机制处理事件乱序和时间乱序问题。 |
Ingestion Time | 事件进入 Flink 的时间。一般在 Flink Source 定义,提供给下游窗口计算的触发计算。 |
⼀般来说,在⽣产环境中 Event Time 与 Processing Time 是常用的策略。
Flink 的 Window
Window 是无限数据流处理的核心,Window 将一个无限长的 stream 拆分成有限大小的 buckets ,我们可以在这些 buckets 上做计算操作。
Window 的组成
Apache Flink 为用户提供了自定义 Window 的功能。自定的 Window 主要包含的组件为 Window assigner、 evictor 和 trigger,接下来将对其进行详细分析。
1. 窗口分配器(Window Assinger)
窗口分配器定义了数据流中的元素如何分配到窗口中,通过在分组数据流中调用 .window(...) 或者非分组数据流中调用 .windowAll(...) 时指定窗口分配器(WindowAssigner)来实现。WindowAssigner 负责将每一个到来的元素分配给一个或者多个窗口(window), Flink 提供了一些常用的预定义的窗口分配器,即:滚动窗口、滑动窗口、会话窗口和全局窗口。你也可以通过继承 WindowAssigner 类来自定义自己的分配器。
查看源码可以看⻅ WindowAssigner 这个抽象类有如下实现类:
常用的 WindowAssigner 实现类的功能介绍如下:
Assinger | 备注 |
---|---|
GlobalWindows | 所有的数据都分配到同一个窗口。 |
MergingWindowAssigner | 可 Merge 的窗口分配处理。 |
SlidingProcessingTimeWindows | 基于 Processing Time 的滚动窗口分配处理。 |
SlidingEventTimeWindows | 基于 Event Time 的滚动窗口分配处理。 |
TumblingProcessingTimeWindows | 基于 Processing Time 的滑动窗口分配处理。 |
TumblingEventTimeWindows | 基于 Event Time 的滑动窗口分配处理。 |
ProcessingTimeSessionWindows | 基于 Processing Time 且可 merge 的会话窗口分配处理。 |
EventTimeSessionWindows | 基于 Event Time 且可 merge 会话窗口分配处理。 |
2. 触发器(Trigger)
触发器决定了一个窗口何时可以被窗口函数处理,每一个窗口分配器都有一个默认的触发器,该触发器决定合适计算和清除窗口。如果默认的触发器不能满足你的需要,你可以通过调用 trigger(...)
来指定一个自定义的触发器。触发器的接口有5个方法来允许触发器处理不同的事件:
-
onElement()
方法,每个元素被添加到窗口时调用 -
onEventTime()
方法,当一个已注册的事件时间计时器启动时调用 -
onProcessingTime()
方法,当一个已注册的处理时间计时器启动时调用 -
onMerge()
方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
每个触发动作的返回结果⽤ TriggerResult 定。TriggerResult 有四种状态:
-
CONTINUE
:什么也不做 -
FIRE
:触发计算 -
PURGE
:清除窗口中的数据 -
FIRE_AND_PURGE
:触发计算并清除窗口中的数据
查看源码可以看⻅ Trigger 这个抽象类有如下实现类:
常用的 Trigger 实现类的功能介绍如下:
Trigger | 备注 |
---|---|
EventTimeTrigger | 当水印通过窗口末尾时触发的触发器。 |
ProcessingTimeTrigger | 当系统时间通过窗口末尾时触发的触发器。 |
CountTrigger | 窗口元素达到阈值触发的触发器。 |
PurgingTrigger | 作为参数,使其成为带有清除功能触发器。 |
DeltaTrigger | 基于 DeltaFunction 和一个阈值的触发器。 |
3.窗口驱逐器(Evictor)
Flink 的窗口模型允许指定一个除了 WindowAssigner 和 Trigger 之外的可选参数 Evitor,这个可以通过调用 evitor(...) 方法来实现。这个驱逐器(evitor)可以在触发器触发之前或者之后,或者窗口函数被应用之前清理窗口中的元素。如果没有定义 Evictor,触发器直接将所有窗⼝元素交给计算函数。
查看源码可以看⻅ Evictor 这个抽象类有如下实现类:
常用的 Evictor 实现类的功能介绍如下:
Trigger | 备注 |
---|---|
TimeEvitor | 清除时间戳小于窗口元素中的最大时间戳 - interval的元素。 |
CountEvitor | 只保存指定数量的数据。 |
DeltaEvitor | 通过一个 DeltaFunction 和一个阈值,计算窗口缓存中最近的一个元素和剩余的所有元素的 delta 值,并清除 delta 值大于或者等于阈值的元素。 |
Event Time 使用的场景和需要解决的问题
Event Time 场景⼀般是业务需求需要时间这个字段,⽐如购物时是要先有下单事件、再有⽀付事件;借贷事件的⻛控是需要依赖时间来做判断的;机器异常检测触发的告警也是要具体的异常事件的时间展示出来;商品⼴告及时精准推荐给⽤户依赖的就是⽤户在浏览商品的时间段/频率/时⻓等。这些场景只能根据事件时间来处理数据。
当基于事件时间的数据流进⾏窗⼝计算时,由于 Flink 接收到的事件的先后顺序并不是严格的按照事件的 Event Time 顺序排列(会因为各种各样的问题如⽹络的抖动、设备的故障、应⽤的异常等) ,最为困难的⼀点也就是如何确定对应当前窗⼝的事件已经全部到达。然⽽实际上并不能百分百的准确判断,因此业界常⽤的⽅法就是基于已经收集的消息来估算是否还有消息未到达,这就是 Watermark 的思想。Watermark 本质来说就是⼀个时间戳,代表着⽐这时间戳早的事件已经全部到达窗⼝,即假设不会再有⽐这时间戳还⼩的事件到达,这个假设是触发窗⼝计算的基础,只有 Watermark ⼤于窗⼝对应的结束时间,窗⼝才会关闭和进⾏计算。按照这个标准去处理数据,那么如果后⾯还有⽐这时间戳更⼩的数据,那么就视为迟到的数据,对于这部分迟到的数据处理也是一个问题。
Watermark + window 处理乱序数据
在 Flink 中,数据处理中需要通过调⽤ DataStream 中的 assignTimestampsAndWatermarks ⽅法来分配时间和⽔印,该⽅法可以传⼊两种参数,⼀个是 AssignerWithPeriodicWatermarks,另⼀个是 AssignerWithPunctuatedWatermarks,通常建议在数据源(source)之后就进⾏⽣成⽔印,或者做些简单操作⽐如 filter/map/flatMap 之 后再⽣成⽔印,越早⽣成⽔印的效果会更好,也可以直接在数据源头就做⽣成⽔印。
1.AssignerWithPeriodicWatermarks
数据流中每一个递增的 Event Time 都会产生一个 Watermark在实际的⽣产环境中,在 TPS 很⾼的情况下会产⽣⼤量的 Watermark,可能在⼀定程度上会对下游算⼦造成⼀定的压⼒,所以只有在实时性要求⾮常⾼的场景才会选择这种⽅式来进⾏⽔印的⽣成。而且新版 Flink 源码中已经标记为 @Deprecated
2. AssignerWithPeriodicWatermarks
周期性的产生一个 Watermark,但是必须结合时间或者积累条数两个维度,否则在极端情况下会有很⼤的延时,所以周期性 Watermark 的⽣成⽅式需要根据业务场景的不同进⾏不同程度的调试,以便达到理想的效果。
查看源码可以看⻅ AssignerWithPeriodicWatermarks 这个抽象类有如下主要实现类:
-
BoundedOutOfOrdernessTimestampExtractor:
该类⽤来发出滞后于数据时间的⽔印,可以传⼊⼀个时间代表着可以允许数据延迟到来的时间是多⻓,超过延迟时间的话如果还来了之前早的数据,那么 Flink 就会丢弃了。
-
CustomWatermarkExtractor:这是⼀个⾃定义的周期性⽣成⽔印的类,在这个类⾥⾯的数据是 KafkaEvent。
Late Element(延迟数据)的处理
延迟数据三种处理方案
1. 丢弃(默认)
2. allowedLateness 指定允许数据延迟的时间
在某些情况下,我们希望对迟到的数据再提供一个宽容的时间。Flink 提供了 allowedLateness 方法可以实现对迟到的数据设置一个延迟时间,在指定延迟时 间内到达的数据还是可以触发 window 执行的。调用 .allowedLateness(Time lateness)
3. sideOutputLateData 收集迟到的数据
通过 sideOutputLateData 可以把迟到的数据统一收集,统一存储,方便后期排查问题。该⽅法会将延迟的数据发送到给定 OutputTag 的 side output 中去,然后你可以通过 SingleOutputStreamOperator.getSideOutput(OutputTag) 来获取这些延迟的数据。
在多并行度下的 Watermark 应用
在多并行度下(假设流程序存在 shuffle,存在一个算子多个输入的情况), Watermark 会在每个并行度的 source 处或者其他算子内部添加,水印在数据流 shuffle 的过程中的合并方式是:Watermark 会对齐会取所有 channel 最小的 Watermark。
下图显示了多并行度下事件水印的合并方式。
以 Kafka Source 为例,通常每个 Kafka 分区的数据时间戳是递增的(事件是有序的),但是当你作业设置多个并⾏度的时候,Flink 去消费 Kafka 数据流是并⾏的,那么并⾏的去消费 Kafka 分 区的数据就会导致打乱原每个分区的数据时间戳的顺序。在这种情况下,你可以使⽤ Flink 中的 Kafka-partition-aware 特性来⽣成⽔印,使⽤该特性后,⽔印会在 Kafka 消费端⽣成,然后每个 Kafka 分区和每个分区上的⽔印最后的合并⽅式和⽔印在数据流 shuffle 过程中的合并⽅式⼀ 致。
下面的插图展示了如何使用每个kafka分区的水印生成,以及在这种情况下,水印如何通过数据流传播。
Flink SQL 之 Watermark 的使用
在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 Watermark 生成表达式,同时标记这个已有字段为时间属性字段。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
如果源中的时间戳数据表示为一个 epoch time,通常是一个长值,例如 1618989564564,建议将事件时间属性定义为 TIMESTAMP_LTZ 列
CREATE TABLE user_actions (
user_name STRING,
data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
在场景和最佳实践方面,这里引用一下云+ 社区 腾讯云流计算 Oceanus 专栏文章 。这里可以找到关于 Flink的当下热门的应用场景和最佳实践,而且定时更新,极具参考价值。这里就不做过多的介绍了。
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
总结
本文从 Flink Watermark 涉及的基本的概念入手,阐述 Flink 水印机制的原理和使用方式。先后介绍了 Time 的类型,Windows 的组成,Event Time 和 Watermark 的使用场景和方式,重点是 Watermark 的设计方案如何解决窗口处理事件乱序和事件延迟的问题。抛转引玉,希望通过本文的介绍,有更多的人了解和关注 FLink 相关机制和原理。希望大家多关注云+ 社区 腾讯云流计算 Oceanus,多多交流,相互学习,共同进步。
扫码加入 流计算 Oceanus 产品交流群:point_down:
扫码关注 「腾讯云大数据」 ,了解腾讯云流计算 Oceanus 更多信息 ~
腾讯云大数据
长按二维码
关注我们
- 最佳实践:MySQL CDC 同步数据到 ES
- 腾讯云ES:一站式配置,TKE容器日志采集与分析就是这么简单!
- 速度提升10倍,腾讯基于Iceberg的数据治理与优化实践
- Flink 实践教程:入门(12):元数据的使用
- Flink Metrics&REST API 介绍和原理解析
- Flink 最佳实践:TDSQL Connector 的使用(上)
- Flink Watermark 机制及总结
- Flink 实践教程-进阶(10):自定义聚合函数(UDAF)
- Flink 实践教程-进阶(9):自定义表值函数(UDTF)
- 数据分析小结:使用流计算 Oceanus(Flink) SQL 作业进行数据类型转换
- Flink 实践教程-进阶(8):自定义标量函数(UDF)
- 实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu
- 基于流计算 Oceanus(Flink) CDC 做好数据集成场景
- Flink 实现 MySQL CDC 动态同步表结构
- 流计算 Oceanus | Flink JVM 内存超限的分析方法总结
- Flink 实践教程-进阶(6):CEP 复杂事件处理
- 腾讯云 AI 视觉产品基于流计算 Oceanus(Flink)的计费数据去重尝试
- 腾讯云原生实时数仓建设实践
- 专家带你吃透 Flink 架构:一个新版 Connector 的实现
- 流计算 Oceanus | 巧用 Flink 构建高性能 ClickHouse 实时数仓