Flink connector for GBase8c
It has been a long while since my last post. The long silence weighed on me at first, but over time I made a kind of peace with it. Problems still have to be faced eventually, though, so here I am...
For various reasons my research focus has shifted; it now centers on distributed cluster architectures and compilers. Readers who have followed me for a while will have noticed that JimSQL also received a major update recently; if you are interested, feel free to reach out and discuss.
Today, then, I will walk you through building a Flink connector for GBase8c, step by step. All aboard... Let's go!!!
GBase8c
GBase 8c is the distributed transactional database management system from 南大通用 (General Data Technology). It is a shared-nothing distributed transactional database cluster featuring high performance, high availability, elastic scaling, and strong security. It can be deployed on physical machines, virtual machines, containers, private clouds, and public clouds, providing secure, stable, and reliable data storage and management for core financial systems, internet business systems, and government and enterprise systems.
Technical features
As a financial-grade distributed transactional database, GBase 8c offers strongly consistent global transactions, separation of compute and storage, flexible data distribution, flexible deployment options, online scale-out and scale-in, online upgrades, high data availability, strong security, geo-redundant active-active deployment, efficient data loading, cluster backup and recovery, ease of maintenance, standards compliance, and compatibility with the domestic software ecosystem.
Strongly consistent global transactions
GBase 8c uses a two-phase commit protocol together with global transaction IDs to guarantee strong consistency of global transactions: every cross-node transaction either commits on all nodes or rolls back on all nodes; it never commits on some nodes while failing on others. Transaction processing is also fault tolerant: if a node fails while processing a transaction, another node takes over the unfinished work without requiring the application to resubmit the request.
Separation of compute and storage
GBase 8c adopts a shared-nothing architecture with compute separated from storage, so compute capacity and storage capacity can be scaled horizontally and independently according to business needs, lowering total cost of ownership.
Flexible data distribution
Users can choose a data distribution strategy that matches their business scenario, striking the best balance among performance, reliability, and flexibility.
GBase 8c supports replicated tables and distributed tables. Replicated tables store read-only or read-mostly data and can be joined locally with distributed tables, which greatly improves query performance. Distributed tables store large single-table datasets whose rows are spread across storage nodes (for example, by hash), reducing per-node data volume and improving read/write performance.
Configuring GBase8c
# Create a user
gbase=# create user jacky with password 'jacky';
ERROR: Password must contain at least three kinds of characters.
# The password must meet the complexity policy
gbase=# create user jacky with password '[email protected]';
NOTICE: The encrypted password contains MD5 ciphertext, which is not secure.
CREATE ROLE
# Create a database
gbase=# create database test owner jacky;
CREATE DATABASE
# Grant privileges
gbase=# grant all privileges on database test to jacky;
GRANT
gbase=# alter role jacky createdb;
ALTER ROLE
gbase=# grant all privileges to jacky;
ALTER ROLE
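With the account and database in place, it is worth a quick connectivity sanity check from plain JDBC before involving Flink. Below is a minimal sketch, assuming GBase8c is listening on 127.0.0.1:5432 and that a PostgreSQL-compatible (or openGauss) JDBC driver is on the classpath; the URL and credentials match the CDC example later in this post:

package name.lijiaqi;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Same URL and credentials as the CDC example below.
        String url = "jdbc:postgresql://127.0.0.1:5432/test";
        try (Connection conn = DriverManager.getConnection(url, "jacky", "[email protected]");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            rs.next();
            System.out.println("connected, got: " + rs.getInt(1));
        }
    }
}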
Flink connector GBase8c
We could of course hand-write a Sink that pushes the CDC stream straight into the target database, but that would hardly be elegant. Can we instead load the data into GBase8c through Flink SQL? Absolutely. Let's implement a simple Flink connector for GBase8c in three steps:
- Build the row converter (RowConverter)
- Build the dialect (Dialect)
- Register the dynamic table factory (DynamicTableFactory) and the related sink
With these three steps done, a simple connector is complete. Let's look at each in turn.
Build the row converter (RowConverter)
package name.lijiaqi.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * Row converter for GBase8c; the default JDBC conversions are inherited.
 *
 * @author lijiaqi
 */
public class GBase8cRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public GBase8cRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "gbase8c";
    }
}
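There is deliberately little to do here: AbstractJdbcRowConverter from flink-connector-jdbc already implements the standard conversions between JDBC values and Flink's internal RowData, so the subclass only has to supply a converter name. Database-specific types would be handled by overriding its conversion hooks, as the built-in PostgreSQL converter does for arrays.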
Build the dialect (Dialect)
package name.lijiaqi.dialect;

import name.lijiaqi.converter.GBase8cRowConverter;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;

import java.util.Optional;

/**
 * JDBC dialect for GBase8c, which is reached through an openGauss-compatible driver.
 *
 * @author lijiaqi
 */
public class GBase8cDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "gbase8c";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:opengauss:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new GBase8cRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        // Not needed for a sink-only connector.
        return null;
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        JdbcDialect.super.validate(schema);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.opengauss.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // Quote identifiers with double quotes; single quotes would turn
        // them into string literals rather than identifiers.
        return "\"" + identifier + "\"";
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
    }
}
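Note the openGauss references: GBase8c is accessed through an openGauss-compatible JDBC driver, hence defaultDriverName returning org.opengauss.Driver and canHandle matching jdbc:opengauss: URLs. Since the factory below passes this dialect to JdbcOptions explicitly, canHandle is not actually consulted for dialect discovery here.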
Register the dynamic table factory (DynamicTableFactory) and the related sink
First, create GBase8cSinkFunction, which accepts RowData records and writes them to the configured database.
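Its body is essentially a plain JDBC writer. A minimal sketch is shown below; it assumes the JdbcOptions accessors getDbURL(), getTableName(), getUsername(), and getPassword() of the flink-connector-jdbc version used here, writes only INSERT and UPDATE_AFTER rows, and covers only the INT and STRING columns used in this post's example. A production version would add batching, delete handling, and full type support (ideally delegating to the GBase8cRowConverter above).

package name.lijiaqi.table;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class GBase8cSinkFunction extends RichSinkFunction<RowData> {

    private final JdbcOptions jdbcOptions;
    private final DataType dataType;

    private transient Connection connection;
    private transient PreparedStatement statement;

    public GBase8cSinkFunction(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dataType = dataType;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                jdbcOptions.getDbURL(),
                jdbcOptions.getUsername().orElse(null),
                jdbcOptions.getPassword().orElse(null));
        // Build "INSERT INTO <table> VALUES (?, ?, ...)" from the field count.
        RowType rowType = (RowType) dataType.getLogicalType();
        StringBuilder sql = new StringBuilder("INSERT INTO ")
                .append(jdbcOptions.getTableName())
                .append(" VALUES (");
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            sql.append(i == 0 ? "?" : ", ?");
        }
        sql.append(")");
        statement = connection.prepareStatement(sql.toString());
    }

    @Override
    public void invoke(RowData row, Context context) throws Exception {
        switch (row.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER: {
                RowType rowType = (RowType) dataType.getLogicalType();
                for (int i = 0; i < rowType.getFieldCount(); i++) {
                    LogicalType fieldType = rowType.getTypeAt(i);
                    switch (fieldType.getTypeRoot()) {
                        case INTEGER:
                            statement.setInt(i + 1, row.getInt(i));
                            break;
                        case VARCHAR:
                            statement.setString(i + 1, row.getString(i).toString());
                            break;
                        default:
                            throw new IllegalStateException(
                                    "Type not covered by this sketch: " + fieldType);
                    }
                }
                statement.executeUpdate();
                break;
            }
            default:
                // UPDATE_BEFORE and DELETE are ignored in this sketch.
                break;
        }
    }

    @Override
    public void close() throws Exception {
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}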
Next, build GBase8cDynamicTableSink:
package name.lijiaqi.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * @author lijiaqi
 */
public class GBase8cDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public GBase8cDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Accept whatever changelog the planner requests; each RowKind is
        // handled (or ignored) by the sink function itself.
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        GBase8cSinkFunction sinkFunction = new GBase8cSinkFunction(jdbcOptions, dataType);
        return SinkFunctionProvider.of(sinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new GBase8cDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "GBase8c Table Sink";
    }
}
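Because getChangelogMode echoes whatever the planner requests, a CDC source will hand the sink UPDATE_BEFORE and DELETE rows too; the sketch above quietly ignores those, so a complete sink would translate them into UPDATE and DELETE statements (the dialect's getUpdateStatement and getDeleteStatement helpers exist for exactly that).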
Finally, build GBase8cDynamicTableFactory:
package name.lijiaqi.table;

import name.lijiaqi.dialect.GBase8cDialect;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lijiaqi
 */
public class GBase8cDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "gbase8c";

    private static final String DRIVER_NAME = "org.opengauss.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    // public static final ConfigOption<String> FORMAT = ConfigOptions
    //         .key("format")
    //         .stringType()
    //         .noDefaultValue()
    //         .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        // requiredOptions.add(FORMAT);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        // GBase8cDynamicTableSource (the source side) is not covered in this post.
        return new GBase8cDynamicTableSource(jdbcOptions, physicalSchema);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
        //         SerializationFormatFactory.class,
        //         FactoryUtil.FORMAT);
        final ReadableConfig config = helper.getOptions();
        // validate all options
        helper.validate();
        // get the validated options
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        // table sink
        return new GBase8cDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                // note: the driver name is hard-coded here, so the DRIVER option is effectively unused
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new GBase8cDialect());
        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }
}
Next, register the factory via Java SPI: create the file resources/META-INF/services/org.apache.flink.table.factories.Factory containing a single line:
name.lijiaqi.table.GBase8cDynamicTableFactory
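Without this registration file, the planner cannot discover the factory, and creating a table with 'connector' = 'gbase8c' fails with a "Could not find any factory for identifier 'gbase8c'" error.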
With that, our Flink connector is complete. Next, let's put it to work in a real project.
CDC in practice
The overall architecture is straightforward: Flink CDC captures change data from MySQL, and a Flink SQL job sinks it into GBase8c.
Let's see how to implement the CDC pipeline with Flink SQL; it takes just three SQL statements.
Create the source table:
// source table
String sourceDDL =
"CREATE TABLE mysql_binlog (\n" +
" id INT NOT NULL,\n" +
" name STRING,\n" +
" memo STRING,\n" +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = 'dafei1288',\n" +
" 'database-name' = 'cdc',\n" +
" 'table-name' = 't1'\n" +
")";
Create the sink table, writing to GBase8c through its openGauss-compatible JDBC interface; the connector here is set to gbase8c, the identifier we registered above:
String url = "jdbc:postgresql://127.0.0.1:5432/test";
String userName = "jacky";
String password = "[email protected]";
String gbasedbtSinkTable = "t1";
// sink table
String sinkDDL =
"CREATE TABLE test_cdc_sink (\n" +
" id INT NOT NULL,\n" +
" name STRING,\n" +
" memo STRING,\n" +
" PRIMARY KEY (id) NOT ENFORCED \n " +
") WITH (\n" +
" 'connector' = 'gbase8c',\n" +
// " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
" 'url' = '" + url + "',\n" +
" 'username' = '" + userName + "',\n" +
" 'password' = '" + password + "',\n" +
" 'table-name' = '" + gbasedbtSinkTable + "' \n" +
")";
Then we simply move the data across with a single INSERT:
String transformSQL =
"insert into test_cdc_sink select * from mysql_binlog";
Complete reference code:
package name.lijiaqi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBase8cMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " memo STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'dafei1288',\n" +
                " 'database-name' = 'cdc',\n" +
                " 'table-name' = 't1'\n" +
                ")";

        String url = "jdbc:postgresql://127.0.0.1:5432/test";
        String userName = "jacky";
        String password = "[email protected]";
        String gbasedbtSinkTable = "t1";

        // sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " memo STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'gbase8c',\n" +
                // " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + gbasedbtSinkTable + "' \n" +
                ")";

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);

        // executeSql already submits the INSERT job, so no separate
        // env.execute() is needed (it would fail here, since no DataStream
        // operators were defined); result.print() surfaces the submission result.
        TableResult result = tableEnv.executeSql(transformSQL);
        result.print();
    }
}
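To run this end to end, roughly the following must be on the classpath besides the connector module itself (exact artifact names vary with the Flink version): the Blink planner and flink-table-api-java-bridge, flink-connector-jdbc, flink-connector-mysql-cdc, the MySQL driver pulled in by the CDC connector, and an openGauss- or PostgreSQL-compatible JDBC driver for GBase8c.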
Execution result:
The project is hosted at the repository below; stars, forks, and pull requests are welcome:
http://github.com/dafei1288/flink-connector-gbase8c
- END -