数据分析小结:使用流计算 Oceanus(Flink) SQL 作业进行数据类型转换
作者: 吴云涛,腾讯 CSIG 高级工程师
在这个数据爆炸的时代,企业做数据分析也面临着新的挑战, 如何能够更高效地做数据准备,从而缩短整个数据分析的周期,让数据更有时效性,增加数据的价值,就变得尤为重要。将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程(即 ETL 过程),则需要开发人员则需要掌握 Spark、Flink 等技能,使用的技术语言则是 Java、Scala 或者 Python,一定程度上增加了数据分析的难度。而 ELT 过程逐渐被开发者和数据分析团队所重视,如果读者已经非常熟悉 SQL,采用 ELT 模式完成数据分析会是一个好的选择,比如说逐渐被数据分析师重视的 DBT 工具,便利用了 SQL 来做数据转换。DBT 会负责将 SQL 命令转化为表或者视图,广受企业欢迎。此外使用 ELT 模式进行开发技术栈也相对简单,可以使数据分析师像软件开发人员那样方便获取到加工后的数据。
Flink SQL 可以说是对 ELT 模式的一种支持,避免了使用 Java/Scala/Python 编程语言进行开发的复杂性。并且 Flink SQL 采用的是开源的 Apache Calcite 来实现对标准 SQL 语法的支持,没有额外的 SQL 语法学习成本。腾讯云 流计算 Oceanus [1] 是基于 Apache Flink 构建的企业级实时大数据分析平台,提供了可视化的 SQL 作业,降低了数据分析团队的数据获取难度。数据分析团队只对 SQL 熟悉也能完成数据的清洗与分析工作,使团队更专注业务数据的分析工作。Flink SQL 作业的创建,可参考之前的文章 Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务 [ 2]。本文主要对数据转换过程中 Flink SQL 作业中常用的类型转换函数进行了总结。
常用类型转换函数
-
CAST(value AS type) 将某个值转为 type 类型。 type 类型可参考 Flink 官方网站 Data Types [3] 章节。
示例测试语句: SELECT CAST(var1 AS VARCHAR) FROM Test;
测试数据和结果:
测试数据(INT var1) 测试结果 VARCHAR 58 '58' -
CAN_CAST_TO(str, type) 判断 str 字符串是否可以被转换为 type 指定的类型,返回值为布尔型。返回值可以在 CASE 语句中作为条件使用。
示例测试语句: SELECT CAN_CAST_TO(var1, type) FROM Test; 测试数据和结果: True
测试数据(VARCHAR var1) 测试数据(VARCHAR type) 测试结果(BOOLEAN) 123456 INTEGER true
-
DATE_FORMAT_SIMPLE(timestamp, simple_format) 将 BIGINT(long)类型的字段(必须是以毫秒为单位的 Unix 时间戳)以 Java 的 SimpleDateFormat 支持的时间格式化模板转为字符串形式(输出时区为:GMT+8)。
示例测试语句: SELECT DATE_FORMAT_SIMPLE(unix_ts, 'yyyy-MM-dd HH:mm:ss') FROM Test; 测试数据和结果:
测试数据(unix_ts) 测试结果 VARCHAR 1627997937000 2021-08-03 21:38:57 -
DATE_FORMAT(timestamp, format) 将 Timestamp 类型的字段以 Java 的 SimpleDateFormat 支持的时间格式化模板转为字符串形式。
示例测试语句: SELECT DATE_FORMAT(timestamp, format) FROM Test; 测试数据和结果:
测试数据(timestamp) 测试数据(format) 测试结果 VARCHAR 2021-01-01 12:13:14 yyMMdd 210101 2021-01-01 12:13:14 yyyyMMdd 20210101 TIMESTAMP_TO_LONG(timestamp) 或 TIMESTAMP_TO_LONG(timestamp, mode) 将某个 TIMESTAMP 类型的参数转为 BIGINT (Long) 类型的值。若 mode 为 'SECOND',则转为以秒来计数的 Unix 时间戳,例如1548403425。若 mode 为其他值或者省略,则转为以毫秒计数的 Unix 时间戳,例如1548403425512。
-
UNNEST 列转换为行,常常用于 Array 或者 Map 类型。将某1个字段数据转为多个。 示例测试语句: SELECT userId, productImage FROM Test1, UNNEST(productImages) as t(productImage);
其中 productImages 为 Test1 表中 ARRAY <String> 类型字段。UNNEST 函数也可以用 UDTF 函数替代,可参考文章 《Flink 实践教程:进阶9-自定义表值函数(UDTF)》。 测试数据和结果:
测试数据(userId INT, productImages ARRAY <VARCHAR> ) | 测试结果(INT, VARCHAR) |
---|---|
100, ['image1', 'image2'] | 100, 'image1' 100, 'image2' |
其他类型转换函数
-
DATE string 以“yyyy-MM-dd”的形式返回从字符串解析的 SQL 日期。
-
TIME string 以“HH:mm:ss”的形式返回从字符串解析的 SQL 时间。
-
TIMESTAMP string 以“yyyy-MM-dd HH:mm:ss[.SSS]”的形式返回从字符串解析的 SQL 时间戳。
-
UNIX_TIMESTAMP(string1[, string2]) 使用表配置中指定的时区将格式为 string2 的日期时间字符串 string1(如果未指定默认情况下:yyyy-MM-dd HH:mm:ss) 转换为 Unix 时间戳(以秒为单位)。
-
TO_DATE(string1[, string2]) 将格式为 string2(默认为 ‘yyyy-MM-dd’)的字符串 string1 转换为日期。
-
TO_TIMESTAMP_LTZ(numeric, precision) 将纪元秒或纪元毫秒转换为 TIMESTAMP_LTZ,有效精度为 0 或 3,0 代表
TO_TIMESTAMP_LTZ(epochSeconds, 0)
, 3 代表TO_TIMESTAMP_LTZ(epochMilliseconds, 3)
。 -
TO_TIMESTAMP(string1[, string2]) 将 ‘UTC+0’ 时区下格式为 string2(默认为:‘yyyy-MM-dd HH:mm:ss’)的字符串 string1 转换为时间戳。
总结
参考 Flink 官方网站实现其他类型的字段处理,具体参考 Flink 系统(内置)函数 [4]。我们也可以通过用户自定义函数(UDX):自定义标量函数(UDF)、自定义表值函数(UDTF)、自定义聚合函数(UDAF)来完成更复杂的 Flink SQL 作业的数据处理工作,具体参考之前的文章 Flink 实践教程:进阶8-自定义标量函数(UDF) [5] 等。
阅读参考:
[1] 流计算 Oceanus: http://cloud.tencent.com/product/oceanus
[2] Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务: http://cloud.tencent.com/developer/article/1895677
[3] Flink Data Types: http://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/types/
[4] Flink 系统(内置)函数: http://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/systemfunctions/
[5] Flink 实践教程:进阶8-自定义标量函数(UDF): http://cloud.tencent.com/developer/article/1946320
点击文末 「阅读原文」 ,了解腾讯云流计算 Oceanus 更多信息 ~
腾讯云大数据
长按二维码
关注我们
- 最佳实践:MySQL CDC 同步数据到 ES
- 腾讯云ES:一站式配置,TKE容器日志采集与分析就是这么简单!
- 速度提升10倍,腾讯基于Iceberg的数据治理与优化实践
- Flink 实践教程:入门(12):元数据的使用
- Flink Metrics&REST API 介绍和原理解析
- Flink 最佳实践:TDSQL Connector 的使用(上)
- Flink Watermark 机制及总结
- Flink 实践教程-进阶(10):自定义聚合函数(UDAF)
- Flink 实践教程-进阶(9):自定义表值函数(UDTF)
- 数据分析小结:使用流计算 Oceanus(Flink) SQL 作业进行数据类型转换
- Flink 实践教程-进阶(8):自定义标量函数(UDF)
- 实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu
- 基于流计算 Oceanus(Flink) CDC 做好数据集成场景
- Flink 实现 MySQL CDC 动态同步表结构
- 流计算 Oceanus | Flink JVM 内存超限的分析方法总结
- Flink 实践教程-进阶(6):CEP 复杂事件处理
- 腾讯云 AI 视觉产品基于流计算 Oceanus(Flink)的计费数据去重尝试
- 腾讯云原生实时数仓建设实践
- 专家带你吃透 Flink 架构:一个新版 Connector 的实现
- 流计算 Oceanus | 巧用 Flink 构建高性能 ClickHouse 实时数仓