Flink 並行流中 watermark 機制無法觸發視窗計算的原因分析

語言: CN / TW / HK

場景描述

Kafka Source 接收並處理來自 Kafka 的點選資料(指定事件時間),開一個滾動視窗(Tumble Windows) 每 10 秒統計一次 pv 並將結果輸出到 Print Sink 中。

CREATE TABLE sourceTable (
    message STRING,
    time_ltz AS TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(JSON_VALUE(message, '$.request_body'),'$.clickTime') AS INTEGER),0),
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '3' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'matrix_json_click_log_test',
    'properties.bootstrap.servers' = 'xxxxxxxxx:9527',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE sinkTable (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    click_pv BIGINT
)
WITH (
    'connector' = 'print'
);

INSERT INTO sinkTable(
window_start,
window_end,
click_pv
)
select window_start,window_end,COUNT(*)
 FROM TABLE (
 TUMBLE( TABLE sourceTable, DESCRIPTOR(time_ltz) ,  INTERVAL '10' SECOND))
 GROUP BY window_start, window_end;

執行以上的 FlinkSQL 後觀察發現數據流正常,但是一直沒能觸發視窗計算,沒有結果輸出。

watermark

watermark 是用於處理亂序事件的。流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下流到 operator 的資料都是按照事件產生的時間順序來的,但是也不排除由於網路等原因導致部分資料延遲到達,產生亂序。對於遲到的資料我們又不能無限期地等待下去,因此需要有個衡量事件時間進度的機制來保證一個特定的時間後必須觸發 window 進行計算,這個特別的機制就是 watermark。

並行流中的 watermark

在 多並行度的情況下,source 的每個 sub task 通常獨立生成水印。watermark 通過 operator 時會推進 operators 處的當前 event time,同時 operators 會為下游生成一個新的 watermark。多並行度的情況下 watermark對齊會取所有 channel 最小的 watermark。

並行流中的 watermark

原因分析

由於目標 Topic 只有一個分割槽而 source 並行度設定為 2 ,這導致了只有一個執行緒可以處理該分割槽而另一個執行緒獲取不到資料,因此一直沒能獲取最小的 watermark。最終導致一直無法觸發視窗計算。

解決方案

可通過手動設定並行度來解決,保證 source 並行度 <= 目標 Topic Partition 分割槽數。這裡將 source 並行度設定為 1 之後便可正常輸出結果。

並行度設定為 1

正常輸出