Flink-cdc-connector source code analysis


What is CDC

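CDC stands for Change Data Capture. Broadly speaking, any technique that captures changes to data can be called CDC. In practice, CDC usually means database-oriented change capture: a technique for capturing the changes made to the data in a database. CDC has a wide range of applications: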

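  • Data synchronization, for backup and disaster recovery
  • Data distribution: one data source feeding multiple downstream systems
  • Data collection (the E in ETL): ETL-based data integration into data warehouses and data lakes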
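To make the data-synchronization use case concrete, here is a minimal sketch, assuming a hypothetical RowChange event whose op field uses Debezium's operation codes (c = create, u = update, d = delete, r = snapshot read). Applying the events in order keeps an in-memory replica in step with the source table. RowChange and Replica are illustrative names, not part of any library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical change event: op uses Debezium's codes
// ("c" = create, "u" = update, "d" = delete, "r" = snapshot read).
final class RowChange {
    final String op;
    final String key;
    final String after; // row value after the change; null for deletes

    RowChange(String op, String key, String after) {
        this.op = op;
        this.key = key;
        this.after = after;
    }
}

// Hypothetical replica kept in sync by applying change events in order.
final class Replica {
    private final Map<String, String> rows = new HashMap<>();

    void apply(RowChange e) {
        switch (e.op) {
            case "c":
            case "r":
            case "u":
                rows.put(e.key, e.after); // insert or overwrite the row
                break;
            case "d":
                rows.remove(e.key); // delete the row
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + e.op);
        }
    }

    // Copy of the current replica contents.
    Map<String, String> contents() {
        return new HashMap<>(rows);
    }
}
```

Because every event carries the full row after the change, replaying the same ordered stream into several replicas yields the same contents, which is what makes CDC usable for both synchronization and fan-out distribution.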

Common CDC implementations

The main open-source implementations in the industry today are canal, debezium and maxwell. We won't compare them in detail here (see: https://zhuanlan.zhihu.com/p/75357900).

Flink-cdc-connector source code analysis

flink-cdc-connector uses the Debezium embedded engine (debezium-embedded) to read binlog events from MySQL.

Debezium was originally built as a Kafka Connect plugin; the Debezium community later extracted the engine so that it can be used independently of Kafka Connect.

Let's first look at a demo of embedded Debezium:

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.exception.ExceptionUtils; // assumes commons-lang3 is on the classpath

public class DebeziumEmbeddedDemo {

    public static void main(String[] args) {
        // Define the configuration for the Debezium Engine with MySQL connector...
        final Properties props = new Properties();
        props.setProperty("name", "my-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename",
                "/Users/taox/xdf/git/myself/learning/debezium-learning/src/main/resources/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "20000");
        /* begin connector properties */
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "root");
        props.setProperty("database.password", "root");
        props.setProperty("database.server.id", "857442");
        props.setProperty("database.server.name", "my-app-connector2");
        props.setProperty("snapshot.mode", "schema_only");
        props.setProperty("database.include.list", "flink_sql");
        props.setProperty("table.include.list", "flink_sql.orders");
        props.setProperty("database.history",
                "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename",
                "/Users/taox/xdf/git/myself/learning/debezium-learning/src/main/resources/dbhistory.dat");

        // Create the engine with this configuration ...
        try {
            DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                    .using(props)
                    .notifying(record -> {
                        // called for every change event (insert, update, delete)
                        System.out.println("key: " + record.key());
                        System.out.println("value: " + record.value());
                    })
                    .using((success, message, error) -> {
                        // called once when the engine stops
                        System.out.println("complete:" + message);
                        if (error != null) {
                            ExceptionUtils.rethrow(error);
                        }
                    })
                    .build();

            // Run the engine asynchronously on a non-daemon thread ...
            ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(false);
                return thread;
            });
            executor.execute(engine);
            // Do something else or wait for a signal or an event
            System.out.println("end");
        } catch (Exception e) {
            e.printStackTrace();
        }
        // The non-daemon executor thread keeps the JVM alive after main returns;
        // call engine.close() to stop the engine.
    }
}

DebeziumEngine.create builds the DebeziumEngine object, and the configuration is passed in a standard Properties object. The most important properties are:

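  • connector.class: the connector class, e.g. MySqlConnector
  • offset.storage: the class that stores the binlog position

The binlog position has to be persisted so that, if the program crashes and restarts, it can resume consuming from the saved position.

  • database.history: the class that stores the schema history of the master

The schema of the MySQL databases and tables has to be recorded so that data can be recovered and enriched, because the binlog does not contain column information.

  • database.*: connection details of the MySQL master

Finally, the engine is started with an ExecutorService. Whenever an insert, update or delete event arrives from the binlog, the notifying callback is invoked; this is where the business logic goes.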
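Why database.history is needed can be shown with a small sketch: by default a MySQL binlog row event carries column values by position, without column names, so the reader must pair them with a schema it has tracked itself. SchemaHistory below is a hypothetical, greatly simplified stand-in; Debezium's real DatabaseHistory records DDL statements rather than plain column lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified schema history: table -> ordered column names.
final class SchemaHistory {
    private final Map<String, List<String>> columnsByTable = new HashMap<>();

    // Record the current columns, e.g. after a CREATE/ALTER TABLE is seen.
    void recordSchema(String table, List<String> columns) {
        columnsByTable.put(table, new ArrayList<>(columns));
    }

    // Pair positional row values (as in a binlog row event) with column names.
    Map<String, Object> label(String table, List<Object> values) {
        List<String> columns = columnsByTable.get(table);
        if (columns == null) {
            // without history the event cannot be decoded
            throw new IllegalStateException("schema isn't known for table " + table);
        }
        if (columns.size() != values.size()) {
            throw new IllegalStateException("column count mismatch for table " + table);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }
}
```

The failure branch is the situation the Stack Overflow link in the connector source refers to: if the history is lost on restart, events for a table can no longer be decoded.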

Usage in flink-cdc-connector: next, let's look at how it is used with the Flink DataStream API:

public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("flink_sql") // monitor the flink_sql database
        .username("root")
        .password("root")
        .tableList("flink_sql.orders") // only capture the orders table
        .startupOptions(StartupOptions.latest()) // read from the latest binlog position, no initial snapshot
        .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env
        .addSource(sourceFunction)
        .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute();
}

MySQLSource.builder()...build() constructs a Flink SourceFunction, DebeziumSourceFunction. Let's look at its implementation, starting with the class hierarchy:

public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> 

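This operator implements CheckpointedFunction and CheckpointListener to manage its state (mainly the offsets and the database history). Now look at the run method, which builds the DebeziumEngine with DebeziumEngine.create(...).build() and starts it: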

public void run(SourceContext<T> sourceContext) throws Exception {
        properties.setProperty("name", "engine");
        properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
        if (restoredOffsetState != null) {
            // restored from state
            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
        }
        // DO NOT include schema payload in change event
        properties.setProperty("key.converter.schemas.enable", "false");
        properties.setProperty("value.converter.schemas.enable", "false");
        // DO NOT include schema change, e.g. DDL
        properties.setProperty("include.schema.changes", "false");
        // disable the offset flush totally
        properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
        // disable tombstones
        properties.setProperty("tombstones.on.delete", "false");
        // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
        // continue to read binlog
        // see
        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
        properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
        if (engineInstanceName == null) {
            // not restore from recovery
            engineInstanceName = UUID.randomUUID().toString();
            FlinkDatabaseHistory.registerEmptyHistoryRecord(engineInstanceName);
        }
        // history instance name to initialize FlinkDatabaseHistory
        properties.setProperty(
                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);

        // we have to filter out the heartbeat events, otherwise the deserializer will fail
        String dbzHeartbeatPrefix =
                properties.getProperty(
                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
        this.debeziumConsumer =
                new DebeziumChangeConsumer<>(
                        sourceContext,
                        deserializer,
                        restoredOffsetState == null, // DB snapshot phase if restore state is null
                        this::reportError,
                        dbzHeartbeatPrefix);

        // create the engine with this configuration ...
        this.engine =
                DebeziumEngine.create(Connect.class)
                        .using(properties)
                        .notifying(debeziumConsumer)
                        .using(OffsetCommitPolicy.always())
                        .using(
                                (success, message, error) -> {
                                    if (!success && error != null) {
                                        this.reportError(error);
                                    }
                                })
                        .build();

        if (!running) {
            return;
        }

        // run the engine asynchronously
        executor.execute(engine);
        debeziumStarted = true;

        // initialize metrics
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        metricGroup.gauge(
                "currentFetchEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getFetchDelay());
        metricGroup.gauge(
                "currentEmitEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getEmitDelay());
        metricGroup.gauge("sourceIdleTime", (Gauge<Long>) () -> debeziumConsumer.getIdleTime());

        // on a clean exit, wait for the runner thread
        try {
            while (running) {
                if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    break;
                }
                if (error != null) {
                    running = false;
                    shutdownEngine();
                    // rethrow the error from Debezium consumer
                    ExceptionUtils.rethrow(error);
                }
            }
        } catch (InterruptedException e) {
            // may be the result of a wake-up interruption after an exception.
            // we ignore this here and only restore the interruption state
            Thread.currentThread().interrupt();
        }
    }
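The tail of run() follows a common supervision pattern: the engine runs on an executor thread, failures arrive through a callback (reportError), and the source thread polls awaitTermination while checking for an error. Below is a minimal sketch of that pattern with a hypothetical Supervisor class. It differs from the source in two deliberate ways: it calls executor.shutdown() right after submitting the task, because awaitTermination only returns true once shutdown has been requested, and it checks the error before the termination result, so an engine that reports an error and then exits is not mistaken for a clean exit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical supervisor modelled on the end of DebeziumSourceFunction.run().
final class Supervisor {
    private volatile Throwable error;

    // Called by the worker (like reportError) when something goes wrong.
    void reportError(Throwable t) {
        error = t;
    }

    // Runs the task on its own thread and blocks until it finishes or fails.
    void runAndWait(Runnable engine, long pollMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        executor.shutdown(); // awaitTermination only returns true after shutdown
        try {
            while (true) {
                boolean finished = executor.awaitTermination(pollMillis, TimeUnit.MILLISECONDS);
                if (error != null) {
                    executor.shutdownNow(); // stand-in for shutdownEngine()
                    throw new RuntimeException("engine failed", error);
                }
                if (finished) {
                    return; // clean exit
                }
            }
        } catch (InterruptedException e) {
            // like the original: stop waiting and restore the interrupt status
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```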

Here, offset.storage is set to FlinkOffsetBackingStore, which persists the binlog position, and database.history is set to FlinkDatabaseHistory, which stores the schema history.

Let's look at FlinkOffsetBackingStore first. Its main job is to store and retrieve offsets; it does not talk to Flink's state storage directly. Here is the class comment:

A implementation of OffsetBackingStore backed on Flink's state mechanism.
The OFFSET_STATE_VALUE in the WorkerConfig is the raw position and offset data in JSON format. It is set into the config when recovery from failover by DebeziumSourceFunction before startup the DebeziumEngine. If it is not a restoration, the OFFSET_STATE_VALUE is empty. DebeziumEngine relies on the OffsetBackingStore for failover recovery.
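The role described in this comment can be sketched with a much smaller class: offsets are seeded from a single configuration value at start-up (absent on a fresh start) and updated whenever the engine commits. ConfigSeededOffsetStore and its key string are hypothetical. The real FlinkOffsetBackingStore implements Kafka Connect's OffsetBackingStore interface and keeps the value as JSON; this sketch invents a plain partition=offset;... format to stay dependency-free.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical analogue of FlinkOffsetBackingStore (not the real class).
final class ConfigSeededOffsetStore {
    // Hypothetical key; the real one is FlinkOffsetBackingStore.OFFSET_STATE_VALUE.
    static final String OFFSET_STATE_VALUE = "hypothetical.offset.state.value";

    private final Map<String, String> offsets = new HashMap<>();

    // Called once before the engine starts reading.
    void configure(Map<String, String> config) {
        String state = config.get(OFFSET_STATE_VALUE);
        if (state == null || state.isEmpty()) {
            return; // not a restoration: start with no stored offsets
        }
        // Invented format: "partition=offset;partition=offset"
        for (String entry : state.split(";")) {
            int eq = entry.indexOf('=');
            if (eq > 0) {
                offsets.put(entry.substring(0, eq), entry.substring(eq + 1));
            }
        }
    }

    // Read by the connector to find where to resume.
    String get(String partition) {
        return offsets.get(partition);
    }

    // Written when the engine commits an offset.
    void set(String partition, String offset) {
        offsets.put(partition, offset);
    }
}
```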

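Note the OFFSET_STATE_VALUE variable: when the job recovers from a failure, it is initialized through the DebeziumEngine configuration, with a value taken from Flink state. The actual persistence of the offset state happens in DebeziumSourceFunction.snapshotState, which calls snapshotOffsetState: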

private void snapshotOffsetState(long checkpointId) throws Exception {
        offsetState.clear();

        final DebeziumChangeConsumer<?> consumer = this.debeziumConsumer;

        byte[] serializedOffset = null;
        if (consumer == null) {
            // the consumer has not yet been initialized, which means we need to return the
            // originally restored offsets
            if (restoredOffsetState != null) {
                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
            }
        } else {
            byte[] currentState = consumer.snapshotCurrentState();
            if (currentState == null && restoredOffsetState != null) {
                // the consumer has been initialized, but has not yet received any data,
                // which means we need to return the originally restored offsets
                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
            } else {
                serializedOffset = currentState;
            }
        }

        if (serializedOffset != null) {
            offsetState.add(serializedOffset);
            // the map cannot be asynchronously updated, because only one checkpoint call
            // can happen on this function at a time: either snapshotState() or
            // notifyCheckpointComplete()
            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
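The pendingOffsetsToCommit bookkeeping can be sketched on its own. This version uses an insertion-ordered LinkedHashMap instead of the map type used in the source, and an illustrative bound of 100. It assumes that notifyCheckpointComplete commits the offset recorded for the completed checkpoint and then discards that entry together with all older ones, the pattern Flink's Kafka consumer uses; PendingOffsets is a hypothetical name.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of the pending-offsets map around checkpoints.
final class PendingOffsets {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // illustrative bound
    private final LinkedHashMap<Long, byte[]> pending = new LinkedHashMap<>();

    // snapshotState(): remember the offset taken for this checkpoint.
    void onSnapshot(long checkpointId, byte[] offset) {
        pending.put(checkpointId, offset);
        // drop the oldest entries so the map cannot grow without bound
        Iterator<Long> it = pending.keySet().iterator();
        while (pending.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            it.next();
            it.remove();
        }
    }

    // notifyCheckpointComplete(): commit that checkpoint's offset, then forget
    // it together with every older pending checkpoint.
    boolean onCheckpointComplete(long checkpointId, Consumer<byte[]> commit) {
        byte[] offset = pending.get(checkpointId);
        if (offset == null) {
            return false; // unknown or already truncated checkpoint
        }
        commit.accept(offset);
        pending.keySet().removeIf(id -> id <= checkpointId);
        return true;
    }

    int size() {
        return pending.size();
    }
}
```

Committing only after the checkpoint completes means Debezium's own offset store never runs ahead of what Flink can restore.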

It stores the offset in offsetState, a ListState<byte[]>. The value comes from consumer.snapshotCurrentState(), implemented as follows:

public byte[] snapshotCurrentState() throws Exception {
        // this method assumes that the checkpoint lock is held
        assert Thread.holdsLock(checkpointLock);
        if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
            return null;
        }

        return stateSerializer.serialize(debeziumOffset);
    }

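The offset value lives in the debeziumOffset field, which is updated in DebeziumChangeConsumer.handleBatch. This method is invoked whenever a batch of CRUD change events arrives; it eventually calls emitRecordsUnderCheckpointLock, which emits the events downstream and updates debeziumOffset.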

Note: this step synchronizes on checkpointLock, in order to "emit the records, using the checkpoint lock to guarantee atomicity of record emission and offset state update".
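Why the lock matters can be shown with a small sketch: a record and its offset are published in one critical section, and the snapshot reads them under the same lock, so a checkpoint never stores an offset that is ahead of, or behind, the records already emitted. LockedEmitter is hypothetical; in the connector the lock appears to be the one Flink hands to legacy sources via SourceContext.getCheckpointLock().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "emit under the checkpoint lock".
final class LockedEmitter {
    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>(); // stand-in for SourceContext
    private long offset = -1; // stand-in for debeziumOffset

    // Emit one record and advance the offset atomically.
    void emit(String record, long recordOffset) {
        synchronized (checkpointLock) {
            emitted.add(record);
            offset = recordOffset;
        }
    }

    // Returns {number of emitted records, offset} as one consistent pair.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted.size(), offset};
        }
    }
}
```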

Once the steps above are done, offsetState is persisted by Flink's checkpoint mechanism. The last step is the notifyCheckpointComplete() callback, which writes the checkpointed offset back into Debezium's FlinkOffsetBackingStore. The actual call is:

DebeziumOffset offset =
        DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
consumer.commitOffset(offset);

This eventually calls FlinkOffsetBackingStore.set, which completes the offset persistence flow.

Now the recovery flow. DebeziumSourceFunction.initializeState is called; context.isRestored() returns true, so the state value is read from offsetState and assigned to restoredOffsetState. The run method then passes it to the engine via properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState). When Debezium starts, it initializes FlinkOffsetBackingStore and calls its configure method, which completes the offset initialization.

@Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.offsetState =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
        this.historyRecordsState =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));

        if (context.isRestored()) {
            restoreOffsetState();
            restoreHistoryRecordsState();
        } else {
            if (specificOffset != null) {
                byte[] serializedOffset =
                        DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
                LOG.info(
                        "Consumer subtask {} starts to read from specified offset {}.",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        restoredOffsetState);
            } else {
                LOG.info(
                        "Consumer subtask {} has no restore state.",
                        getRuntimeContext().getIndexOfThisSubtask());
            }
        }
    }
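The start-up decision in initializeState, together with the way run() hands the result to the engine, can be condensed into a small sketch: restored Flink state wins, then a user-specified offset, otherwise the engine starts without one. StartupOffsetResolver and OFFSET_STATE_KEY are hypothetical names; the real code passes the value under FlinkOffsetBackingStore.OFFSET_STATE_VALUE.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical condensation of initializeState() plus the property hand-off in run().
final class StartupOffsetResolver {
    // Hypothetical key; the real code uses FlinkOffsetBackingStore.OFFSET_STATE_VALUE.
    static final String OFFSET_STATE_KEY = "hypothetical.offset.state.value";

    private StartupOffsetResolver() {}

    // Restored state wins, then a user-specified offset; null means "no restore state".
    static String resolve(boolean isRestored, byte[] restoredState, byte[] specificOffset) {
        if (isRestored) {
            return restoredState == null ? null : new String(restoredState, StandardCharsets.UTF_8);
        }
        if (specificOffset != null) {
            return new String(specificOffset, StandardCharsets.UTF_8);
        }
        return null;
    }

    // Mirrors run(): only set the offset property when there is something to restore.
    static Properties engineProperties(String restoredOffsetState) {
        Properties props = new Properties();
        if (restoredOffsetState != null) {
            props.setProperty(OFFSET_STATE_KEY, restoredOffsetState);
        }
        return props;
    }
}
```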

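This completes the source walkthrough of flink-cdc-connector's startup and of how offsets are stored and restored. FlinkDatabaseHistory works along broadly the same lines; readers can go through its source on their own.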

Finally, a summary of some flink-cdc-connector features, based on the GitHub documentation:

  • Exactly-Once Processing

The MySQL CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with exactly-once processing even failures happen. Please read How the connector performs database snapshot.

  • Startup Reading Position (binlog consumption can start from a specified position)

The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer.

  • Single Thread Reading (only a parallelism of 1 is supported)

The MySQL CDC source can't work in parallel reading, because there is only one task can receive binlog events.

  • Supports reading database snapshot and continues to read binlogs (an initial snapshot is supported, but taking it requires locking the database)
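The snapshot-then-binlog handoff, and why it needs a lock, can be illustrated with a toy model: while the simulated lock is held, the reader records the current binlog position and copies the table; afterwards it replays only the binlog entries written after that position, so no change is lost or read twice. ToyDatabase and SnapshotThenStream are hypothetical, and the model is deliberately simplified; the connector documentation mentioned above describes how the real snapshot works.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory "database": one table plus an append-only binlog.
final class ToyDatabase {
    final Map<Integer, String> table = new LinkedHashMap<>();
    final List<String> binlog = new ArrayList<>(); // each entry: "id=value"

    void write(int id, String value) {
        table.put(id, value);
        binlog.add(id + "=" + value);
    }
}

final class SnapshotThenStream {
    // concurrentWrites simulates writes that land after the snapshot point.
    static List<String> read(ToyDatabase db, Runnable concurrentWrites) {
        List<String> out = new ArrayList<>();
        int position;
        synchronized (db) { // stand-in for the database lock
            position = db.binlog.size(); // binlog position at snapshot time
            db.table.forEach((id, v) -> out.add("r " + id + "=" + v));
        }
        concurrentWrites.run();
        // stream only what was written after the recorded position
        for (int i = position; i < db.binlog.size(); i++) {
            out.add("binlog " + db.binlog.get(i));
        }
        return out;
    }
}
```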

The community is currently developing Flink CDC 2.0 to remove the single-parallelism limitation and the need to lock the database. See: https://github.com/ververica/flink-cdc-connectors/pull/233