騰訊雲 AI 視覺產品基於流計算 Oceanus(Flink)的計費資料去重嘗試

語言: CN / TW / HK

作者:kaibinli(李凱斌),騰訊 CSIG 專家工程師

| 導語:介紹下最近使用 Flink 來對計費資料進行去重的具體做法

一. 背景

AI 視覺產品在我們騰訊雲-人工智慧的產品目錄下,包括人臉識別、人臉特效、人臉核身、影象識別、文字識別等。   流計算 Oceanus 在騰訊雲-大資料的產品目錄下,是基於 Apache Flink 構建的企業級實時大資料分析平臺。   AI 視覺產品是按呼叫量計費,畢竟涉及到錢,使用者對計量資料準確是非常敏感的;另外呼叫量本身也比較大,如何保證資料的準確一致也是一個比較大的挑戰。

  • 資料不準: 主要包括資料丟失和資料重複(當然可能有其他問題比如上報的資料本身錯誤等,暫不屬於本次討論範圍)。  

  • 資料丟失: 相當於呼叫量少算,會影響我們的收入。一方面我們通常重試、持久化等方式儘量減少資料的丟失,目標當然是完全不丟,但很難做到100%不丟。另一方面很少量的資料丟失對於實際收入影響很小,對使用者基本沒有影響。  

  • 資料重複: 相當於呼叫量多算就會多收使用者錢,使用者一旦發現肯定會投訴過來。所以是必須要去解決的,但是資料量很大,要做到精確去重比較難。

整體的背景和處理邏輯可以參考如下業務流程圖, 本次主要介紹下我們在資料去重方面的一些嘗試。

系統架構圖:

二. 思路與調研

去重的觸發時機: 資料重複的原因主要是各種重試:包括上游傳輸環節的超時重試和下游計算環節的系統重啟導致的資料重算。因為我們通常使用的是最終的資料,只要保證最終資料不重複即可,所以只要在最後的計算環節進行一次去重就可以,前面的環節不用處理。

去重的技術手段: 保證資料處理中不重複、不丟失(資料一致),通常有 2 個技術手段:事務和冪等可重入(冪等重入可能出現部分資料插入了的時間段,沒有事務還能保證過程中的精確,但如上所述我們只要最終資料一致,所以冪等也是可以的)。

事務的實現難度高,尤其在分散式或多個元件要用到 2PC 之類的事務,更加複雜;所以通常事務都是元件本身成熟的實現,很少從頭開發的;而冪等通常是使用資料的唯一鍵來保證去重,但是在我們資料累計這裡不適用,因為聚合時的資料的順序和數量在每次計算時不是固定的,所以如果出現重啟要重新計算時並不能生成和上次一樣的唯一鍵,就難以使用鍵去重。

經過調研發現 Flink 本身是支援 2PC 事務和內部的狀態儲存,可以做到 exactly-once,當然使用起來會有成本(包括學習成本、問題排查等,Flink 的開發入門和資料可以參見 Flink 入門 1-零基礎使用者實現簡單 Flink 任務 [2] 和 Flink 入門9-Jar 作業開發 [3])。 考慮到後續我們資料量增加後的資料處理能力以及其他一些流處理的場景都還是會用到 Flink,所以與其自己 DIY 不如使用成熟開源的元件,也符合當前開源協同的趨勢,所以決定直接使用 Flink 裡面的去重,下面是 Flink 實現 2PC 的流程:

2PC 事務有 2 個角色:協調者(發起者、控制者)和參與者(要支援本地事務),如上圖所示。 Flink 的 JobManager 是協調者;Flink 內部的狀態、流程屬於內部參與者;Kafka 作為 Source 和 Sink 是外部參與者,尤其是作為 Sink 的 Kafka 要選擇支援事務的版本(>=0.11)。

Flink 介紹事務的可以參考 官方文件[4]。

Spark vs Flink 在 excatly-once 上的對比討論,也說到很多基本概念的理解: https://zhuanlan.zhihu.com/p/77677075

三. 實現

基本的流程和上圖一致,基本程式碼如下:

簡單介紹下每個步驟,同時討論下可能的問題: 

0. 前提:

Flink 開啟 Checkpoint,相當於 Flink 打開了 redo、undo log 等持久化的機制,是事務的基礎。

1. 事務開始: 

從上一個處理完成 offset 消費 Kafka 資料,當然這裡 Kafka 裡面的資料格式需要自行去解析,可以做些簡單的處理。 

2. 事務處理:

2.1 按照使用者 KeyBy 分流,提高併發

按照使用者分流可以保證同一個使用者在同一個處理流中,從而保證資料去重(不同使用者的資料認為不會重複的)。當然這裡也有一個數據傾斜的問題:如果某個使用者呼叫量特別大就會導致部分流負載很高,拖累這個處理的速度,目前我們的資料分佈根據測試還好。通過學習瞭解到如果資料傾斜嚴重可以再次選擇更好的 Key 分流:比如可以按照使用者 RequestId 的字首進行分流更均勻,另外 Flink 也提供了 rebalance 的介面強制將資料打散,當然要符合邏輯資料分佈要求。 

2.2 宣告 map 狀態儲存處理過的資料,用於去重

測試時可以選 memory,但是線上還是要使用 Rocksdb 應對資料量大的場景,同時要開啟 TTL 機制避免狀態太大對記憶體和 Checkpoint 儲存和恢復時產生太大的 IO 壓力,開始時建議先選擇比較短的 TTL,觀察記憶體和負載再逐步調大,目前我們 TTL 可以到 15 分,希望可以逐步調大到 1 小時 - 10 小時。   當然當這裡如果資料量特別大時,用到的儲存也就很,很容易磁碟、記憶體產生大的壓力,所以這裡要進行實際的測試和調整:比如增多機器提高併發,或者使用 Rocksdb 增量式的 Checkpoint 等。   這裡儲存資料的時間長短決定了去重的資料的範圍,如果太大如上所述對儲存壓力很大,造成 Flink 執行不穩定;但如果太小隻能小區域性去重,對於跨度比較大的資料重複不能應對,比如跨天的資料也可能重複,在離線上報的鏈路中就可能跨天重試的,通常在實時上報的鏈路不會出現,對於這種長時間還有重複的,目前想到有 2 個處理的方向(還沒具體落地):  

  1. 使用 Redis 儲存處理過的資料(不要求很及時),上報時先去這裡去重;問題首先是對儲存壓力增大不少,同時要增加一次查重的耗時

  2. 要求上報方記錄下上報的結果不要重複上報,即使重複上報時間間隔也不能太長這裡雖然對業務不友好,但是可以理解,畢竟極端情況下也有現在 30 號要從新上報 1 號的資料也可能出現,那如果用方案 1,就要 1 個月的資料完全儲存下來成本太大。

2.2.1 去重 Key 的選擇

通常來說直接選 RequestId 就可以,當然保險起見,加上使用者維度也是可以的(可以應對下 RequestId 少量重複的情況)。但測試中發現幾個問題: 

  1. 使用者一次的請求,到後臺業務對應多次處理都上報給計費了,組合結果後返回給使用者——這樣就會導致只統計了其中一個操作,其他操作被去重過濾。解決的方法就是去重 Key 也加上 action。 

  2. 接入層重試的情況,第一次請求處理失敗上報了失敗,然後接入層重試成功了上報了成——這樣就會導致只統計了失敗的,成功的上報被過濾了。解決的方法也是加上錯誤碼。      

3. 資料聚合:

3.1 目前 window 選擇1分鐘粒度聚合彙總

3.2 聚合 Key 根據業務需要進行選擇

3.3 出庫到 Kafka 時生成一個 uuid

uuid 是 java 自帶的函式生成,相當於一個全域性唯一隨機數,好處就是有了唯一鍵,後面資料處理、入庫時就很方便。

4. 輸出到 Kafka:

目前 Flink 內建的支援事務的 Sink 只有 Kafka>0.11。當然可以根據 Flink 的 2PC (兩階段提交) 介面自行去實現需要後端的 Sink,比如 MySQL、PG。這裡我們使用 Kafka 作為輸出除了簡單成熟,另外就是考慮到如果資料量增大,Kafka 這裡的大資料能力就是天生的,資料庫就需要擴容或替換——當然這裡增加了一個 Kafka 到 PG 的同步的流程,流程變的更長了;但是考慮到後續資料量大和解耦的考慮,還是推薦出庫到 Kafka。

下面是 Flink 2PC 的 Sink 要繼承實現的介面: 

public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {

// 開始一個事務,返回事務資訊的控制代碼
protected abstract TXN beginTransaction() throws Exception;

// 預提交(即提交請求)階段
protected abstract void preCommit(TXN transaction) throws Exception;


// 正式提交階段
protected abstract void commit(TXN transaction);


// 取消事務
protected abstract void abort(TXN transaction);
}

4.1 開啟 Kafka 事務出庫  

如上所述,Flink 的 Kafka 聯結器在流計算 Oceanus (Flink) 平臺已經支援,可以直接使用。這裡是事務出庫到 Kafka 的,那麼後續讀取 Kafka 這裡的資料也要配置 read_commited 的級別的讀,整個鏈路資料一致。

四. 問題

在上面的流程說明中已經就每步可能的問題進行了說明和討論,但是肯定還有新的問題,就需要後續運營過程中發現進行修復。這裡我們預料比較麻煩的問題是:   如果 2PC 事務過程中出現異常問題時,是否可以比較快、完美的恢復回來;否則可能出現死鎖或啟動不起來的情況。   以上是最近 2-3 個月的實現的情況,後面還會繼續驗證、繼續發現問題,所以還是要進一步的學習和理解 Flink 的底層機制,甚至可以進行程式碼級的貢獻——這一步肯定非常難,短時間不可能完成,初期投入很多但是產出不多,但是可以肯定的是值得長期投入。   本文作為 Flink 應用的一次嘗試,如發現有錯誤請直接指出,同時歡迎有相同需求的同學一起討論。

五. 參考連結

[1] Flink 實踐教程:入門 1-零基礎使用者實現簡單 Flink 任務: https://cloud.tencent.com/developer/article/1895677

[2] Flink 實踐教程:入門9-Jar 作業開發: https://cloud.tencent.com/developer/article/1907822

[3] Flink 事務介紹: https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html    

流計算 Oceanus  限量秒殺專享活動火爆進行中↓↓

點選文末 「閱讀原文」 ,瞭解騰訊雲流計算 Oceanus 更多資訊 ~

騰訊雲大資料

長按二維碼
關注我們