Flink cdc 2.0 嚐鮮

語言: CN / TW / HK

前幾天flink cdc 釋出了2.0版本,於是跟進了一下。2.0版本主要解決1.x裡的幾個重要問題,如下:

  1. 全量 + 增量讀取的過程需要保證所有資料的一致性,因此需要通過加鎖保證,但是加鎖在資料庫層面上是一個十分高危的操作。底層 Debezium 在保證資料一致性時,需要對讀取的庫或表加鎖,全域性鎖可能導致資料庫鎖住,表級鎖會鎖住表的讀,DBA 一般不給鎖許可權。

  2. 不支援水平擴充套件,因為 Flink CDC 底層是基於 Debezium,起架構是單節點,所以Flink CDC 只支援單併發。在全量階段讀取階段,如果表非常大 (億級別),讀取時間在小時甚至天級別,使用者不能通過增加資源去提升作業速度。

  3. 全量讀取階段不支援 checkpoint:CDC 讀取分為兩個階段,全量讀取和增量讀取,目前全量讀取階段是不支援 checkpoint 的,因此會存在一個問題:當我們同步全量資料時,假設需要 5 個小時,當我們同步了 4 小時的時候作業失敗,這時候就需要重新開始,再讀取 5 個小時。

其中第一個加鎖問題尤為突出,基本在公司生產環境不可用,因為生產環境不可能讓你去加全域性鎖的。 cdc2.0 借鑑了netflix釋出的DBlog paper來解決加鎖問題(參考論文 https://arxiv.org/pdf/2010.12597v1.pdf) 其核心思想是對錶按照主建進行分片,記為Chunk。在對每一個Chunk進行並行讀取,最後對所有Chunk進行對齊處理,得到最終的增量讀取點,在繼續讀取。具體詳細演算法這裡不展開了,感興趣的可以參考這篇文章(https://mp.weixin.qq.com/s/iwY5975XXp7QOBeV0q4TfQ) 。 2,3兩個問題,通過FLIP-27中新的source介面來解決了,可以參考設計文件(https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

一個例子

注意:(1)必須開啟checkpoint (2)source表必須定義主鍵

try {
      EnvironmentSettings mySetting = EnvironmentSettings
          .newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build();
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(2);
      //開啟checkpoint
      env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));

      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);

      String ddl1 = ""
          + "CREATE TABLE orders (\n"
          + "  order_id INT primary key not enforced,\n"
          + "  order_date TIMESTAMP(0),\n"
          + "  customer_name STRING,\n"
          + "  price DECIMAL(10, 5),\n"
          + "  product_id INT,\n"
          + "  order_status BOOLEAN\n"
          + ") WITH (\n"
          + "  'connector' = 'mysql-cdc',\n"
          + "  'hostname' = 'localhost',\n"
          + "  'port' = '3306',\n"
          + "  'username' = 'root',\n"
          + "  'password' = 'root',\n"
          + "  'database-name' = 'flink_sql',\n"
          + "  'table-name' = 'orders'\n"
          + ")";
      tableEnv.executeSql(ddl1);

      String ddlPrint = ""
          + "CREATE TABLE sink_print (\n"
          + "  order_id INT primary key not enforced,\n"
          + "  order_date TIMESTAMP(0),\n"
          + "  customer_name STRING,\n"
          + "  price DECIMAL(10, 5),\n"
          + "  product_id INT,\n"
          + "  order_status BOOLEAN\n"
          + ") WITH (\n"
          + "    'connector' = 'print'\n"
          + ")";
      tableEnv.executeSql(ddlPrint);
      StatementSet stmtSet = tableEnv.createStatementSet();
      stmtSet.addInsertSql(""
          + "INSERT INTO sink_print\n"
          + "SELECT * \n"
          + "FROM orders\n");

      stmtSet.execute();

    } catch (Exception e) {
      e.printStackTrace();
      System.out.println(e.toString());
      System.err.println("任務執行失敗:" + e.getMessage());
      System.exit(-1);
    }

在實際測試中發現幾個注意點:

  1. 從snapshot stage 切換到increment binlog stage 的時候,需要在all snapshot 完成後,下一個checkpoint發生時,才開啟後續的increment binlog stage。所有實際測試資料變更後,需要等一下才能觀察到增量的改變。
  2. increment binlog stage 的binlog position總是(binlog_name=“”,position=0),這裡貌似是計算minStartPosition位點的時候的一個bug,我已經提issue討論了,見:https://github.com/ververica/flink-cdc-connectors/issues/329 歡迎一起討論~~~