Flink SQL 知其所以然(二十四):SQL DDL!

語言: CN / TW / HK

SQL 語法篇

1.DDL:Create 子句

大家好,我是 老羊 ,今天來學一波 Flink SQL 中的 DDL。

注意:原始碼公眾號後臺回覆 1.13.2 最全 flink sql 獲取。

CREATE 語句用於向當前或指定的 Catalog 中註冊庫、表、檢視或函式。註冊後的庫、表、檢視和函式可以在 SQL 查詢中使用。

目前 Flink SQL 支援下列 CREATE 語句:

  1. :star: CREATE TABLE

  2. :star: CREATE DATABASE

  3. :star: CREATE VIEW

  4. :star: CREATE FUNCTION

此節重點介紹建表,建資料庫、檢視和 UDF 會在後面的擴充套件章節進行介紹。

1.1.建表語句

下面的 SQL 語句就是建表語句的定義,根據指定的表名建立一個表,如果同名表已經在 catalog 中存在了,則無法註冊。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]

<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
[catalog_name.][db_name.]table_name

<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]

1.2.表中的列

  1. :star: 常規列(即物理列)

物理列是資料庫中所說的常規列。其定義了物理介質中儲存的資料中欄位的名稱、型別和順序。

其他型別的列可以在物理列之間宣告,但不會影響最終的物理列的讀取。

舉一個僅包含常規列的表的案例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
  1. :star: 元資料列

元資料列是 SQL 標準的擴充套件,允許訪問資料來源本身具有的一些元資料。元資料列由 METADATA 關鍵字標識。

例如,我們可以使用元資料列從 Kafka 資料中讀取 Kafka 資料自帶的時間戳(這個時間戳不是資料中的某個時間戳欄位,而是資料寫入 Kafka 時,Kafka 引擎給這條資料打上的時間戳標記),然後我們可以在 Flink SQL 中使用這個時間戳,比如進行基於時間的視窗操作。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時間戳
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);

元資料列可以用於後續資料的處理,或者寫入到目標表中。

舉例:

INSERT INTO MyTable 
SELECT
user_id
, name
, record_time + INTERVAL '1' SECOND
FROM MyTable;

如果自定義的列名稱和 Connector 中定義 metadata 欄位的名稱一樣的話, FROM xxx 子句是可以被省略的。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時間戳
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);

關於 Flink SQL 的每種 Connector 都提供了哪些 metadata 欄位,詳細可見官網文件 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/

如果自定義列的資料型別和 Connector 中定義的 metadata 欄位的資料型別不一致的話,程式執行時會自動 cast 強轉。但是這要求兩種資料型別是可以強轉的。舉例如下:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 將時間戳強轉為 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);

預設情況下,Flink SQL planner 認為 metadata 列是可以 讀取 也可以  寫入 的。但是有些外部儲存系統的元資料資訊是隻能用於讀取,不能寫入的。

那麼在往一個表寫入的場景下,我們就可以使用 VIRTUAL 關鍵字來標識某個元資料列不寫入到外部儲存中(不持久化)。

以 Kafka 舉例:

CREATE TABLE MyTable (
-- sink 時會寫入
`timestamp` BIGINT METADATA,
-- sink 時不寫入
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);

在上面這個案例中,Kafka 引擎的 offset 是隻讀的。所以我們在把  MyTable 作為資料來源(輸入)表時,schema 中是包含  offset 的。在把  MyTable 作為資料匯(輸出)表時,schema 中是不包含  offset 的。如下:

-- 當做資料來源(輸入)的 schema
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)

-- 當做資料匯(輸出)的 schema
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)

所以這裡在寫入時需要注意,不要在 SQL 的 INSERT INTO 語句中寫入 offset 列,否則 Flink SQL 任務會直接報錯。

  1. :star: 計算列

計算列其實就是在寫建表的 DDL 時,可以拿已有的一些列經過一些自定義的運算生成的新列。這些列本身是沒有以物理形式儲存到資料來源中的。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
-- cost 就是使用 price 和 quanitity 生成的計算列,計算方式為 price * quanitity
`cost` AS price * quanitity,
) WITH (
'connector' = 'kafka'
...
);

注意!!!

計算列可以包含其他列、常量或者函式,但是不能寫一個子查詢進去。

小夥伴萌這時會問到一個問題,既然只能包含列、常量或者函式計算,我就直接在 DML query 程式碼中寫就完事了唄,為啥還要專門在 DDL 中定義呢?

結論:沒錯,如果只是簡單的四則運算的話直接寫在 DML 中就可以,但是計算列一般是用於定義時間屬性的(因為在 SQL 任務中時間屬性只能在 DDL 中定義,不能在 DML 語句中定義)。比如要把輸入資料的時間格式標準化。處理時間、事件時間分別舉例如下:

  • :star: 處理時間:使用 PROCTIME() 函式來定義處理時間列

  • :star: 事件時間:事件時間的時間戳可以在宣告 Watermark 之前進行預處理。比如如果欄位不是 TIMESTAMP(3) 型別或者時間戳是巢狀在 JSON 字串中的,則可以使用計算列進行預處理。

注意!!!和虛擬 metadata 列是類似的,計算列也是隻能讀不能寫的。

也就是說,我們在把 MyTable 作為資料來源(輸入)表時,schema 中是包含  cost 的。

在把 MyTable 作為資料匯(輸出)表時,schema 中是不包含  cost 的。舉例:

-- 當做資料來源(輸入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)

-- 當做資料匯(輸出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)

1.3.定義 Watermark

Watermark 是在 Create Table 中進行定義的。具體 SQL 語法標準是  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

其中:

  1. rowtime_column_name
    TIMESTAMP(3)
    TIMESTAMP_LTZ(3)
    
  2. :star:  watermark_strategy_expression :定義 Watermark 的生成策略。Watermark 的一般都是由  rowtime_column_name 列減掉一段固定時間間隔。SQL 中 Watermark 的生產策略是:當前 Watermark 大於上次發出的 Watermark 時發出當前 Watermark。

注意:

  1. 如果你使用的是事件時間語義,那麼必須要設設定事件時間屬性和 WATERMARK 生成策略。

  2. Watermark 的發出頻率:Watermark 發出一般是間隔一定時間的,Watermark 的發出間隔時間可以由  pipeline.auto-watermark-interval 進行配置,如果設定為 200ms 則每 200ms 會計算一次 Watermark,然如果比之前發出的 Watermark 大,則發出。如果間隔設為 0ms,則 Watermark 只要滿足觸發條件就會發出,不會受到間隔時間控制。

Flink SQL 提供了幾種 WATERMARK 生產策略:

  1. WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
    WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
    一般都用這種 Watermark 生成策略
    
  2. :star: 嚴格升序:設定方式為  WATERMARK FOR rowtime_column AS rowtime_column一般基本不用這種方式 。如果你能保證你的資料來源的時間戳是嚴格升序的,那就可以使用這種方式。嚴格升序代表 Flink 任務認為時間戳只會越來越大,也不存在相等的情況,只要相等或者小於之前的,就認為是遲到的資料。
  3. :star: 遞增:設定方式為  WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND一般基本不用這種方式 。如果設定此類,則允許有相同的時間戳出現。

1.4.Create Table With 子句

先看一個案例:

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

可以看到 DDL 中 With 子句就是在建表時,描述資料來源、資料匯的具體外部儲存的元資料資訊的。

一般 With 中的配置項由 Flink SQL 的 Connector(連結外部儲存的聯結器) 來定義,每種 Connector 提供的 With 配置項都是不同的。

注意:

  1. Flink SQL 中 Connector 其實就是 Flink 用於連結外部資料來源的介面。舉一個類似的例子,在 Java 中想連線到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去連結。對映到 Flink SQL 中,在 Flink SQL 中要連線到 Kafka,需要使用 kafka connector

  2. Flink SQL 已經提供了一系列的內建 Connector,具體可見 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/

回到上述案例中,With 聲明瞭以下幾項資訊:

  1. :star:  'connector' = 'kafka' :宣告外部儲存是 Kafka
  2. :star:  'topic' = 'user_behavior' :宣告 Flink SQL 任務要連線的 Kafka 表的 topic 是 user_behavior
  3. :star:  'properties.bootstrap.servers' = 'localhost:9092' :宣告 Kafka 的 server ip 是 localhost:9092
  4. :star:  'properties.group.id' = 'testGroup' :宣告 Flink SQL 任務消費這個 Kafka topic,會使用 testGroup 的 group id 去消費
  5. :star:  'scan.startup.mode' = 'earliest-offset' :宣告 Flink SQL 任務消費這個 Kafka topic 會從最早位點開始消費
  6. :star:  'format' = 'csv' :宣告 Flink SQL 任務讀入或者寫出時對於 Kafka 訊息的序列化方式是 csv 格式

從這裡也可以看出來 With 中具體要配置哪些配置項都是和每種 Connector 決定的。

1.5.Create Table Like 子句

Like 子句是 Create Table 子句的一個延伸。舉例:

下面定義了一張 Orders 表:

CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);

但是忘記定義 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定義一張帶 Watermark 的新表:

CREATE TABLE Orders_with_watermark (
-- 1. 添加了 WATERMARK 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 2. 覆蓋了原 Orders 表中 scan.startup.mode 引數
'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句宣告是在原來的 Orders 表的基礎上定義 Orders_with_watermark 表
LIKE Orders;

上面這個語句的效果就等同於:

CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);

不過這種不常使用。就不過多介紹了。如果小夥伴萌感興趣,直接去官網參考具體注意事項:

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#like