实时即未来,大数据项目车联网之原始数据实时ETL任务消费数据策略(3)

语言: CN / TW / HK

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第3天,点击查看活动详情

实时ETL业务开发

1 原始数据实时ETL分析

根据kafka集群中的数据进行实时ETL过滤,对数据进行划分,并将划分的数据分别落地到不同的数据库中。

image-20221007105927589

2 实时ETL开发流程分析

原始数据实时ETL,读取kafka中的原始json数据,解析json数据,flink实时逻辑开发,数据落地到hive与hbase

image-20221007110005934

l 实现步骤:

原始数据实时ETL流式任务创建

原始数据实时ETL任务设置

定义任务Kakfa数据源信息(topic,group,key、value序列化信息等)

任务添加Kafka数据源,消费数据

原始数据json解析

逻辑处理,区分原始数据中的解析失败的数据和解析成功的数据

数据落地HDFSSink,创建Hive表与HDFS数据进行关联

定时任务脚本

image-20221007110019679

3 解析工具类引入

解析工具类创建的程序包路径: cn. maynor.streaming.utils

1 日期处理工具

l 在程序包下创建日期工具类 DateUtil .java

l 定义时间处理的枚举类

l 日期处理工具,包含:字符串转日期,获得当前日期、日期格式化等处理

直接获得当前日期,格式:“yyyy-MM-dd HH:mm:ss”

直接获得当前日期,格式:”yyyyMMdd”

字符串日期格式转换,传入参数格式:“yyyy-MM-dd”,转成Date类型

字符串日期格式转换,传入参数格式:“yyyy-MM-dd HH:mm:ss”,转成Date类型

字符串日期格式转换,传入参数格式:”yyyy-MM-dd HH:mm:ss“,转成”yyyyMMdd”格式

l 测试日期工具类

2 配置文件加载工具

l 配置文件加载工具,加载key在conf.properties文件中对应的值

l 在程序包下创建日期工具类 ConfigLoader

n ClassLoader(类加载器),加载conf.properties

通过Properties的load方法加载InputStream

编写方法实现获得string的key值

编写方法实现获得int的key值

3 字符串处理工具

后续在使用hbase的rowkey生成的时候,为了避免热点问题,将字符串进行反转,因此创建字符串反转方法

l 在程序包下创建日期工具类 StringUtil .java

l 定义字符串反转的方法

4 JSON解析工具

Json解析工具,即编写原始数据json解析工具类,使用第一章json解析中的第二种方式org.json解析:原始数据\sourcedata.txt

l 编写json解析对象:ItcastDataObj对象

n json的key作为对象的属性

扩展字段

类型转换、构造方法、toString方法等

讲义关联资料\ItcastDataObj属性.md

讲义关联资料\ItcastDataObj方法.md

l Json解析工具类: JsonParseUtil

讲义关联资料\JsonParseUtil.java

区分出来正确数据或者异常数据

Vin不为空且 terminalTime(事件时间)不为空,即正常数据,反之异常数据

为扩展字段进行赋值操作:errorData、terminalTimeStamp

l 工具类测试方法: JsonParseUtilTest

img

4 原始数据 实时 ETL任务设置

l Task任务创建所在的程序包: cn. maynor.streaming.task

l 创建task: KafkaSourceDataTask

img

l 原始数据实时ETL任务编写,流任务设置检查点、任务重启策略、kafka分区发现、数据积压

checkpoint

任务重启策略

kafka分区发现

数据积压

1 checkpoint配置

l 选择合适的Checkpoint存储方式

l CheckPoint存储方式存在三种

官方文档:https://ci.apache.org/projects/flink/flink-docs-release- 10/ops/state/state_backends.html

n MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend

服务器中配置默认状态存储路径:flink-conf.yaml

```bash

The backend that will be used to store operator state checkpoints

state.backend: filesystem

Directory for storing checkpoints 服务器环境存储多个挂载的本地磁盘为最优

prod envirment file:///data/flink/checkpoints

state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints ```

不同 StateBackend 之间的性能以及安全性是有很大差异的。通常情况下,在代码中设置检查点存储,MemoryStateBackend 适合应用于测试环境,线上环境则最好选择存在挂在存储在磁盘上的FsStateBackend、RocksDBStateBackend。

这有两个原因:首先,RocksDBStateBackend 是外部存储,其他两种 Checkpoint 存储方式都是 JVM 堆存储。 受限于 JVM 堆内存的大小,Checkpoint 状态大小以及安全性可能会受到一定的制约;

其次,RocksDBStateBackend 支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。 与完整检查点相比,增量检查点可以显著缩短 checkpointing 时间,但代价是需要更长的恢复时间。

“state.checkpoints.dir”参数来指定所有的checkpoints数据和元数据存储的位置

java // 本地访问hdfs设置系统属性 System.setProperty("HADOOP_USER_NAME", "root"); // 创建flink的流式环境、设置任务的检查点(数据存储hdfs)、设置分区发现、设置任务的重启策略、数据积压内部解决策略 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // 设置流式数据的参照时间 ProcessingTime:事件被处理时机器的系统时间;IngestionTime:事件进入flink事件;EventTime:事件数据中某个字段的时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2.checkpoint配置 开启检查点,设置时间间隔为5分钟 env.enableCheckpointing(300000); // 3.检查点Model设置 exactly once 仅消费一次 保证消息不丢失不重复 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 4 防止checkpoint 太过于频繁而导致业务处理的速度下降 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // 5 设置checkpoint的超时时间 env.getCheckpointConfig().setCheckpointTimeout(20000); // 6 设置checkpoint最大的尝试次数,次数必须 >= 1 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 7 设置取消任务时保留checkpoint,checkpoint默认会在整个job取消时被删除 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 8 设置执行job过程中,保存检查点错误时,job不失败 env.getCheckpointConfig().setFailOnCheckpointingErrors(false); String hdfsUri = ConfigLoader.getProperty("hdfsUri"); // 9.设置检查点存储的位置,使用RocksDBStateBackend,存储在hdfs分布式文件系统中,增量检查点 try { env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask")); } catch (IOException e) { e.printStackTrace(); }