Flink 实践教程-进阶(8):自定义标量函数(UDF)
作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将您详细介绍如何使用自定义标量函数(UDF),对随机产生的数据进行处理后存入 MySQL 中。
前置准备
创建流计算 Oceanus 集群
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
创建 MySQL 实例
进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。进入实例后,单击右上角【登陆】即可登陆 MySQL 数据库。
创建 MySQL 表
-- 建表语句,用于接收 Sink 端数据
CREATE TABLE `udf_output` (
`id` int(10) NOT NULL,
`len_name` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
开发 UDF
这里定义一个获取字符串字段长度的函数。如果传入一个字段,则获取这个字段的长度后返回;如果传入两个字段,则获取这两个字段的长度和后返回。
1. 代码编写
在本地IDE中创建 maven 项目,编写自定义函数UDF的代码。
// 类名:StringLengthUdf
package demos.UDF;
import org.apache.flink.table.functions.ScalarFunction;
public class StringLengthUdf extends ScalarFunction {
public long eval(String a) {
return a == null ? 0 : a.length();
}
public long eval(String b, String c) {
return eval(b) + eval(c);
}
}
2. 项目打包
使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:
mvn clean package
命令行打包后生成的 JAR 包可以在项目 target 目录下找到。
流计算 Oceanus 作业
上传依赖
在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 JAR 包。
创建 SQL 作业
在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。单击【作业参数】,在【引用程序包】处选择刚才上传的 JAR 包。
1. 创建 Function
CREATE TEMPORARY SYSTEM FUNCTION StringLengthUdf AS 'demos.UDF.StringLengthUdf';
StringLengthUdf
代表创建的函数名, demos.UDF.StringLengthUdf
代表类路径。
2. 创建 Source
CREATE TABLE random_source (
id INT,
name1 VARCHAR,
name2 VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second'='1', -- 每秒产生的数据条数
'fields.id.kind'='sequence', -- 无界的随机数
'fields.id.start'='1', -- 随机数的最小值
'fields.id.end'='5', -- 随机数的最大值
'fields.name1.length'='10', -- 随机字符串的长度
'fields.name2.length'='10' -- 随机字符串的长度
);
3. 创建 Sink
CREATE TABLE `jdbc_upsert_sink_table` (
`id` INT,
`len_name` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
-- 指定数据库连接参数
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数
'table-name' = 'udf_output', -- 需要写入的数据表
'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限)
'password' = 'xxxxxxxxx', -- 数据库访问的密码
'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数
'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔
);
4. 编写业务 SQL
INSERT INTO jdbc_upsert_sink_table
SELECT
id,
CAST(StringLengthUdf(name1,name2) AS INT) AS `len_name`
FROM random_source;
运行作业
点击【发布草稿】->【运行版本】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。
总结
本文首先在本地开发 UDF 函数,将其打成 JAR 包后上传到 Oceanus 平台引用。接下来使用 Datagen 连接器产生虚拟数据,调用 UDF 函数进行不同字段的字符串长度的加和操作后存入 MySQL 中。
-
自定义标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。
-
UDF 需要在 ScalarFunction 类中实现 eval 方法,且必须声明为 public 类型;自定义函数中 open 方法和 close 方法可选;可被重载,即在一个 UDF 中实现多个 eval 方法。
参考链接
[1] Oceanus 控制台: http://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群: http://cloud.tencent.com/document/product/849/48298
[3] MySQL 控制台: http://console.cloud.tencent.com/cdb
[4] 创建 MySQL 实例: http://cloud.tencent.com/document/product/236/46433
- 最佳实践:MySQL CDC 同步数据到 ES
- 腾讯云ES:一站式配置,TKE容器日志采集与分析就是这么简单!
- 速度提升10倍,腾讯基于Iceberg的数据治理与优化实践
- Flink 实践教程:入门(12):元数据的使用
- Flink Metrics&REST API 介绍和原理解析
- Flink 最佳实践:TDSQL Connector 的使用(上)
- Flink Watermark 机制及总结
- Flink 实践教程-进阶(10):自定义聚合函数(UDAF)
- Flink 实践教程-进阶(9):自定义表值函数(UDTF)
- 数据分析小结:使用流计算 Oceanus(Flink) SQL 作业进行数据类型转换
- Flink 实践教程-进阶(8):自定义标量函数(UDF)
- 实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu
- 基于流计算 Oceanus(Flink) CDC 做好数据集成场景
- Flink 实现 MySQL CDC 动态同步表结构
- 流计算 Oceanus | Flink JVM 内存超限的分析方法总结
- Flink 实践教程-进阶(6):CEP 复杂事件处理
- 腾讯云 AI 视觉产品基于流计算 Oceanus(Flink)的计费数据去重尝试
- 腾讯云原生实时数仓建设实践
- 专家带你吃透 Flink 架构:一个新版 Connector 的实现
- 流计算 Oceanus | 巧用 Flink 构建高性能 ClickHouse 实时数仓