Flink SQL 知其所以然(二十二):SQL 的時間語義!(建議收藏)

語言: CN / TW / HK

SQL 的時間語義

hello,我是老羊,今天跟著老羊的思路學習 Flink SQL 的時間語義:

  1. 處理時間
    事件時間
    攝入時間
    
  2. 事件時間(SQL 常用)
    處理時間(SQL 幾乎不用,DataStream 少用)
    攝入時間(不用)
    

1.Flink 三種時間屬性簡介

time
  1. :star: 事件時間:指的是資料本身攜帶的時間,這個時間是在事件產生時的時間,而且在 Flink SQL 觸發計算時,也使用資料本身攜帶的時間。這就叫做  事件時間目前生產環境中用的最多
  2. :star: 處理時間:指的是具體運算元計算資料執行時的機器時間(例如在運算元中 Java 取 System.currentTimeMillis()) ), 在生產環境中用的次多
  3. :star: 攝入時間:指的是資料從資料來源進入 Flink 的時間。 攝入時間用的最少,可以說基本不使用

小夥伴萌要注意到:

  1. :star: 上述的三種時間概念不是由於有了資料而誕生的,而是有了 Flink 之後根據實際的應用場景而誕生的。以事件時間舉個例子,如果只是資料攜帶了時間,Flink 也消費了這個資料,但是在 Flink 中沒有使用資料的這個時間作為計算的觸發條件,也不能把這個 Flink 任務叫做事件時間的任務。

  2. :star: 其次,要認識到,一般一個 Flink 任務只會有一個時間屬性,所以時間屬性通常認為是一個任務粒度的。舉例:我們可以說 A 任務是事件時間語義的任務,B 任務是處理時間語義的任務。當然了,一個任務也可以存在多個時間屬性。

2.Flink 三種時間屬性的應用場景

講到這裡,xdm 會問,博主上面寫的 3 種時間屬性到底對我們的任務有啥影響呢?3 種時間屬性的應用場景是啥?

先說結論,在 Flink 中時間的作用:

  1. 主要體現在包含時間視窗的計算中
    滾動視窗
    滑動視窗
    
  2. :star:  次要體現在自定義時間語義的計算中 :舉個例子,比如使用者可以自定義每隔 10s 的本地時間,或者消費到的資料的時間戳每增大 10s,就把計算結果輸出一次,時間在此類應用中也是一種標識任務進度的作用。

博主以 滾動視窗 的聚合任務為例來介紹一下事件時間和處理時間的對比區別。

  1. :star: 事件時間案例:還是以之前的 clicks 表拿來舉例。

tumble window

上面這個案例的視窗大小是 1 小時,需求方需要按照使用者點選時間戳 cTime 劃分資料(劃分滾動視窗),然後計算出 count 聚合結果(這樣計算能反映出事件的真實發生時間),那麼就需要把  cTime 設定為視窗的劃分時間戳,即程式碼中  tumble(cTime, interval '1' hour)

上面這種就叫做事件時間。即用資料中自帶的時間戳進行視窗的劃分(點選操作真實的發生時間)。

後續 Flink SQL 任務在執行的過程中也會實際按照 cTime 的當前時間作為一小時視窗結束觸發條件並計算一個小時視窗內的資料。

  1. :star: 處理時間案例:還是以之前的 clicks 表拿來舉例。

還是上面那個案例,但是這次需求方不需要按照資料上的時間戳劃分資料(劃分滾動視窗),只需要資料來了之後, 在 Flink 機器上的時間作為一小時視窗結束的書法條件並計算。

那麼這種觸發機制就是處理時間。

  1. :star: 攝入時間案例:在 Flink 從外部資料來源讀取到資料時,給這條資料帶上的當前資料來源運算元的本地時間戳。下游可以用這個時間戳進行視窗聚合,不過這種幾乎不使用。

3.SQL 指定時間屬性的兩種方式

如果要滿足 Flink SQL 時間視窗類的聚合操作,SQL 或 Table API 中的 資料來源表 就需要提供時間屬性(相當於我們把這個時間屬性在  資料來源表 上面進行宣告),以及支援時間相關的操作。

那麼來看看 Flink SQL 為我們提供的兩種指定時間戳的方式:

  1. :star:  CREATE TABLE DDL 建立表的時候指定
  2. :star:  可以在 DataStream 中指定 ,在後續的 DataStream 轉的 Table 中使用

一旦時間屬性定義好,它就可以像普通列一樣使用,也可以在時間相關的操作中使用。

4.SQL 事件時間案例

來看看 Flink 中如何指定事件時間。

  1. :star:  CREATE TABLE DDL 指定時間戳的方式。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 使用下面這句來將 user_action_time 宣告為事件時間,並且宣告 watermark 的生成規則,即 user_action_time 減 5 秒
-- 事件時間列的欄位型別必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 型別
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然後就可以在視窗運算元中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

從上面這條語句可以看到,如果想使用事件時間,那麼我們的時間戳型別必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 型別。很多小夥伴會想到,我們的時間戳一般不都是秒或者是毫秒(BIGINT 型別)嘛,那這種情況怎麼辦?

解決方案必須要有啊。如下。

CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 1. 這個 ts 就是常見的毫秒級別時間戳
ts BIGINT,
-- 2. 將毫秒時間戳轉換成 TIMESTAMP_LTZ 型別
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- 3. 使用下面這句來將 user_action_time 宣告為事件時間,並且宣告 watermark 的生成規則,即 user_action_time 減 5 秒
-- 事件時間列的欄位型別必須是 TIMESTAMP 或者 TIMESTAMP_LTZ 型別
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  1. :star:  DataStream 中指定事件時間。

之前介紹了 Table 和  DataStream 可以互轉,那麼 Flink 也提供了一個能力,就是在 Table 轉為 DataStream 時,指定時間戳欄位。如下案例:

public class DataStreamSourceEventTimeTest {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// 1. 分配 watermark
DataStream<Row> r = env.addSource(new UserDefinedSource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {
@Override
public long extractTimestamp(Row element) {
return (long) element.getField("f2");
}
});
// 2. 使用 f2.rowtime 的方式將 f2 欄位指為事件時間時間戳
Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");

tEnv.createTemporaryView("source_table", sourceTable);

// 3. 在 tumble window 中使用 f2
String tumbleWindowSql =
"SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
+ "FROM source_table\n"
+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)"
;

Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

tEnv.toDataStream(resultTable, Row.class).print();

env.execute();
}


private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

private volatile boolean isCancel;

@Override
public void run(SourceContext<Row> sourceContext) throws Exception {

int i = 0;

while (!this.isCancel) {

sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

Thread.sleep(10L);
i++;
}

}

@Override
public void cancel() {
this.isCancel = true;
}

@Override
public TypeInformation<Row> getProducedType() {
return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
TypeInformation.of(Long.class))
;
}
}
}

5.SQL 處理時間案例

來看看 Flink SQL 中如何指定處理時間。

  1. :star:  CREATE TABLE DDL 指定時間戳的方式。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 使用下面這句來將 user_action_time 宣告為處理時間
user_action_time AS PROCTIME()
) WITH (
...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然後就可以在視窗運算元中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  1. :star:  DataStream 中指定處理時間。
public class DataStreamSourceProcessingTimeTest {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// 1. 分配 watermark
DataStream<Row> r = env.addSource(new UserDefinedSource());

// 2. 使用 proctime.proctime 的方式將 f2 欄位指為處理時間時間戳
Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");

tEnv.createTemporaryView("source_table", sourceTable);

// 3. 在 tumble window 中使用 f2
String tumbleWindowSql =
"SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
+ "FROM source_table\n"
+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)"
;

Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

tEnv.toDataStream(resultTable, Row.class).print();

env.execute();
}


private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

private volatile boolean isCancel;

@Override
public void run(SourceContext<Row> sourceContext) throws Exception {

int i = 0;

while (!this.isCancel) {

sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

Thread.sleep(10L);
i++;
}

}

@Override
public void cancel() {
this.isCancel = true;
}

@Override
public TypeInformation<Row> getProducedType() {
return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
TypeInformation.of(Long.class))
;
}
}
}

往期推薦

揭祕位元組跳動埋點資料實時動態處理引擎(附原始碼)

END

你好,我是老羊,一個網際網路大廠實時資料開發工程師。

在公眾號發表的 18w 字、132 案例、48 圖 實時計算入門 《Flink SQL 成神之路》 、實時計算進階 《Flink SQL 知其所以然》 、實時計算面試 《Flink 對線面試官》 系列深受讀者好評, 關注公眾號傳送【Flink SQL】限時免費領取《Flink SQL 成神之路》PDF

我每週至少更新一篇原創,不僅分享大資料相關資訊,更多關注于思維模式的升級,以及程式人生的感悟,希望每一篇推送都能給到你不一樣的啟發。我正前行在實現自己目標的路上, 下方關注後可以加我微信交流  願你所往之地皆為熱土,願你所遇之人皆為摯友  。

長按上方掃碼二維碼,加我微信

:point_up_2:點選關注|設為星標|乾貨速遞:point_up_2:

動動小手,讓更多需要的人看到~