基於Flink+Hudi在興盛優選營銷域實時數倉的實踐

語言: CN / TW / HK

1.前言

什麼是流處理?引用Streaming101[1]裡面的一句話:一種資料處理引擎,設計時考慮了無限資料集。(為了完整性,這個定義包括真正的流式傳輸系統(Apache Flink、Apache Storm)和微批處理系統(Apache Spark旗下的兩款微批流處理引擎SparkStreming、Structured Streaming))。

流處理系統的出現在大資料領域是一件大事,基於流處理系統的程式設計能夠更加及時的處理資料,而且有充分的理由相信每個企業都會好好的使用,因為大部分的企業都渴望更及時的資料,以便於開展更快的戰略決策。並且現代商業中越來越普遍的海量、無限資料集,比如埋點日誌,此類是適合使用流處理系統來做清洗加工計算,因為流處理系統是在資料到達時對其進行處理,這既能保證時間的時延,也能保證資源的平滑使用,從而產生更一致的和可預測的資源消耗,所以實時數倉的建設是離不開流處理系統的。

2.背景

隨著電商業務的快速發展和運營的精細化要求,傳統的批處理(t+1、h+1模式)已經不太能滿足業務運營的需求,尤其在營銷領域,對於實時資料的獲取變得更加急迫,他們想能在更短的時間內(分鐘)獲取最新的資料,方便他們動態的調整運營策略,從而獲得更好的運營效果和GMV增長。同時由於現階段流處理系統愈發成熟,尤其是Apache Flink的快速發展,對電商的某些複雜場景也能保證資料的精準一次處理,從而保證資料的準確性,使得高質量可應用的實時數倉成為可能。

3.技術方案

圖表1 實時數倉架構圖

圖1就是興盛優選的實時數倉基本架構,除了埋點日誌這種走append hive的方式外,其它的業務域數倉(包含營銷域)均是走這一套upsert架構,這套架構可以分為三個部分;

第一部分,資料同步。資料的接入是藉助深圳研發團隊的資料同步功能(ops),通過cannal實時監控各個業務線生產資料庫的binlog日誌,然後傳送到kafka訊息佇列裡面(本層的kafka可以看做實時數倉ODS層,資料跟ODS層的hudi表資料是一致的),以供數倉開發或者其他業務線開發訂閱kafka訊息進行實時的流處理;

第二部分,資料的加工。資料的加工目前數倉團隊都是用的flink進行資料上的加工打寬,然後回寫到kafka裡面(本層的kafka可以看做實時數倉 dwd層),方便資料的複用以及流量削峰和資料的統一規範。其中資料開發藉助datastudio平臺用flinksql的方式能實現一些場景比較簡單的業務,比如:營銷的實時發券表。如果場景過於複雜,需要多條流進行join,用flinksql實現可能會由於上游資料延遲,導致雙流join的資料沒有關聯上,進而導致實時數倉丟失資料或者欄位值預設。一般遇到這種多流join的場景我們會基於flink的DataStream API的方式進行實現,DataStream API的方式相對sql比較靈活,但是更底層一點,細節考慮多一些。sql的話是比較上層的抽象,底層細節已經封裝好了,靈活性也大打折扣,無法滿足一些特殊的複雜場景。營銷實時核銷表的就是一個這樣的多流join的案例,下面會重點調這個場景講下怎麼實現的;

第三部分,資料的落地。資料的落地也是藉助中介軟體團隊的資料同步功能(ops),通過SparkStreaming寫入到hudi,hudi是支援upsert寫入的、是冪等的、是支援事務的,所以hudi是很適合電商這塊的業務,目前數倉的hudi表都是merge on read格式的表,mor表支援快照查詢、增量查詢、讀優化,一個表落地到hudi一般分為以ro/rt,比如營銷核銷實時表,它落地到hudi就分成以ro結尾、rt結尾的兩個表,fct_marketing_xxx_rt、fct_marketing_xxxx_ro,ro結尾的是Read Optimized的縮寫,讀取ro表執行效率高,rt可以理解成realtime,但是官網[6]提供的的是Snapshot這個語義,對應前面提到的快照讀,rt表的優勢是實時性高,能讀取最新的commit資料。

一般寫入hudi資料延遲基本是在3~5分鐘。同步到hudi的資料,業務方可以實現10分鐘級別、小時級別、天級別的離線任務排程,也可以利用OLAP引擎(presto)做即席查詢,靈犀有一部分資料就是通過presto的訪問數倉的資料直接展示到看板上的,極大減少了其他資料團隊的開發成本,避免了重複造輪子;如果業務方需要更快速的響應跟查詢下,分鐘級別滿足不了他們需求,也可以將可複用的實時數倉dwd的kafka資料通步到華為的Gaussdb,同步到Gaussdb的時延基本為秒級,由於該款OLAP引擎是儲存計算一體化的,所以查詢效能相比presto來說也更優。

4.資料流向圖

圖表2 資料流向圖

圖2是營銷實時核銷的一個數據流向,其中支付券主表、支付券子表、訂單子表這三個都是接的生產的業務庫的實時產生的binlog訊息,然後通過資料同步元件同步到kafka,後面flink先對支付券兩條流進行雙流join,同時遲到的資料我們會在程式碼裡面的測流捕獲到,等這支付的兩條流合併成一條流,我們在跟子訂單的流進行雙流join,同時兩邊遲到的資料我們都會在測流捕獲到,等三條流都join完畢,我們就獲取Gaussdb的實時維度表資料,確保關聯的維度資訊是最新的,然後回寫到dwd層kafka,最後同步到hudi或者Gaussdb的dwd層供業務方使用。

5.實現細節

針對flink的自身的幾個雙流join的缺點以及實時訂單以及遇到的難點痛點:

1.Interval Join的缺點:

a.針對歷史資料處理。Interval Join是基於EventTime事件時間來處理資料的,沒有考慮重新消費歷史資料會導致資料關聯不上丟失資料的情況,因為Interval Join設定的視窗長度不能滿足消費歷史資料的時間跨度,但是如果將Interval Join的視窗長度設定很大,一旦消費的歷史資料時間跨度長加上資料量也大,這樣會導致flink的快取的狀態會很大,導致影響checkpoint的完成,進而影響程式的穩定性;

b.針對正在進行實時資料處理。Interval Join的視窗邊界不好確定,如果資料一旦延遲超過了設定的邊界,這樣會導致資料關聯不上進而會導致丟資料,過大的視窗長度會影響程式穩定性,同第一點;

2.Region Join(Inner Join、Left Join)會儲存流進資料的所有狀態,會導致flink程式的狀態太大,並且同Interval join,消費歷史資料會導致狀態瞬間膨脹很大,導致checkpoint時間過長導致任務失敗重啟,進而會被壓導致flink程式崩潰。

3.之前訂單生產的雙流join就是用的Interval Join,一旦訂單kafka資料有延遲,超過了設定的視窗長度,就要通過很麻煩的補數方式來修復訂單資料,不能通過重置kafka的offset至出問題之前的點位來回放資料,從而導致補數方式流程多而複雜,每次補數都浪費不少人力精力。

針對以上的缺點,我們數倉團隊內部想拋棄之前的Interval Join的方式,來用全新的雙流join的方式解決資料丟失、恢復回溯資料困難麻煩、快取狀態過大的問題。

這裡舉例是實時營銷核銷的場景,實時營銷核銷有3條流需要關聯,分別是t_payment_ticket(支付券主表)、t_payment_ticket_item(支付券子表)、t_trade_order_item_area(訂單子表)。

圖3是t_payment_ticket跟t_payment_ticket_item進行雙流join程式碼塊,首先,我們是用了兩個比較底層的運算元,第一個是connet運算元把兩條流連線起來,Interval Join跟Region Join底層也是用connet將兩條流連線起來的,然後通過業務主鍵分組KeyBy,最後用process運算元對兩條流進行計算。

圖表3 支付券主表跟支付券子表join程式碼塊

Process Function裡面的核心功能可以分為四部分:

1.快取兩條流的資料到狀態裡面。

在process function的procesElement1方法裡面[圖4],支付券主表的資料一進來我們會快取到定義的ValueState[圖5]的狀態裡面,與此同時在process function的procesElement2方法裡面[圖6],支付券子表的資料一進來會快取在定義的MapState[圖5]裡面,因為支付券子表主鍵是子訂單id跟券id所以定義了一個MapState,防止同一個券的不同子訂單的資料丟失。

2.從相對流的狀態裡面找到同一個key的資料,然後join輸出。

在procesElement1方法裡面[圖4],支付券主表會去支付券子表儲存的狀態資料,如果能在狀態裡找到對應的支付券子表資料,我們就把它們關聯起來然後輸出出去,並通過迭代器把已經找的支付券子表資料從狀態中移除。同理procesElement2方法裡面[圖6],唯一不同的是支付券子表通過在狀態裡面找到的支付券主表資料不能移除,因為他們之間的關聯關係是一對多,如果一但某條流資料發生延遲,那個一對多關係的一是總是要往下游輸出,資料才能在第二次加工時關聯上。

圖表 4支付券主表資料處理程式碼塊

圖表5 state初始化程式碼塊

圖表6 支付券子表資料處理程式碼塊

3.註冊定時器,觸發定時器,在OnTimer方法裡測流輸出延遲資料。

在procesElement1、procesElement2方法裡面都註冊了一個定時器,兩個定時器共享一個狀態資料[圖5],定時器時間是根據業務理論上會產生多大的延遲自定義的,然後通過ProcessTime註冊定時器,然後更新定時器的狀態資料。最後等到達指定的定時器時間會回撥onTimer方法[圖7],

onTimer方法裡面的功能就是輸出延遲資料跟清空狀態,通過flink的側流輸出,輸出支付券主表或者支付券子表延遲資料到下游,下游通過獲取程式碼中的OutPutTag設定的Name。在已經確定遲到的資料往下游發了,我們會把之前定義的state[圖5]裡面的所有狀態資料都清除。

4.捕獲延遲資料再加工

我們會在main方法裡面捕獲在定時器OnTimer方法作了tag標記的延遲資料,然後呼叫同樣的邏輯[圖1]然後再處理一遍,只是定時器的時間長短不一樣,第二道邏輯處理的時候可以根據數量大小靈活設定定時器時間大小,以免狀態過大導致程式不穩定,通過從現有生產跑的任務觀察來看,流入第二道處理邏輯的資料量不會太多,經過兩道程式最後我們把正常雙流join的資料跟遲到的雙流join資料union起來成一條新的完整的流跟訂單子表關聯,跟訂單子表關聯的邏輯基本跟之前概述的是一樣的,後面不做過多介紹。

圖表7 定時器處理程式碼塊

圖表8 側流輸出二次加工程式碼塊

通過Datastream API的方式,解決了上面提到的狀態過大,視窗時間邊界不好確定,因為Flink 自帶的Interval Join採用的是EventTime事件時間,而程式碼的方案是ProcessTime處理時間,兩者的區別就是EventTime是資料產生的時間,比如消費3天前的資料,視窗必須開3天;而ProcessTime是flink程式處理的時間,同樣消費3天前的資料,比如我開窗2個小時,我只要保證我的資料能在2個小時之內消費完歷史資料並趕上正常的流資料,就不會發生丟失資料的情況,所以第三個問題資料回溯問題也一起解決了。

6.總結

在業務對於實時資料指標需求的驅動下,建設資料指標更加及時的實時數倉是一個非常重要的方向,目前數倉團隊的實時數倉建設已經覆蓋了訂單、日誌、營銷、售後、門店、使用者、物流、商品這幾大域。建設實時數倉的難點就在於面對複雜處理邏輯情況下如何保證資料的準確性,資料準確性是考核數倉非常重要的一個指標,所以無論離線還是實時我們都盡最大的努力保證資料的準確性,尤其是在實時數倉這塊,我們團隊探索了很多方法,也遇到不少問題和補過很多次數,經過不斷的總結和分析,我們當前初步探索出一些針對複雜場景的解決辦法。後續我們團隊將會根據業務需求針對更多的業務域和通用場景輸出的更多的實時寬表,在保證準確性及時性的前提下,保證有更多的業務可以使用到實時的指標資料。

參考文獻:

[1]https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/

[2]https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/

[3]Apache Flink:Stateful Computations over Data Streams

[4]https://storage.googleapis.com/pub-tools-public-publication-data/pdf/43864.pdf

[5]Overview-Spark 3.3.0 Documentation (apache.org)

[6]Overview|Apache Hudi