Flink-cdc-connector Source Code Analysis

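What is CDC

CDC stands for Change Data Capture. Broadly speaking, any technique that can capture data changes can be called CDC. In practice the term usually refers to capturing changes made to the data in a database. CDC has a wide range of applications:

  • Data synchronization, for backup and disaster recovery
  • Data distribution, fanning one data source out to multiple downstream consumers
  • Data collection (the E in ETL), i.e. ETL data integration for data warehouses and data lakes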
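To make the data synchronization case concrete, here is a minimal sketch of what "capturing changes and replaying them" means. The Change class and the replay method are hypothetical names made up for illustration. Real CDC tools such as debezium attach far more metadata to each event (source position, before image, schema, and so on).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified change event model (not a real CDC API).
final class ChangeEventSketch {
    enum Op { INSERT, UPDATE, DELETE }

    static final class Change {
        final Op op;
        final int key;       // primary key of the affected row
        final String after;  // new row image, null for DELETE

        Change(Op op, int key, String after) {
            this.op = op;
            this.key = key;
            this.after = after;
        }
    }

    // Replays the captured changes, in order, against an empty replica.
    static Map<Integer, String> replay(List<Change> changes) {
        Map<Integer, String> replica = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.op == Op.DELETE) {
                replica.remove(c.key);
            } else {
                replica.put(c.key, c.after); // INSERT and UPDATE both write the new image
            }
        }
        return replica;
    }

    public static void main(String[] args) {
        List<Change> log = Arrays.asList(
                new Change(Op.INSERT, 1, "order-a"),
                new Change(Op.INSERT, 2, "order-b"),
                new Change(Op.UPDATE, 1, "order-a2"),
                new Change(Op.DELETE, 2, null));
        System.out.println(replay(log)); // prints {1=order-a2}
    }
}
```

Because the replica is derived purely from the ordered change log, the same log can feed a backup, several downstream systems, or a warehouse load.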

Common CDC implementations

The main open-source implementations in the industry today are canal, debezium, and maxwell. A detailed comparison is out of scope here (see https://zhuanlan.zhihu.com/p/75357900).

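Flink-cdc-connector source code analysis

flink-cdc-connector uses the Debezium embedded engine to read binlog events from MySQL.

debezium was originally implemented as a Kafka Connect plugin. The debezium community later abstracted the engine out so that it can be used independently of Kafka Connect.

Let's first look at a demo of embedded debezium usage: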

public static void main(String[] args) {
    // Define the configuration for the Debezium Engine with MySQL connector...
    final Properties props = new Properties();
    props.setProperty("name", "my-engine");
    props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
    props.setProperty("offset.storage.file.filename",
        "/Users/taox/xdf/git/myself/learning/debezium-learning/src/main/resources/offsets.dat");
    props.setProperty("offset.flush.interval.ms", "20000");
    /* begin connector properties */
    props.setProperty("database.hostname", "localhost");
    props.setProperty("database.port", "3306");
    props.setProperty("database.user", "root");
    props.setProperty("database.password", "root");
    props.setProperty("database.server.id", "857442");
    props.setProperty("database.server.name", "my-app-connector2");
    props.setProperty("snapshot.mode", "schema_only");
    props.setProperty("database.include.list", "flink_sql");
    props.setProperty("table.include.list", "flink_sql.orders");
    props.setProperty("database.history",
        "io.debezium.relational.history.FileDatabaseHistory");
    props.setProperty("database.history.file.filename",
        "/Users/taox/xdf/git/myself/learning/debezium-learning/src/main/resources/dbhistory.dat");

    // Create the engine with this configuration ...
    try {
        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
          .using(props)
          .notifying(record -> {
            System.out.println("key: " + record.key());
            System.out.println("value: " + record.value());
          }).using((success, message, error) -> {
            System.out.println("complete:" + message);
            if (error != null) {
              ExceptionUtils.rethrow(error);
            }
          }).build();

      // Run the engine asynchronously ...
      ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
          Thread thread = new Thread(r);
          thread.setDaemon(false);
          return thread;
        }
      });
      executor.execute(engine);
      // Do something else or wait for a signal or an event
      System.out.println("end");
    } catch (Exception e) {
      e.printStackTrace();
    }
    // Engine is stopped when the main code is finished
  }

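The DebeziumEngine object is built with DebeziumEngine.create, and the configuration is passed in through a standard Properties object. The important properties are:

  • connector.class: the connector class, e.g. MySqlConnector
  • offset.storage: the implementation class that stores the binlog position

The binlog position has to be saved so that, after a crash and restart, the program can resume consuming from the saved position.

  • database.history: the implementation class that stores the schema history of the source database

The schema of the MySQL databases and tables has to be recorded in order to recover data and enrich events, because the binlog itself does not contain field (column) information.

  • database.*: connection details for the MySQL master

Finally, the engine is started through an ExecutorService. When binlog insert, update, or delete events arrive, they are delivered through the notifying callback, which is where most of the business logic goes.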
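The role of offset.storage can be sketched in a few lines. This is only an illustration of the idea (save the position, reload it on restart), not how debezium's FileOffsetBackingStore is actually written. The class name, the "file"/"pos" keys, and the sample binlog name are all assumptions made up for this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical stand-in for an offset store; NOT debezium's implementation.
final class FileOffsetStoreSketch {
    private final Path file;

    FileOffsetStoreSketch(Path file) {
        this.file = file;
    }

    // Persists the binlog file name and position of the last processed event.
    void save(String binlogFile, long position) {
        Properties p = new Properties();
        p.setProperty("file", binlogFile);
        p.setProperty("pos", Long.toString(position));
        try (OutputStream out = Files.newOutputStream(file)) {
            p.store(out, "last processed binlog position");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns "file:pos", or null when nothing has been saved yet (first start).
    String load() {
        if (!Files.exists(file)) {
            return null;
        }
        Properties p = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            p.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p.getProperty("file") + ":" + p.getProperty("pos");
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("offsets", ".properties");
        Files.delete(tmp); // simulate a first start with no saved offset
        System.out.println(new FileOffsetStoreSketch(tmp).load());
        new FileOffsetStoreSketch(tmp).save("mysql-bin.000003", 154L);
        // A "restarted" process opens the same file and resumes from the saved position.
        System.out.println(new FileOffsetStoreSketch(tmp).load());
        Files.deleteIfExists(tmp);
    }
}
```

In the demo above, offset.flush.interval.ms controls how often such a save happens, so after a crash the engine may re-read events that arrived since the last flush.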

Usage in flink-cdc-connector

Next, let's look at how it is used with the Flink DataStream API:

public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("flink_sql") // database to monitor
        .username("root")
        .password("root")
        .tableList("flink_sql.orders")
        .startupOptions(StartupOptions.latest())
        .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env
        .addSource(sourceFunction)
        .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute();
  }

MySQLSource.builder()...build() constructs a Flink SourceFunction, namely DebeziumSourceFunction. Let's look at its implementation. The class is declared as follows:

public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> 

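This operator implements CheckpointedFunction and CheckpointListener to manage its state (mainly the offsets and the database history). Next, look at the run method, which creates the DebeziumEngine through DebeziumEngine.create(...).build() and starts it: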

public void run(SourceContext<T> sourceContext) throws Exception {
        properties.setProperty("name", "engine");
        properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
        if (restoredOffsetState != null) {
            // restored from state
            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
        }
        // DO NOT include schema payload in change event
        properties.setProperty("key.converter.schemas.enable", "false");
        properties.setProperty("value.converter.schemas.enable", "false");
        // DO NOT include schema change, e.g. DDL
        properties.setProperty("include.schema.changes", "false");
        // disable the offset flush totally
        properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
        // disable tombstones
        properties.setProperty("tombstones.on.delete", "false");
        // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
        // continue to read binlog
        // see
        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
        properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
        if (engineInstanceName == null) {
            // not restore from recovery
            engineInstanceName = UUID.randomUUID().toString();
            FlinkDatabaseHistory.registerEmptyHistoryRecord(engineInstanceName);
        }
        // history instance name to initialize FlinkDatabaseHistory
        properties.setProperty(
                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);

        // we have to filter out the heartbeat events, otherwise the deserializer will fail
        String dbzHeartbeatPrefix =
                properties.getProperty(
                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
        this.debeziumConsumer =
                new DebeziumChangeConsumer<>(
                        sourceContext,
                        deserializer,
                        restoredOffsetState == null, // DB snapshot phase if restore state is null
                        this::reportError,
                        dbzHeartbeatPrefix);

        // create the engine with this configuration ...
        this.engine =
                DebeziumEngine.create(Connect.class)
                        .using(properties)
                        .notifying(debeziumConsumer)
                        .using(OffsetCommitPolicy.always())
                        .using(
                                (success, message, error) -> {
                                    if (!success && error != null) {
                                        this.reportError(error);
                                    }
                                })
                        .build();

        if (!running) {
            return;
        }

        // run the engine asynchronously
        executor.execute(engine);
        debeziumStarted = true;

        // initialize metrics
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        metricGroup.gauge(
                "currentFetchEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getFetchDelay());
        metricGroup.gauge(
                "currentEmitEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getEmitDelay());
        metricGroup.gauge("sourceIdleTime", (Gauge<Long>) () -> debeziumConsumer.getIdleTime());

        // on a clean exit, wait for the runner thread
        try {
            while (running) {
                if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    break;
                }
                if (error != null) {
                    running = false;
                    shutdownEngine();
                    // rethrow the error from Debezium consumer
                    ExceptionUtils.rethrow(error);
                }
            }
        } catch (InterruptedException e) {
            // may be the result of a wake-up interruption after an exception.
            // we ignore this here and only restore the interruption state
            Thread.currentThread().interrupt();
        }
    }
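The tail of run follows a common pattern: hand the engine to an executor, then poll for termination while watching for an error reported from the engine thread. The sketch below reproduces that pattern with the JDK only. AsyncRunnerSketch and runAndWait are hypothetical names, and unlike the real method this version shuts the executor down up front and returns the error instead of rethrowing it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of "run asynchronously, wait, surface errors".
final class AsyncRunnerSketch {

    // Runs the engine on its own thread and blocks until it finishes.
    // Returns the failure reported by the engine, or null on a clean exit.
    static Throwable runAndWait(Runnable engine) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                engine.run();
            } catch (Throwable t) {
                error.set(t); // plays the role of reportError(...)
            }
        });
        executor.shutdown(); // accept no further tasks; the submitted one keeps running
        try {
            while (!executor.awaitTermination(50, TimeUnit.MILLISECONDS)) {
                if (error.get() != null) {
                    executor.shutdownNow(); // stop the engine once a failure is known
                    break;
                }
            }
        } catch (InterruptedException e) {
            // only restore the interruption flag, as the original method does
            Thread.currentThread().interrupt();
        }
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(() -> { }));
        System.out.println(runAndWait(() -> {
            throw new IllegalStateException("engine failed");
        }));
    }
}
```

Polling with a timeout, rather than blocking indefinitely, is what lets the waiting thread notice an error that was reported while the engine is still running.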

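In run, offset.storage is set to the FlinkOffsetBackingStore class, which handles persistence of the position, and database.history is set to the FlinkDatabaseHistory class, which stores the schema history.

Let's start with FlinkOffsetBackingStore. Its main job is to store and fetch offsets. It does not talk to Flink's state storage directly, as its class comment explains: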

A implementation of OffsetBackingStore backed on Flink's state mechanism.
The OFFSET_STATE_VALUE in the WorkerConfig is the raw position and offset data in JSON format. It is set into the config when recovery from failover by DebeziumSourceFunction before startup the DebeziumEngine. If it is not a restoration, the OFFSET_STATE_VALUE is empty. DebeziumEngine relies on the OffsetBackingStore for failover recovery.

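Note the OFFSET_STATE_VALUE property: when a job recovers from a failure, it is set in the DebeziumEngine configuration, and its value is taken from Flink state. The actual persistence of the offset state happens in the snapshotState method of DebeziumSourceFunction, which in turn calls snapshotOffsetState: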

private void snapshotOffsetState(long checkpointId) throws Exception {
        offsetState.clear();

        final DebeziumChangeConsumer<?> consumer = this.debeziumConsumer;

        byte[] serializedOffset = null;
        if (consumer == null) {
            // the consumer has not yet been initialized, which means we need to return the
            // originally restored offsets
            if (restoredOffsetState != null) {
                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
            }
        } else {
            byte[] currentState = consumer.snapshotCurrentState();
            if (currentState == null && restoredOffsetState != null) {
                // the consumer has been initialized, but has not yet received any data,
                // which means we need to return the originally restored offsets
                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
            } else {
                serializedOffset = currentState;
            }
        }

        if (serializedOffset != null) {
            offsetState.add(serializedOffset);
            // the map cannot be asynchronously updated, because only one checkpoint call
            // can happen on this function at a time: either snapshotState() or
            // notifyCheckpointComplete()
            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }

snapshotOffsetState stores the offset in offsetState, a ListState<byte[]>. The value comes from consumer.snapshotCurrentState(), which is implemented as follows:

public byte[] snapshotCurrentState() throws Exception {
        // this method assumes that the checkpoint lock is held
        assert Thread.holdsLock(checkpointLock);
        if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
            return null;
        }

        return stateSerializer.serialize(debeziumOffset);
    }

The offset lives in the debeziumOffset field, which is updated in DebeziumChangeConsumer.handleBatch. That method is called whenever change events (inserts, updates, deletes) arrive, and it ultimately calls emitRecordsUnderCheckpointLock, which sends the events downstream and updates debeziumOffset.

Note: this step synchronizes on checkpointLock. Records are emitted while holding the checkpoint lock to guarantee that emitting a record and updating the offset state are atomic.
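Why the lock matters can be shown with a small JDK-only sketch. The names below are hypothetical, and the sketch takes the lock itself in snapshot(), whereas in Flink the runtime already holds the checkpoint lock when it calls snapshotState on a legacy source. The invariant being protected is that a snapshot never observes an emitted record without its matching offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: emit + offset update as one atomic step under a shared lock.
final class CheckpointLockSketch {
    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>(); // stands in for downstream output
    private long offset = -1;                               // stands in for debeziumOffset

    // Emits a record and advances the offset while holding the lock.
    void emit(String record, long recordOffset) {
        synchronized (checkpointLock) {
            emitted.add(record);
            offset = recordOffset;
        }
    }

    // Reads {emitted count, offset} under the same lock, so both values are consistent.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted.size(), offset};
        }
    }

    // Emits n records (offsets 0..n-1) on another thread while snapshotting concurrently.
    // Returns true if every snapshot satisfied count == offset + 1.
    static boolean invariantHolds(int n) {
        CheckpointLockSketch s = new CheckpointLockSketch();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                s.emit("r" + i, i);
            }
        });
        producer.start();
        boolean ok = true;
        while (producer.isAlive()) {
            long[] snap = s.snapshot();
            ok &= snap[0] == snap[1] + 1;
        }
        long[] last = s.snapshot();
        return ok && last[0] == n && last[1] == n - 1;
    }

    public static void main(String[] args) {
        System.out.println(invariantHolds(100_000)); // prints true
    }
}
```

Without the shared lock, a checkpoint could capture an offset that is one record behind (duplicates after recovery) or one record ahead (data loss after recovery) of what was actually emitted.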

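Once these steps are done, offsetState is persisted through the checkpoint mechanism. The last step is the notifyCheckpointComplete() callback, which writes the already-checkpointed offset back into debezium's FlinkOffsetBackingStore. The actual call is: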

DebeziumOffset offset =
        DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
consumer.commitOffset(offset);

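consumer.commitOffset eventually calls FlinkOffsetBackingStore.set. That completes the offset persistence flow.

Now for state recovery. DebeziumSourceFunction.initializeState is called, context.isRestored() returns true, and the state value is read from offsetState and assigned to restoredOffsetState. Later, in run, this value is put into the configuration via properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState). When debezium starts, it instantiates FlinkOffsetBackingStore and calls its configure method, which initializes the offset.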

@Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.offsetState =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
        this.historyRecordsState =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));

        if (context.isRestored()) {
            restoreOffsetState();
            restoreHistoryRecordsState();
        } else {
            if (specificOffset != null) {
                byte[] serializedOffset =
                        DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
                LOG.info(
                        "Consumer subtask {} starts to read from specified offset {}.",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        restoredOffsetState);
            } else {
                LOG.info(
                        "Consumer subtask {} has no restore state.",
                        getRuntimeContext().getIndexOfThisSubtask());
            }
        }
    }

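This completes the source analysis of flink-cdc-connector startup, offset storage, and offset recovery. FlinkDatabaseHistory works on much the same principle, and readers can study its source on their own.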
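The whole offset lifecycle can be condensed into a small JDK-only model. It is a sketch under simplifying assumptions, not the connector's code: offsets are plain strings instead of serialized DebeziumOffset bytes, the class and method names are hypothetical, and MAX_PENDING is an arbitrary bound standing in for MAX_NUM_PENDING_CHECKPOINTS.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of: emit -> snapshot -> checkpoint complete -> commit -> restore.
final class OffsetLifecycleSketch {
    static final int MAX_PENDING = 100; // arbitrary bound chosen for this sketch

    private String currentOffset;   // advances as records are emitted
    private String committedOffset; // what the engine's offset store would hold
    private final Map<Long, String> pending = new LinkedHashMap<>();

    // restored == null models a fresh start; otherwise the value comes from checkpoint state.
    OffsetLifecycleSketch(String restored) {
        this.currentOffset = restored;
        this.committedOffset = restored;
    }

    void onRecord(String offset) {
        currentOffset = offset;
    }

    // Phase 1: returns the value written into checkpoint state and remembers it as pending.
    String snapshotState(long checkpointId) {
        if (currentOffset != null) {
            pending.put(checkpointId, currentOffset);
            Iterator<Long> oldestFirst = pending.keySet().iterator();
            while (pending.size() > MAX_PENDING) { // keep the pending map bounded
                oldestFirst.next();
                oldestFirst.remove();
            }
        }
        return currentOffset;
    }

    // Phase 2: only after the checkpoint is confirmed is the offset committed to the store.
    void notifyCheckpointComplete(long checkpointId) {
        String offset = pending.get(checkpointId);
        if (offset == null) {
            return; // unknown or already truncated checkpoint
        }
        Iterator<Long> it = pending.keySet().iterator();
        while (it.hasNext()) { // drop this checkpoint and every older one
            long id = it.next();
            it.remove();
            if (id == checkpointId) {
                break;
            }
        }
        committedOffset = offset;
    }

    String committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        OffsetLifecycleSketch source = new OffsetLifecycleSketch(null);
        source.onRecord("mysql-bin.000001:120");
        String checkpointed = source.snapshotState(1L);
        source.onRecord("mysql-bin.000001:300"); // arrives after the snapshot
        source.notifyCheckpointComplete(1L);
        System.out.println(source.committedOffset()); // prints mysql-bin.000001:120
        // After a failure, a new instance starts from the checkpointed value.
        System.out.println(new OffsetLifecycleSketch(checkpointed).committedOffset());
    }
}
```

The key point is that the committed offset only ever moves to a value that a completed checkpoint already contains, so a recovery never resumes from a position ahead of the restored state.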

Finally, here is a summary of some flink-cdc-connector features, based on the summary in the GitHub documentation:

  • Exactly-Once Processing

The MySQL CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with exactly-once processing even failures happen. Please read How the connector performs database snapshot.

  • Startup Reading Position (binlog consumption can start from a specified position)

The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer.

  • Single Thread Reading (only a parallelism of 1 is supported)

The MySQL CDC source can't work in parallel reading, because there is only one task can receive binlog events.

  • Supports reading database snapshot and continues to read binlogs (an initial snapshot is supported, but taking it requires locking the database)

The community is currently developing Flink CDC 2.0 to address the single-parallelism and database-locking problems. See: https://github.com/ververica/flink-cdc-connectors/pull/233