精準水位在流批一體資料倉庫的探索和實踐
作者 | 浮生若夢的石頭
導讀
隨著實時計算技術在大資料中的廣泛應用,資料的時效性得到大幅度,但是實際應用場景中,除了時效性,還面臨著更高的技術要求。
本文結合實時計算的水位技術在流批一體資料倉庫中的探索和實踐,重點闡述了水位技術的概念和相關理論實踐,尤其就水位在實時計算系統中的特性、邊界定義和應用,最後重點描述了一種改進的精準水位的設計和實現。該技術架構目前在百度實際業務場景下表現成熟和穩定,藉此分享給大家,希望對大家有參考價值。
全文7118字,預計閱讀時間18分鐘。
01 業務背景
為了提升產品研發、策略迭代、資料分析以及運營決策的效率,業務對資料的時效性要求越來越高。
雖然我們很早就基於實時計算實現了實時資料倉庫的建設,但是還是無法取代離線資料倉庫,實時和離線資料倉庫各自一套開發和維護的成本高,最重要的是業務的口徑還不能100%對齊。所以我們一直在致力於建設一套流批一體資料倉庫,在實現整體資料加工效率提速的同時,還能保證資料如離線資料那樣可靠,能支援100%業務場景,從而實現整體降本提效。
△流批一體資料倉庫建設思路
02 流批一體資料倉庫的技術難點
要想端到端實現流批一體資料倉庫,作為底層技術架構的實時計算系統,面臨著很多技術難點和挑戰:
1、端到端資料的嚴格不重不丟,以保證資料的完整性;
2、實時資料的視窗和離線資料的視窗,包含資料是對齊的(99.9% ~ 99.99%);
3、實時計算需要支援精準的視窗計算,以保證實時反作弊策略的準招效果;
4、實時計算系統和百度內部大資料生態打通,並有實際大規模線上穩定執行實踐。
以上2和3點,都需要高可靠的水位機制來確保實時資料的進度感知和精準切分。
於是本篇文章就精準水位在流批一體資料倉庫中的探索和實踐的經驗,分享給大家。
03 水位概念和通用實現的現狀
3.1 水位的必要性
在介紹水位(Watermark)的概念之前,需要先插入2個概念:
-
Event time, 事件發生時間。我們一般理解為使用者真實行為發生的時間,具體對應是日誌中記錄使用者行為發生的時間戳。
-
Processing time, 資料處理時間。我們一般理解為系統處理資料的時間。
那水位(watermark)具體有什麼用處?
在實際實時資料處理過程中,資料是無邊界的(Unbounded), 那麼基於Window這種視窗計算或其他類似場景就面臨一個實際的問題:
怎麼知道某個視窗的資料是完整的?什麼時候才能觸發視窗計算()?
大多數情況下,我們使用Event Time來觸發視窗計算(或者資料分割槽切分,對標離線)。然而實際的情況是實時日誌總有不同程度的延遲(在日誌採集、日誌傳輸和日誌處理等階段),即如下圖所示,實際上會發生水印的傾斜(即資料會出現亂序)。在這種情況下 , Watermark機制就很有必要存在,來確保資料的完整性。
△水位傾斜現象
3.2 水位的定義和特點
水位(watermark)的定義目前業界沒有統一的說法,結合**Streaming Systems**一書(作者是Google Dataflow 研發團隊)中定義,個人以為比較確切:
The watermark is a monotonically increasing timestamp of the oldest work not yet completed.
從定義我們可以概括出水位的2大基本特性:
-
水位是連續遞增的(不可回退)
-
水位是一個時間戳
然而在實際生產系統中,水位如何去計算,以及實際的效果是什麼樣子?結合目前業界不同的實時計算系統,對於水位的支援還是不一樣的。
3.3 目前水位現狀和麵臨的挑戰
在目前業界的實時計算系統中,比如Apache Flink(Google Dataflow的開源實現)、Apache Spark(僅侷限Structured Streaming框架)中,都是支援水位的,下面就以社群最火爆的Apache Flink列舉一下水位的實現機制:
但是以上水位的實現機制和效果,在日誌源端出現大面積日誌延遲傳輸的情況下,水位還依舊會更新(新舊資料亂序傳輸)推進,會導致對應的視窗資料不完整,視窗計算不準確。因此,在百度內部,我們基於日誌採集和傳輸系統、實時計算系統探索了一種改進的、相對精準的水位機制,以確保實時資料在視窗計算、資料落地(sink 到AFS/Hive)等應用場景下,視窗資料的完整性問題,以滿足實現流批一體資料倉庫的要求。
△Flink中水位生成策略
GEEK TALK
04 全域性水位的設計和應用
4.1 水位中心化管理的設計
為了使得水位在實時計算中更精準,我們設計出一種中心化的水位管理思路,即實時計算的各個節點,包含source、operator、sinker等都會把自己計算的水位資訊,統一上報給全域性的Watermark Server,由Watermark Server 來進行水位資訊的統一管理。
△中心化水位設計
Watermark Server :維護一個水位的資訊表(hash_table),包含實時計算程式(APP)整體拓撲資訊(Source、Operator 和Sinker等)各個層級對應的水位資訊,以便於進行全域性水位(比如low watermark)的計算,Watermark Server 定期和state做互動,以保證水位資訊的不丟失。
Watermark Client:水位更新客戶端,在source、worker和sinker等實時運算元中,負責向Watermark Server 上報和請求水位資訊(比如上游或者全域性水位),通過baidu-rpc服務請求回撥。
Low watermark(低水位):Low watermark是一個時間戳,用來標記實時資料處理過程中最早(oldest)的沒有處理的資料的時間(Low watermark, which pessimistically attempt to capture the event time of the oldest unprocessed record the system is aware of.)。它承諾未來不會有早於該時間戳的資料到達。這裡的時間計算一般基於eventtime,即事件發生時間,例如日誌中使用者行為發生的時間,而較少使用資料處理時間(processing time,某些場景也可以用),watermark計算的公式為(來自Google MillWheel 論文):
Low Watermark of A = min(oldest work of A, low watermark of C : C outputs to A)
但是在實際系統設計中,low watermark又可以按照運算元處理的邊界區分如下:
-
Input Low Watermark: Oldest work not yet sent to this streaming stage.
InputLowWatermark(Stage) = min { OutputLowWatermark(Stage’) | Stage’ is upstream of Stage}
輸入最低水位,可以理解為將要輸入當前運算元,即上游運算元處理過的資料的watermark。
-
Output Low Watermark: Oldest work not yet completed by this streaming stage.
OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) }
輸出最低水位,可以理解為當前運算元未處理過資料的最早的(oldest)水位,即處理過資料的水位。
具體如下圖所示,理解會更形象些。
△Low watermark的邊界定義
4.2 如何實現精準水位
4.2.1、精準水位的前提條件
目前實時計算系統在實時資料倉庫的應用場景,我們都是使用low watermak來觸發視窗計算(因為這樣更可靠),從3.1中low watermark的定義我們可知:low watermark是層級迭代計算的,水位是否精準,取決於最上游(即source)水位的精準度。於是為了提升源頭水位計算的精準度,我們需要前提條件:
-
日誌在服務端的單臺伺服器上是按照時間(event_time)有序生產的
-
日誌在採集時候,除了真實的使用者行為日誌,還需要包含其他資訊,比如伺服器tag(hostname)和日誌時間(msg_time)等資訊,如下圖所示
△日誌打包資訊
-
日誌是實時點對點發布到訊息佇列,以保證訊息佇列單個分割槽(partition)內,單個伺服器的日誌是嚴格有序的
△源端日誌點對點發布到訊息佇列,保證單分割槽日誌是有序的
4.2.2、水位的計算方式
1、Watermark server
初始化:
首先作為獨立的執行緒(thread)啟動。根據配置的日誌傳輸任務的BNS(Baidu Naming Service,百度名字服務,提供服務名稱到服務端所有執行例項的對映)來解析日誌源的伺服器列表(hostname list);根據配置的APP拓撲關係,初始化watermark資訊表,並持久化寫入Table(百度分散式kv儲存引擎)。
普通水位資訊更新:接收到Client到水位資訊並更新對應粒度(Processor粒度或者keygroup粒度)的水位,對區域性水位進行更新
精準水位計算:
現實中,如果要求源端的日誌100%都精確的到達,會造成頻繁的延遲或者延遲太久(如果下發採用全域性Low watermark邏輯)。原因是:在日誌端伺服器例項太多的情況下(比如實際上我們有的日誌有例項6000 - 10000個),總有線上服務的例項會出現日誌實時上傳的延遲的情況,那麼這就需要在資料的完整性和時效性之間做一個折中,比如以百分比的形式來精準控制允許延遲的例項個數(比如配置99.9% 或者99.99%來設定允許源端日誌出現延遲的比例),來精準控制最源端水位的精確度。
精準水位需要特殊配置,根據Source端實時上報的伺服器和日誌進度的對映關係,以及配置的允許延遲例項的比例,來計算Source端的output low watermark。
計算全域性low Watermark:會計算一個全域性最小的水位,返回給Client端的請求
狀態持久化:定期把全域性水位資訊持久化寫入外部儲存,以便於狀態恢復
2、Watermark Client
Source 端:解析日誌包,並獲日誌包裡面的機器名等資訊和原始的日誌。原始日誌經過ETL處理後,並根據原始的日誌獲取最新時間戳(event_timestamps),Source通過Watermark Client API 把解析到hostname和最新時間戳(event_timestamps)的對映關係表定期上報(目前配置的1000ms)到Watermark Server。
△Source通過解析日誌獲取的伺服器和日誌進度對映關係
Operator端:
Input low Watermark計算 : 獲取上游(Upstream)的output low watermark,作為input low watermark來決定是否觸發視窗計算等操作;
output low Watermark計算:根據日誌、狀態(state)等處理進度(oldest work)來計算自己的output low watermark,並上報到Watermark Server,以便於下游運算元(Download Processor)使用。
△Watermark Client 工作流程
Sinker端:
Sinker端和上面的普通實時運算元(Operator)一樣,會計算Input Low Watermark和 Output Low Watermark來更新自己的水位,
額外需要請求一個全域性的Low Watermark 來決定資料的輸出視窗是否關閉。
4.3 精準水位在系統間的傳遞
水位傳遞的必要性
很多時候,實時系統並不是孤立的,多個實時計算系統之間存在著資料的互動,最為常見的方式是兩個實時資料處理系統是上下游的關係。
具體表現為:兩個實時資料處理系統之間通過訊息佇列(比如社群的Apache Kafka)來實現資料的傳遞,那麼在這種情況下,如何實現精準水位的傳遞呢?
具體實現步驟如下:
1、上游實時計算系統的日誌源,保證日誌是點對點發布的,這樣可以保證全域性水位的精準度(具體比例是可調的);
2、在上游實時計算系統的輸出端(sinker/exporter 到訊息佇列端),需要保證使用全域性low watermark的下發,目前我們採用把全域性水位資訊列印到每條日誌上面來實現傳遞;
3、在下游實時資料計算系統的Source端,需要解析日誌攜帶的水位資訊欄位(來自上游實時計算系統),並開始作為水位的輸入(Input Low Watermark),開啟層層水位的迭代計算和全域性水位的計算;
4、在下游實時資料計算系統的Operator/Sinker端,可仍舊可以用日誌的Event Time來實現具體資料切分,來作為視窗計算的輸入,但是觸發視窗計算的機制,仍舊以Watermark Server 返回的全域性Low Watermark為準,以保證資料資料的完整性。
△精準水位在實時計算系統之間的傳遞機制
05 實際效果和後續展望
5.1 實際線上效果
5.1.1 落地資料的實測效果(完整性)
實際線上測試,採用精準水位(配置水位精度99.9%,即只允許千分之一的源端例項延遲),在日誌沒有延遲的情況下,實時落地的資料和離線資料,在同一個時間視窗(Event Time)下效果對比如下(基本都是十萬分以下):
△源端日誌沒有延遲的情況下資料完整性效果
在源端日誌出現延遲的情況下(<=0.1%源端日誌例項延遲的情況下,水位還會持續更新),資料diff效果整體基本在千分之1 左右(受到日誌源端點對點日誌本身可能存在資料不均情況的影響):
在源端日誌出現大面積延遲的情況下(>0.1%源端日誌例項延遲的情況下),由於使用了精準的水位機制(水位精度99.9%),全域性水位不會更新,實時資料寫AFS的視窗不會關閉,一直等待延遲資料的到來和全域性水位得更新才會關閉視窗,以保證資料的完整性,實際測試結果如下(在千分之1.1-千分之1.2之間,受到日誌源端例項本身存在不均情況的影響):
5.2 總結和展現
經過實際精準水位的研究和實際線上的應用,基於精準水位的實時資料倉庫,在具備時效性提升的同時,具備了更高、靈活資料的精度機制,在穩定性優化後,實際上完全已經替代之前的離線和實時兩套資料倉庫系統,實現了真正意義上的流批一體資料倉庫。
同時基於中心化的水位機制,也後續面臨著效能優化、高可用(故障恢復機制的完善)和更精細粒度精準水位的挑戰(在視窗計算觸發機制下)。
——END——
參考文獻:
[1] T. Akidau, A. Balikov, K. Bekiroğlu, S. Chernyak, J. Haberman, R. Lax, S. McVeety, D. Mills, P. Nordstrom, and S. Whittle. Millwheel: Fault-tolerant stream processing at internet scale. Proc. VLDB Endow., 6(11):1033–1044, Aug. 2013.
[2] T. Akidau, R. Bradshaw, C. Chambers, S. Chernyak, R. J. Fernández-Moctezuma, R. Lax, S. McVeety, D. Mills, F. Perry, E. Schmidt, et al. The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing. Proceedings of the VLDB Endowment, 8(12):1792–1803, 2015.
[3] T. Akidau, S. Chernyak, and R. Lax. Streaming Systems. O’Reilly Media, Inc., 1st edition, 2018.
[4] "Watermarks - Measuring Time and Progress in Streaming Pipelines", Slava Chernyak , Google Inc
[5] P. Carbone, A. Katsifodimos, S. Ewen, V. Markl, S. Haridi, and K. Tzoumas. Apache flink: Stream and batch processing in a single engine. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 36(4), 2015.
推薦閱讀:
- 精準水位在流批一體資料倉庫的探索和實踐
- 視訊編輯場景下的文字模版技術方案
- 淺談活動場景下的圖演算法在反作弊應用
- 百度工程師帶你玩轉正則
- Serverless:基於個性化服務畫像的彈性伸縮實踐
- 百度APP iOS端記憶體優化-原理篇
- 從稀疏表徵出發、召回方向的前沿探索
- 效能平臺數據提速之路
- 採編式AIGC視訊生產流程編排實踐
- 百度工程師漫談視訊理解
- PGLBox 超大規模 GPU 端對端圖學習訓練框架正式釋出
- 百度工程師淺談分散式日誌
- 百度工程師帶你瞭解Module Federation
- 巧用Golang泛型,簡化程式碼編寫
- Go語言DDD實戰初級篇
- 百度工程師帶你玩轉正則
- Diffie-Hellman金鑰協商演算法探究
- 貼吧低程式碼高效能規則引擎設計
- 淺談許可權系統在多利熊業務應用
- 分散式系統關鍵路徑延遲分析實踐