Apache Flink 漫談系列 - Watermark是個啥?

語言: CN / TW / HK

實際問題(亂序)

在介紹Watermark相關內容之前我們先丟擲一個具體的問題,在實際的流式計算中資料到來的順序對計算結果的正確性有至關重要的影響,比如:某資料來源中的某些資料由於某種原因(如:網路原因,外部儲存自身原因)會有5秒的延時,也就是在實際時間的第1秒產生的資料有可能在第5秒中產生的資料之後到來(比如到Window處理節點).選具體某個delay的元素來說,假設在一個5秒的Tumble視窗(詳見後續Window篇介紹),有一個EventTime是 11秒的資料,在第16秒時候到來了。圖示第11秒的資料,在16秒到來了,如下圖:

那麼對於一個Count聚合的Tumble(5s)的window,上面的情況如何處理才能window2=4,window3=2 呢?

Apache Flink的時間型別

開篇我們描述的問題是一個很常見的TimeWindow中資料亂序的問題,亂序是相對於事件產生時間和到達Apache Flink 實際處理運算元的順序而言的,關於時間在Apache Flink中有如下三種時間型別,如下圖:

  • ProcessingTime 是資料流入到具體某個運算元時候相應的系統時間。ProcessingTime 有最好的效能和最低的延遲。但在分散式計算環境中ProcessingTime具有不確定性,相同資料流多次執行有可能產生不同的計算結果。
  • IngestionTimeIngestionTime是資料進入Apache Flink框架的時間,是在Source Operator中設定的。與ProcessingTime相比可以提供更可預測的結果,因為IngestionTime的時間戳比較穩定(在源處只記錄一次),同一資料在流經不同視窗操作時將使用相同的時間戳,而對於ProcessingTime同一資料在流經不同視窗運算元會有不同的處理時間戳。
  • EventTimeEventTime是事件在裝置上產生時候攜帶的。在進入Apache Flink框架之前EventTime通常要嵌入到記錄中,並且EventTime也可以從記錄中提取出來。在實際的網上購物訂單等業務場景中,大多會使用EventTime來進行資料計算。

開篇描述的問題和本篇要介紹的Watermark所涉及的時間型別均是指EventTime型別。

什麼是Watermark

Watermark是Apache Flink為了處理EventTime 視窗計算提出的一種機制,本質上也是一種時間戳,由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統Event,與普通資料流Event一樣流轉到對應的下游運算元,接收到Watermark Event的運算元以此不斷調整自己管理的EventTime clock。Apache Flink 框架保證Watermark單調遞增,運算元接收到一個Watermark時候,框架知道不會再有任何小於該Watermark的時間戳的資料元素到來了,所以Watermark可以看做是告訴Apache Flink框架資料流已經處理到什麼位置(時間維度)的方式。Watermark的產生和Apache Flink內部處理邏輯如下圖所示:

Watermark的產生方式

目前Apache Flink 有兩種生產Watermark的方式,如下:

  • Punctuated資料流中每一個遞增的EventTime都會產生一個Watermark。 在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游運算元造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
  • Periodic週期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續週期性產生Watermark,否則在極端情況下會有很大的延時。

所以Watermark的生成方式需要根據業務場景的不同進行不同的選擇。

Watermark的介面定義

對應Apache Flink Watermark兩種不同的生成方式,我們瞭解一下對應的介面定義,如下:

  • Periodic Watermarks - AssignerWithPeriodicWatermarks
/**
 * Returns the current watermark. This method is periodically called by the
 * system to retrieve the current watermark. The method may return {@code null} to
 * indicate that no new Watermark is available.
 *
 * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp
 * is larger than that of the previously emitted watermark (to preserve the contract of
 * ascending watermarks). If the current watermark is still
 * identical to the previous one, no progress in EventTime has happened since
 * the previous call to this method. If a null value is returned, or theTimestamp
 * of the returned watermark is smaller than that of the last emitted one, then no
 * new watermark will be generated.
 *
 * <p>The interval in which this method is called and Watermarks are generated
 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
 *
 * @see org.Apache.flink.streaming.api.watermark.Watermark
 * @see ExecutionConfig#getAutoWatermarkInterval()
 *
 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
 */
 @Nullable
 Watermark getCurrentWatermark();
  • Punctuated Watermarks - AssignerWithPunctuatedWatermarks
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

  /**
   * Asks this implementation if it wants to emit a watermark. This method is called right after
   * the {@link #extractTimestamp(Object, long)} method.
   *
   * <p>The returned watermark will be emitted only if it is non-null and its timestamp
   * is larger than that of the previously emitted watermark (to preserve the contract of
   * ascending watermarks). If a null value is returned, or the timestamp of the returned
   * watermark is smaller than that of the last emitted one, then no new watermark will
   * be generated.
   *
   * <p>For an example how to use this method, see the documentation of
   * {@link AssignerWithPunctuatedWatermarks this class}.
   *
   * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
   */
  @Nullable
  Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
  • AssignerWithPunctuatedWatermarks 繼承了TimestampAssigner介面 -TimestampAssigner
public interface TimestampAssigner<T> extends Function {

  /**
   * Assigns a timestamp to an element, in milliseconds since the Epoch.
   *
   * <p>The method is passed the previously assigned timestamp of the element.
   * That previous timestamp may have been assigned from a previous assigner,
   * by ingestion time. If the element did not carry a timestamp before, this value is
   * {@code Long.MIN_VALUE}.
   *
   * @param element The element that the timestamp will be assigned to.
   * @param previousElementTimestamp The previous internal timestamp of the element,
   *                                 or a negative value, if no timestamp has been assigned yet.
   * @return The new timestamp.
   */
  long extractTimestamp(T element, long previousElementTimestamp);
}

從介面定義可以看出,Watermark可以在Event(Element)中提取EventTime,進而定義一定的計算邏輯產生Watermark的時間戳。

Watermark解決如上問題

從上面的Watermark生成介面和Apache Flink內部對Periodic Watermark的實現來看,Watermark的時間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時間戳不等於Event中的EventTime,Event中的EventTime自產生那一刻起就不可以改變了,不受Apache Flink框架控制,而Watermark的產生是在Apache Flink的Source節點或實現的Watermark生成器計算產生(如上Apache Flink內建的 Periodic Watermark實現), Apache Flink內部對單流或多流的場景有統一的Watermark處理。

回過頭來我們在看看Watermark機制如何解決上面的問題,上面的問題在於如何將遲來的EventTime 位11的元素正確處理。要解決這個問題我們還需要先了解一下EventTime window是如何觸發的?EventTime window 計算條件是當Window計算的Timer時間戳 小於等於 當前系統的Watermak的時間戳時候進行計算。

  • 當Watermark的時間戳等於Event中攜帶的EventTime時候,上面場景(Watermark=EventTime)的計算結果如下:

上面對應的DDL定義如下:

create table t1(
  ts timestamp(3),
  other bigint,
  WATERMARK FOR ts AS ts
) with (
  'connector' = 'xx'
)
  • 如果想正確處理遲來的資料可以定義Watermark生成策略為 Watermark = EventTime -5s, 如下:

上面對應的DDL定義如下:

create table t1(
  ts timestamp(3),
  other bigint,
  WATERMARK FOR ts AS ts - interval '5' SECOND
) with (
  'connector' = 'xx'
)

上面正確處理的根源是我們採取了 延遲觸發 window 計算 的方式正確處理了 Late Event. 與此同時,我們發現window的延時觸發計算,也導致了下游的LATENCY變大,本例子中下游得到window的結果就延遲了5s.

多流的Watermark處理

在實際的流計算中往往一個job中會處理多個Source的資料,對Source的資料進行GroupBy分組,那麼來自不同Source的相同key值會shuffle到同一個處理節點,並攜帶各自的Watermark,Apache Flink內部要保證Watermark要保持單調遞增,多個Source的Watermark匯聚到一起時候可能不是單調自增的,這樣的情況Apache Flink內部是如何處理的呢?如下圖所示:

Apache Flink內部實現每一個邊上只能有一個遞增的Watermark, 當出現多流攜帶Eventtime匯聚到一起(Join or Union)時候,Apache Flink會選擇所有流入的Eventtime中最小min(stream1, stream2...streamN)的一個向下遊流出。從而保證watermark的單調遞增和保證資料的完整性.如下圖:

小結

本節以一個流計算常見的亂序問題介紹了Apache Flink如何利用Watermark機制來處理亂序問題. 本篇內容在一定程度上也體現了EventTime Window中的Trigger機制依賴了Watermark(後續Window篇章會介紹)。Watermark機制是流計算中處理亂序,正確處理Late Event的核心手段。更多細節歡迎關注《Apache Flink 知其然,知其所以然》系列視訊課程!

作者介紹

孫金城,51CTO社群編輯,Apache Flink PMC 成員,Apache Beam Committer,Apache IoTDB PMC 成員,ALC Beijing 成員,Apache ShenYu 導師,Apache 軟體基金會成員。關注技術領域流計算和時序資料儲存。