Flink 並行流中 watermark 機制無法觸發視窗計算的原因分析
場景描述
Kafka Source 接收並處理來自 Kafka 的點選資料(指定事件時間),開一個滾動視窗(Tumble Windows) 每 10 秒統計一次 pv 並將結果輸出到 Print Sink 中。
CREATE TABLE sourceTable ( message STRING, time_ltz AS TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(JSON_VALUE(message, '$.request_body'),'$.clickTime') AS INTEGER),0), WATERMARK FOR time_ltz AS time_ltz - INTERVAL '3' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'matrix_json_click_log_test', 'properties.bootstrap.servers' = 'xxxxxxxxx:9527', 'properties.group.id' = 'flinkTestGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE sinkTable ( window_start TIMESTAMP(3), window_end TIMESTAMP(3), click_pv BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO sinkTable( window_start, window_end, click_pv ) select window_start,window_end,COUNT(*) FROM TABLE ( TUMBLE( TABLE sourceTable, DESCRIPTOR(time_ltz) , INTERVAL '10' SECOND)) GROUP BY window_start, window_end;
執行以上的 FlinkSQL 後觀察發現數據流正常,但是一直沒能觸發視窗計算,沒有結果輸出。
watermark
watermark 是用於處理亂序事件的。流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下流到 operator 的資料都是按照事件產生的時間順序來的,但是也不排除由於網路等原因導致部分資料延遲到達,產生亂序。對於遲到的資料我們又不能無限期地等待下去,因此需要有個衡量事件時間進度的機制來保證一個特定的時間後必須觸發 window 進行計算,這個特別的機制就是 watermark。
並行流中的 watermark
在 多並行度的情況下,source 的每個 sub task 通常獨立生成水印。watermark 通過 operator 時會推進 operators 處的當前 event time,同時 operators 會為下游生成一個新的 watermark。多並行度的情況下 watermark對齊會取所有 channel 最小的 watermark。
並行流中的 watermark
原因分析
由於目標 Topic 只有一個分割槽而 source 並行度設定為 2 ,這導致了只有一個執行緒可以處理該分割槽而另一個執行緒獲取不到資料,因此一直沒能獲取最小的 watermark。最終導致一直無法觸發視窗計算。
解決方案
可通過手動設定並行度來解決,保證 source 並行度 <= 目標 Topic Partition 分割槽數。這裡將 source 並行度設定為 1 之後便可正常輸出結果。
並行度設定為 1
正常輸出
- Spring中實現非同步呼叫的方式有哪些?
- 帶引數的全型別 Python 裝飾器
- 整理了幾個Python正則表示式,拿走就能用!
- SOLID:開閉原則Go程式碼實戰
- React中如何引入CSS呢
- 一個新視角:前端框架們都卷錯方向了?
- 編碼中的Adapter,不僅是一種設計模式,更是一種架構理念與解決方案
- 手寫程式語言-遞迴函式是如何實現的?
- 一文搞懂模糊匹配:定義、過程與技術
- 新來個阿里 P7,僅花 2 小時,做出一個多執行緒永動任務,看完直接跪了
- Puzzlescript,一種開發H5益智遊戲的引擎
- @Autowired和@Resource到底什麼區別,你明白了嗎?
- CSS transition 小技巧!如何保留 hover 的狀態?
- React如此受歡迎離不開這4個主要原則
- LeCun再炮轟Marcus: 他是心理學家,不是搞AI的
- Java保證執行緒安全的方式有哪些?
- 19個殺手級 JavaScript 單行程式碼,讓你看起來像專業人士
- Python 的"self"引數是什麼?
- 別整一坨 CSS 程式碼了,試試這幾個實用函式
- 再有人問你什麼是MVCC,就把這篇文章發給他!