精準水位在流批一體數據倉庫的探索和實踐

語言: CN / TW / HK

作者 |  浮生若夢的石頭

導讀 

隨着實時計算技術在大數據中的廣泛應用,數據的時效性得到大幅度,但是實際應用場景中,除了時效性,還面臨着更高的技術要求。

本文結合實時計算的水位技術在流批一體數據倉庫中的探索和實踐,重點闡述了水位技術的概念和相關理論實踐,尤其就水位在實時計算系統中的特性、邊界定義和應用,最後重點描述了一種改進的精準水位的設計和實現。該技術架構目前在百度實際業務場景下表現成熟和穩定,藉此分享給大家,希望對大家有參考價值。

全文7118字,預計閲讀時間18分鐘。

01 業務背景

為了提升產品研發、策略迭代、數據分析以及運營決策的效率,業務對數據的時效性要求越來越高。

雖然我們很早就基於實時計算實現了實時數據倉庫的建設,但是還是無法取代離線數據倉庫,實時和離線數據倉庫各自一套開發和維護的成本高,最重要的是業務的口徑還不能100%對齊。所以我們一直在致力於建設一套流批一體數據倉庫,在實現整體數據加工效率提速的同時,還能保證數據如離線數據那樣可靠,能支持100%業務場景,從而實現整體降本提效。

圖片

△流批一體數據倉庫建設思路

02 流批一體數據倉庫的技術難點

要想端到端實現流批一體數據倉庫,作為底層技術架構的實時計算系統,面臨着很多技術難點和挑戰:

1、端到端數據的嚴格不重不丟,以保證數據的完整性;

2、實時數據的窗口和離線數據的窗口,包含數據是對齊的(99.9% ~ 99.99%)

3、實時計算需要支持精準的窗口計算,以保證實時反作弊策略的準招效果;

4、實時計算系統和百度內部大數據生態打通,並有實際大規模線上穩定運行實踐。

以上2和3點,都需要高可靠的水位機制來確保實時數據的進度感知和精準切分。

於是本篇文章就精準水位在流批一體數據倉庫中的探索和實踐的經驗,分享給大家。

03 水位概念和通用實現的現狀

3.1 水位的必要性

在介紹水位(Watermark)的概念之前,需要先插入2個概念:

  • Event time, 事件發生時間。我們一般理解為用户真實行為發生的時間,具體對應是日誌中記錄用户行為發生的時間戳。

  • Processing time, 數據處理時間。我們一般理解為系統處理數據的時間。

那水位(watermark)具體有什麼用處?

在實際實時數據處理過程中,數據是無邊界的(Unbounded), 那麼基於Window這種窗口計算或其他類似場景就面臨一個實際的問題:

怎麼知道某個窗口的數據是完整的?什麼時候才能觸發窗口計算()?

大多數情況下,我們使用Event Time來觸發窗口計算(或者數據分區切分,對標離線)。然而實際的情況是實時日誌總有不同程度的延遲(在日誌採集、日誌傳輸和日誌處理等階段),即如下圖所示,實際上會發生水印的傾斜(即數據會出現亂序)。在這種情況下 , Watermark機制就很有必要存在,來確保數據的完整性。

圖片 △水位傾斜現象

3.2 水位的定義和特點

水位(watermark)的定義目前業界沒有統一的説法,結合**Streaming  Systems**一書(作者是Google Dataflow 研發團隊)中定義,個人以為比較確切:

The watermark is a monotonically increasing timestamp of the oldest work not yet completed.

從定義我們可以概括出水位的2大基本特性:

  • 水位是連續遞增的(不可回退)

  • 水位是一個時間戳

然而在實際生產系統中,水位如何去計算,以及實際的效果是什麼樣子?結合目前業界不同的實時計算系統,對於水位的支持還是不一樣的。

3.3 目前水位現狀和麪臨的挑戰

在目前業界的實時計算系統中,比如Apache Flink(Google Dataflow的開源實現)、Apache Spark(僅侷限Structured Streaming框架)中,都是支持水位的,下面就以社區最火爆的Apache Flink列舉一下水位的實現機制:

但是以上水位的實現機制和效果,在日誌源端出現大面積日誌延遲傳輸的情況下,水位還依舊會更新(新舊數據亂序傳輸)推進,會導致對應的窗口數據不完整,窗口計算不準確。因此,在百度內部,我們基於日誌採集和傳輸系統、實時計算系統探索了一種改進的、相對精準的水位機制,以確保實時數據在窗口計算、數據落地(sink 到AFS/Hive)等應用場景下,窗口數據的完整性問題,以滿足實現流批一體數據倉庫的要求。

圖片

△Flink中水位生成策略

GEEK TALK

04 全局水位的設計和應用

4.1 水位中心化管理的設計

為了使得水位在實時計算中更精準,我們設計出一種中心化的水位管理思路,即實時計算的各個節點,包含source、operator、sinker等都會把自己計算的水位信息,統一上報給全局的Watermark Server,由Watermark Server 來進行水位信息的統一管理。

圖片

△中心化水位設計

Watermark Server :維護一個水位的信息表(hash_table),包含實時計算程序(APP)整體拓撲信息(Source、Operator 和Sinker等)各個層級對應的水位信息,以便於進行全局水位(比如low watermark)的計算,Watermark Server 定期和state做交互,以保證水位信息的不丟失。

Watermark Client:水位更新客户端,在source、worker和sinker等實時算子中,負責向Watermark Server 上報和請求水位信息(比如上游或者全局水位),通過baidu-rpc服務請求回調。

Low watermark(低水位):Low watermark是一個時間戳,用來標記實時數據處理過程中最早(oldest)的沒有處理的數據的時間(Low watermark, which pessimistically attempt to capture the event time of the oldest unprocessed record the system is aware of.)。它承諾未來不會有早於該時間戳的數據到達。這裏的時間計算一般基於eventtime,即事件發生時間,例如日誌中用户行為發生的時間,而較少使用數據處理時間(processing time,某些場景也可以用),watermark計算的公式為(來自Google MillWheel 論文):

Low Watermark of A =  min(oldest work of A, low watermark of C : C outputs to A)

圖片

但是在實際系統設計中,low watermark又可以按照算子處理的邊界區分如下:

  • Input Low Watermark: Oldest work not yet sent to this streaming stage.

    InputLowWatermark(Stage) = min { OutputLowWatermark(Stage’) | Stage’ is upstream of Stage}

    輸入最低水位,可以理解為將要輸入當前算子,即上游算子處理過的數據的watermark。

  • Output Low Watermark: Oldest work not yet completed by this streaming stage.

    OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) }

    輸出最低水位,可以理解為當前算子未處理過數據的最早的(oldest)水位,即處理過數據的水位。

    具體如下圖所示,理解會更形象些。

圖片△Low watermark的邊界定義

4.2 如何實現精準水位

4.2.1、精準水位的前提條件

目前實時計算系統在實時數據倉庫的應用場景,我們都是使用low watermak來觸發窗口計算(因為這樣更可靠),從3.1中low watermark的定義我們可知:low watermark是層級迭代計算的,水位是否精準,取決於最上游(即source)水位的精準度。於是為了提升源頭水位計算的精準度,我們需要前提條件:

  • 日誌在服務端的單台服務器上是按照時間(event_time)有序生產的

  • 日誌在採集時候,除了真實的用户行為日誌,還需要包含其他信息,比如服務器tag(hostname)和日誌時間(msg_time)等信息,如下圖所示

圖片

△日誌打包信息

  • 日誌是實時點對點發布到消息隊列,以保證消息隊列單個分區(partition)內,單個服務器的日誌是嚴格有序的

圖片

△源端日誌點對點發布到消息隊列,保證單分區日誌是有序的

4.2.2、水位的計算方式

1、Watermark server

初始化

首先作為獨立的線程(thread)啟動。根據配置的日誌傳輸任務的BNS(Baidu Naming Service,百度名字服務,提供服務名稱到服務端所有運行實例的映射)來解析日誌源的服務器列表(hostname list);根據配置的APP拓撲關係,初始化watermark信息表,並持久化寫入Table(百度分佈式kv存儲引擎)。

普通水位信息更新:接收到Client到水位信息並更新對應粒度(Processor粒度或者keygroup粒度)的水位,對局部水位進行更新

精準水位計算

現實中,如果要求源端的日誌100%都精確的到達,會造成頻繁的延遲或者延遲太久(如果下發採用全局Low watermark邏輯)。原因是:在日誌端服務器實例太多的情況下(比如實際上我們有的日誌有實例6000 - 10000個),總有線上服務的實例會出現日誌實時上傳的延遲的情況,那麼這就需要在數據的完整性和時效性之間做一個折中,比如以百分比的形式來精準控制允許延遲的實例個數(比如配置99.9% 或者99.99%來設置允許源端日誌出現延遲的比例),來精準控制最源端水位的精確度。

精準水位需要特殊配置,根據Source端實時上報的服務器和日誌進度的映射關係,以及配置的允許延遲實例的比例,來計算Source端的output low watermark。

計算全局low Watermark:會計算一個全局最小的水位,返回給Client端的請求

狀態持久化:定期把全局水位信息持久化寫入外部存儲,以便於狀態恢復

2、Watermark Client

Source 端:解析日誌包,並獲日誌包裏面的機器名等信息和原始的日誌。原始日誌經過ETL處理後,並根據原始的日誌獲取最新時間戳(event_timestamps),Source通過Watermark Client API 把解析到hostname和最新時間戳(event_timestamps)的映射關係表定期上報(目前配置的1000ms)到Watermark Server。

圖片

△Source通過解析日誌獲取的服務器和日誌進度映射關係

Operator端

Input low Watermark計算 :   獲取上游(Upstream)的output low watermark,作為input  low watermark來決定是否觸發窗口計算等操作;

output low Watermark計算:根據日誌、狀態(state)等處理進度(oldest work)來計算自己的output low watermark,並上報到Watermark  Server,以便於下游算子(Download Processor)使用。

圖片

△Watermark Client 工作流程

Sinker端

Sinker端和上面的普通實時算子(Operator)一樣,會計算Input  Low Watermark和 Output  Low  Watermark來更新自己的水位,

額外需要請求一個全局的Low Watermark 來決定數據的輸出窗口是否關閉。

4.3 精準水位在系統間的傳遞

水位傳遞的必要性

很多時候,實時系統並不是孤立的,多個實時計算系統之間存在着數據的交互,最為常見的方式是兩個實時數據處理系統是上下游的關係。

具體表現為:兩個實時數據處理系統之間通過消息隊列(比如社區的Apache Kafka)來實現數據的傳遞,那麼在這種情況下,如何實現精準水位的傳遞呢?

具體實現步驟如下

1、上游實時計算系統的日誌源,保證日誌是點對點發布的,這樣可以保證全局水位的精準度(具體比例是可調的);

2、在上游實時計算系統的輸出端(sinker/exporter 到消息隊列端),需要保證使用全局low watermark的下發,目前我們採用把全局水位信息打印到每條日誌上面來實現傳遞;

3、在下游實時數據計算系統的Source端,需要解析日誌攜帶的水位信息字段(來自上游實時計算系統),並開始作為水位的輸入(Input Low Watermark),開啟層層水位的迭代計算和全局水位的計算;

4、在下游實時數據計算系統的Operator/Sinker端,可仍舊可以用日誌的Event Time來實現具體數據切分,來作為窗口計算的輸入,但是觸發窗口計算的機制,仍舊以Watermark Server 返回的全局Low Watermark為準,以保證數據數據的完整性。

圖片

△精準水位在實時計算系統之間的傳遞機制

05 實際效果和後續展望

5.1 實際線上效果

5.1.1 落地數據的實測效果(完整性)

實際線上測試,採用精準水位(配置水位精度99.9%,即只允許千分之一的源端實例延遲),在日誌沒有延遲的情況下,實時落地的數據和離線數據,在同一個時間窗口(Event Time)下效果對比如下(基本都是十萬分以下):

圖片

△源端日誌沒有延遲的情況下數據完整性效果

在源端日誌出現延遲的情況下(<=0.1%源端日誌實例延遲的情況下,水位還會持續更新),數據diff效果整體基本在千分之1 左右(受到日誌源端點對點日誌本身可能存在數據不均情況的影響):

圖片

在源端日誌出現大面積延遲的情況下(>0.1%源端日誌實例延遲的情況下),由於使用了精準的水位機制(水位精度99.9%),全局水位不會更新,實時數據寫AFS的窗口不會關閉,一直等待延遲數據的到來和全局水位得更新才會關閉窗口,以保證數據的完整性,實際測試結果如下(在千分之1.1-千分之1.2之間,受到日誌源端實例本身存在不均情況的影響):

圖片

5.2 總結和展現

經過實際精準水位的研究和實際線上的應用,基於精準水位的實時數據倉庫,在具備時效性提升的同時,具備了更高、靈活數據的精度機制,在穩定性優化後,實際上完全已經替代之前的離線和實時兩套數據倉庫系統,實現了真正意義上的流批一體數據倉庫。

同時基於中心化的水位機制,也後續面臨着性能優化、高可用(故障恢復機制的完善)和更精細粒度精準水位的挑戰(在窗口計算觸發機制下)。

——END——

參考文獻:

[1] T. Akidau, A. Balikov, K. Bekiroğlu, S. Chernyak, J. Haberman, R. Lax, S. McVeety, D. Mills, P. Nordstrom, and S. Whittle. Millwheel: Fault-tolerant stream processing at internet scale. Proc. VLDB Endow., 6(11):1033–1044, Aug. 2013.

[2] T. Akidau, R. Bradshaw, C. Chambers, S. Chernyak, R. J. Fernández-Moctezuma, R. Lax, S. McVeety, D. Mills, F. Perry, E. Schmidt, et al. The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing. Proceedings of the VLDB Endowment, 8(12):1792–1803, 2015.

[3] T. Akidau, S. Chernyak, and R. Lax. Streaming Systems. O’Reilly Media, Inc., 1st edition, 2018.

[4] "Watermarks - Measuring Time and Progress in Streaming Pipelines", Slava Chernyak , Google Inc

[5] P. Carbone, A. Katsifodimos, S. Ewen, V. Markl, S. Haridi, and K. Tzoumas. Apache flink: Stream and batch processing in a single engine. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 36(4), 2015.

推薦閲讀:

視頻編輯場景下的文字模版技術方案

淺談活動場景下的圖算法在反作弊應用

Serverless:基於個性化服務畫像的彈性伸縮實踐

圖片動畫化應用中的動作分解方法

性能平台數據提速之路

採編式AIGC視頻生產流程編排實踐