Flink 實踐教程-進階(10):自定義聚合函式(UDAF)

語言: CN / TW / HK

作者:騰訊雲流計算 Oceanus 團隊

流計算 Oceanus 簡介  

流計算 Oceanus 是大資料產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連線、亞秒延時、低廉成本、安全穩定等特點的企業級實時大資料分析平臺。流計算 Oceanus 以實現企業資料價值最大化為目標,加速企業實時化數字化的建設程序。

本文將為您詳細介紹如何使用自定義聚合函式(UDAF),將處理後的存入 MySQL 中。

前置準備

建立流計算 Oceanus 叢集

進入 Oceanus 控制檯 [1],點選左側【叢集管理】,點選左上方【建立叢集】,具體可參考 Oceanus 官方文件 建立獨享叢集 [2]。

建立 MySQL 例項

進入 MySQL 控制檯 [3],點選【新建】。具體可參考官方文件 建立 MySQL 例項 [4]。進入例項後,單擊右上角【登陸】即可登陸 MySQL 資料庫。

建立 MySQL 表

-- 建表語句,用於向 Source 提供資料
CREATE TABLE `udaf_input` (
`id` int(10) NOT NULL,
`product` varchar(50) DEFAULT '',
`value` int(10) DEFAULT NULL,
`weight` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8


-- 插入資料
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (1, 'oceanus-1', 2, 2);
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (2, 'oceanus-1', 3, 3);
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (3, 'oceanus-2', 5, 4);
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (5, 'oceanus-2', 6, 5);


-- 建表語句,用於接收 Sink 端資料
CREATE TABLE `udaf_output` (
`product` varchar(50) NOT NULL DEFAULT '',
`sum` double(11,0) DEFAULT NULL,
PRIMARY KEY (`product`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

開發 UDAF

我們自定義一個 UDAF,繼承 AggregateFunction ,對運算元輸入的兩個欄位計算加權平均值。

1. 程式碼編寫

WeightedAvgAccumulator 類:

package demos.UDAF;


public class WeightedAvgAccumulator{
public long sum = 0;
public int count = 0;
}

WeightedAvg 類:

package demos.UDAF;


import org.apache.flink.table.functions.AggregateFunction;


public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {


@Override
public WeightedAvgAccumulator createAccumulator() {
return new WeightedAvgAccumulator();
}


@Override
public Long getValue(WeightedAvgAccumulator acc) {
if (acc.count == 0) {
return null;
} else {
return acc.sum / acc.count;
}
}


public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
acc.sum += iValue * iWeight;
acc.count += iWeight;
}


public void retract(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
acc.sum -= iValue * iWeight;
acc.count -= iWeight;
}


public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
for (WeightedAvgAccumulator a : it) {
acc.count += a.count;
acc.sum += a.sum;
}
}


public void resetAccumulator(WeightedAvgAccumulator acc) {
acc.count = 0;
acc.sum = 0L;
}
}

2. 打包 Jar

使用 IDEA 自帶打包工具 Build Artifacts 或者命令列進行打包。命令列打包命令:

mvn clean package

命令列打包後生成的 Jar 包可以在專案 target 目錄下找到。

流計算 Oceanus 作業

上傳依賴

在 Oceanus 控制檯,點選左側【依賴管理】,點選左上角【新建】新建依賴,上傳本地 Jar 包。

建立 SQL 作業

在 Oceanus 控制檯,點選左側【作業管理】,點選左上角【新建】新建作業,作業型別選擇 SQL 作業,點選【開發除錯】進入作業編輯頁面。 單擊【作業引數】,在【引用程式包】處選擇剛才上傳的 Jar 包。

1. 建立 Function

CREATE TEMPORARY SYSTEM FUNCTION WeightedAvg  AS 'demos.UDAF.WeightedAvg';

WeightedAvg 代表建立的函式名, demos.UDAF.WeightedAvg 代表程式碼所在路徑。

2. 建立 Source

CREATE TABLE `mysql_cdc_source_table` (
`id` INT,
`product` VARCHAR,
`value` INT,
`weight` INT,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的資料庫表定義了主鍵, 則這裡也需要定義
) WITH (
'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
'hostname' = 'xx.xx.xx.xx', -- 資料庫的 IP
'port' = 'xxxx', -- 資料庫的訪問埠
'username' = 'root', -- 資料庫訪問的使用者名稱(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 許可權)
'password' = 'xxxxxxxxx', -- 資料庫訪問的密碼
'database-name' = 'testdb', -- 需要同步的資料庫
'table-name' = 'udaf_input' -- 需要同步的資料表名
);

3. 建立 Sink

CREATE TABLE `jdbc_source_table` (
`product` VARCHAR,
`sum` DOUBLE,
PRIMARY KEY(`product`) NOT ENFORCED
) WITH (
-- 指定資料庫連線引數
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 請替換為您的實際 MySQL 連線引數
'table-name' = 'udaf_output', -- 需要寫入的資料表
'username' = 'root', -- 資料庫訪問的使用者名稱(需要提供 INSERT 許可權)
'password' = 'xxxxxxxxx', -- 資料庫訪問的密碼
'sink.buffer-flush.max-rows' = '200', -- 批量輸出的條數
'sink.buffer-flush.interval' = '2s' -- 批量輸出的間隔
);

4. 編寫業務 SQL

INSERT INTO jdbc_source_table
SELECT
product,CAST(WeightedAvg(`value`,`weight`) AS DOUBLE) AS `sum`
FROM mysql_cdc_source_table GROUP BY `product`;

總結

本文首先在本地開發 UDAF 函式,將其打成 Jar 包後上傳到 Oceanus 平臺引用。接下來使用 MySQL CDC 聯結器獲取 udaf_input 表資料,呼叫 UDAF 函式對輸入的兩個欄位計算加權平均值後存入 MySQL 中。 其他的自定義函式,例如自定義標量函式(UDF)和自定義表值函式(UDTF)的使用方法和影片教程可以參考之前的文章 Flink 實踐教程:進階8-自定義標量函式(UDF) [5]、 Flink 實踐教程:進階9-自定義表值函式(UDTF) [6]

  • 自定義聚合函式(UDAF)可以將多條記錄聚合成 1 條記錄。

參考連結

[1] Oceanus 控制檯: https://console.cloud.tencent.com/oceanus/overview  

[2] 建立獨享叢集: https://cloud.tencent.com/document/product/849/48298  

[3] MySQL 控制檯: https://console.cloud.tencent.com/cdb  

[4] 建立 MySQL 例項: https://cloud.tencent.com/document/product/236/46433  

[5] Flink 實踐教程:進階8-自定義標量函式(UDF): https://cloud.tencent.com/developer/article/1946320  

[6] Flink 實踐教程:進階9-自定義表值函式(UDTF): https://cloud.tencent.com/developer/article/1951900  

流計算 Oceanus  限量秒殺專享活動火爆進行中↓↓

點選文末 「閱讀原文」 ,瞭解騰訊雲流計算 Oceanus 更多資訊 ~

騰訊雲大資料

長按二維碼
關注我們