Alink、Tensorflow on Flink 在京東的應用

語言: CN / TW / HK

▼ 關注「 Apache Flink 」,獲取更多技術乾貨 

摘要: 文整理自京東搜尋推薦演算法工程師張穎、 劉露 在 Flink Forward Asia 2021 的分享。主要內容包括:

  1. 背景

  2. 京東搜尋推薦機器學習現狀

  3. 基於 Alink 實現線上學習

  4. Tensorflow on Flink 應用

  5. 規劃

Tips: 點選 「閱讀原 文」 檢視原文影片 & 演講PDF~

一、背景

搜尋和推薦是網際網路應用的兩個核心入口,大多數流量都來自於搜尋和推薦這兩個場景。京東零售按站點,分為主站、京喜、海外站以及一些垂直領域站點。

對於搜尋業務來講,每個站點下會有關鍵詞搜尋、下拉發現、以及店鋪、優惠券、訂單等細分頁面的搜尋;對推薦業務來講,依照應用場域不同,劃分了大大小小几百種推薦位。

以上每一種業務場景下,都包含了十多種策略環節,需要機器學習模型支援。基於海量的商品資料、海量的使用者行為,作為機器學習的特徵樣本。

除了搜推廣領域中典型的意圖識別、召回、排序、相關性模型以外,京東搜尋推薦為了更好的維護使用者、商家、平臺這三方的生態,在智慧運營、智慧風控、效果分析這些環節,也越來越多的引入模型進行決策。

二、京東搜尋推薦機器學習現狀

我們依據服務場景和服務時效的差異,將這些機器學習場景進行分成了三類:

  • 一種模型是在使用者訪問搜尋或推薦頁面時,即時請求到的商品召回、排序、意圖識別等模型, 這類模型在服務層面對響應時間要求極高,預估服務位於線上系統中。

  • 另一種模型是對服務響應時間要求不高,但對模型的訓練和預估有一定的時效性要求,比如實時使用者畫像、實時反作弊模型,這裡我們把它稱作是近線場景。

  • 第三種是純離線的模型場景,比如商品或使用者的長期畫像、針對於各種素材標籤的知識圖譜, 這些場景的訓練和預測對時效要求相對較低,全在離線環境下進行。

我們來看下當前主要的模型服務架構是怎麼樣的:

京東搜尋和推薦系統由於業務系統本身的差異,分別由不同的 kernel 鏈式模組,組成為搜尋系統和推薦系統。

一次使用者搜尋,會逐級請求鏈路上的各級服務,先對關鍵詞過 QP 服務,走意圖識別模型;再由召回服務並行請求各路的召回,會依次呼叫召回模型、相關性模型、粗排模型;然後排序服務彙總結果集後,會呼叫精排模型、重排模型等。

一次使用者訪問推薦的業務過程,有一些差異,但整體上流程比較接近。

這兩個大業務下層,會共享一些離線、近線基礎用途的模型,比如使用者的畫像、素材標籤、各種指標分析。

他們訪問的模型服務架構,都由訓練 + 預估兩部分組成,中間由模型倉庫和引數服務橋接起來;特徵方面,線上場景需要特徵伺服器,離線場景則由資料鏈路組成。

從模型形態上,我們可以把現有模型劃分成兩種形態:

  • 左側一類模型,單體規模相對複雜,採用資料並行的形式對同一組引數進行訓練,使用自研引數伺服器對超大規模稀疏引數進行訓練,訓練和預估的架構相互分離。

  • 右側一類模型,單體模型相對簡單,資料量和業務粒度繁多,按不同業務粒度進行資料劃分,分別建模,由流式計算框架來驅動資料流轉,做到訓練和預估的架構一體。

基於線上服務和離線訓練的架構差異,多數模型系統會是這種線上和離線分離的系統形態。訓練過程是基於 Tensorflow、Pytorch 進行一層封裝。

樣本生產和預處理,是基於 Flink 構建出的樣本鏈路框架,其中很多線上業務的特徵,源於線上服務的 Featurelog 特徵日誌;模型訓練和樣本生產構成了離線部分, 依賴一些公共的基礎元件比如 Hive、Kafka、HDFS;預估過程基於自研的預估引擎,在 CPU 或 GPU 上進行 Inference 計算,大規模稀疏向量由獨立的引數伺服器提供;特徵服務為預估過程提供輸入資料,也是由自研的特徵服務構成,由於預估時特徵來源和訓練時不同,有一層統一的特徵資料獲取介面,以及對應的特徵抽取庫。

特徵抽取和模型預估構成了線上部分和離線部分分離。

在模型迭代的形態上,對時效有較高要求的模型,一般是先離線使用歷史累積的批式資料訓練得到 Base 模型,部署上線之後,繼續用實時資料流樣本在其基礎上持續的訓練和迭代上線。

由於預估和訓練在兩套架構下,持續迭代的過程就涉及兩套架構的互動、資料傳遞,以及一致性方面的要求。

訓練以及預估需要結合資料狀態,自主實現容錯轉移、故障恢復的能力。如何將資料的分散式處理和模型的分佈模式結合為一個整體,便於部署和維護,也是一個不易實現的功能。對不同模型,載入和切換預訓練引數的模式也難做到統一。

三、基於 Alink 實現線上學習

首先,我們來分析一下線上學習系統的痛點:

  1. 離線/流式訓練架構難以統一: 典型的線上學習首先由離線的大批資料訓練出一個模型作為 basic model,之後在這個 basic model 的基礎上持續的進行流式訓練,但是這個鏈路下流式訓練和離線訓練是兩套不同的系統、程式碼體系,比如說,一般 offline train 和 online train 是兩套不同的架構體系。offline 的訓練可能是一個普通的離線任務,線上的訓練可能是單機啟動的一個持續的訓練任務,這兩種任務系統不同、體系不同,甚至如果線上訓練是用 Spark/Flink 跑的話,可能程式碼本身也不同。

  2. 資料模型: 上述講到了整個訓練架構難以統一,因此,一個業務引擎裡面使用者需要維護兩套環境、兩套程式碼,許多共性不能複用,資料的質量和一致性很難保障;且流批底層資料模型、解析邏輯可能不一致,導致我們需要做大量的拼湊邏輯,甚至為了資料一致性需要做大量的同比、環比、二次加工等的資料對比,效率極差,並且非常容易出錯。

  3. 預估服務: 傳統的模型預估都是需要部署一個單獨的模型服務,然後由任務以 http/rpc 形式去呼叫來獲取預估結果,但是這種模式需要多餘的人力去維護服務端,且實時/離線預估場景下 rpc/http server 並不需要一直存在,它們只需要隨著任務的開始而開始,隨著任務的結束而結束就可以了;且離線訓練出來的模型如何服務於線上又是一個令人頭疼的問題。

  4. 模型升級: 模型任何形式的升級都會對模型帶來一定的影響,在這裡,我們主要討論模型的升級對模型引數丟失帶來的影響。

這是一個簡單的線上學習的經典流程圖,下面我來解釋一下這個流程圖在 Alink 鏈路是如何實現的:

  1. 離線訓練任務: 該 Alink 任務去 hdfs load 訓練資料,先將訓練資料進行特徵工程等的加工之後,將模型進行離線的訓練,訓練完成之後將 model info 和模型引數資料寫入 parameter server,該任務天級執行,每次執行訓練比如說 28 天的資料。

  2. 實時訓練任務: 實時任務方面,該 Alink 任務從 kafka 讀取樣本資料,將樣本資料進行一 定積累之後比如說小時級、分鐘級、條數等進行小批量的訓練,先去 parameter server pull 模型引數和超引數據,load 模型之後如果有預估需求的話,可能進行一次 predict,如果沒有預估需求,可以直接進行模型訓練,並且將訓練之後的模型資料 push 給 parameter server。

接下來我們主要來看看實時學習的模型如何服務於線上預估的場景:

  • 首先,實時的訓練肯定不會影響模型結構的,即實時訓練只會影響模型引數的更新;

  • 第二,預估和訓練的 ps 肯定是要分開的,因此,這個問題就變成了如何去同步預估和訓練的 ps 的資料。

在這裡業界大概有兩種實現方案:

  • 方案 A:這個是針對一些小模型的訓練,可以讓 Alink 的任務直接將訓練好的引數同時 push 給離線 PS 和線上 PS 。

  • 方案 B: 引入一個類似 PS controller 的角色,該角色負責計算引數,同時將引數同時 push 給離線 PS 和線上 PS。

不過,我們也可以讓 Alink 的訓練任務寫訓練 PS,同時構造一個類似 ps server 的角色來同步引數,將 server 的更新同時寫一個類似 kafka 的佇列,啟動一個預估 ps server 消費 kafka 佇列 裡面的引數資訊,這樣做到訓練 PS 和預估 PS 之間的一個數據同步。

方案很多,選擇自己合適的就好了。

下面我們來先看一下模型版本升級為什麼會帶來的引數丟失:

  • 假設 1 號凌晨的時候訓練的前 28 天的資料,訓練完了之後將引數寫入了引數伺服器,1 號到 2 號之間一直在流式的訓練,一直在增量寫引數伺服器,一直到 2 號凌晨。

  • 2 號凌晨的時候開始訓練前 28 天的資料,假設訓練時間為 1h,此時如果直接寫入 PS 的話,那麼該 1h 的資料將被直接覆蓋,對於一些時間不敏感的模型倒也還好,至少不會報錯。 但是對於該業務裡面 prophet 時間序列模型來說會出問題,因為該模型引數少了 1h 的資料, 模型可能會因此降級準確度。

其實總結起來就是模型迭代的時候,由於離線訓練完成需要一定時間,如果直接覆蓋的話,會造成這段時間的引數丟失。因此,我們必須保證 PS 裡面的引數在時間上是連續的。

這個圖裡面我們主要介紹了 PS 冷啟動和熱切換的流程:

  1. 模型訓練冷啟動之後因為引數丟失問題模型暫時不可用,等待第一次 warm start 之後模型進入可用狀態;

  2. Parameter Server 支援多 scope 多 versiion,模型熱切換的時候只更新 ps new scope,warm start 的時候更新所有 scope;

  3. 模型每次 predict 的時候都只 pull old scope 的資料,進行 warm start 的時候 pull new scope 。

下面詳細接受一下整個鏈路的流程:

  1. 冷啟動的時候因為離線的任務訓練模型需要一定時間,因此,這時候 PS 裡面的引數缺少了該時間段的資料,所以只能先進行 warm start 將引數補全,並寫入 PS old scope 和 PS new scope;

  2. 之後進行正常的預測和 warm start 過程,其中 predict 的時候只 pull ps old scope,因為 ps new scope 裡面的資料會再熱切換的時候被覆蓋造成引數丟失,丟失引數的 ps 不能進行預測;

  3. 等到第二天凌晨的時候進行熱切換,只更新 ps new scope;

  4. 之後正常 pull ps old scope 進行 predict,pull ps new scope 進行 warm start 的流程。

接下來我來介紹一下流式訓練的痛點:

  1. 對於線上的訓練不支援 failover。大家應該都知道,線上訓練難免會因為各種各樣的原因 (比如網路抖動) 中斷,這種情況下,合適的 failover 策略是非常重要的。我們將 Flink 的 Failover 策略引入我們自研的模型訓練運算元,進而支援模型的 Failover。

  2. 合適的 pretrain 策略:任何模型的訓練 embedding 層都是不需要每次從 PS 裡面 pull 的, 一般業界會自研一些類似 local ps 的形式來在本地儲存這些稀疏向量,當然我們也可以將這些 local ps 引入到 Flink 內部來解決這個問題,但是對於 flink 來說,我們在一些場景下完全可以用狀態後端來代替 local ps。 利用 Flink 的 state 和 parameter server (引數伺服器) 融合,init 或者是 failover 的時候將 parameter server 的部分熱資料 load 到 state 裡面對模型進行 pretrain。

  3. 很難實現分散式的需求。如果是一些本身是支援分散式的架構倒還好,但是有一些演算法本身是不支援分散式的 (比如 facebook 開源的 prophet),在這種情況下如果資料量大而且還不用分散式的話,跑完一大批資料可能會極其耗費時間;Alink 天然支援分散式,Alink 是基於 Flink 的上層演算法庫,因此,Alink 具有 Flink 所有的分散式功能,支援 Flink Master 的所有排程策略,甚至可以支援各種精細的資料分發策略。

流式訓練的 failover 策略:

線上分散式訓練的時候經常會有某臺機器因為某些原因 (如網路) 異常的情況,這種情況下如果要恢復一般有兩種情況:

  1. 允許資料丟失

    一般的訓練任務都是允許少量資料丟失的,因此我們希望可以犧牲一些資料進而換來整體任務的持續訓練,引入區域性恢復的策略可以大大提高任務的持續性,避免了任務因為一些外部原因 造成的單點故障而全部恢復的情況。

  2. 不允許資料丟失

    在這裡我們只討論 at least once 的情況 (exactly once 要求 PS 支援事務),假如說業務對資料的要求比較高,我們可以採取 global failover 的策略,當然了,一般單點重部署異常的情況下也會走 global failover 的策略在該業務中,我們採用區域性恢復的策略來優先保證任務的持續訓練。

下面詳細介紹一下訓練任務的重啟的時候策略:

  1. global recovery。這裡就是 Flink 裡面常用的 Failover 的概念,不再過多贅述。

  2. singal task recovery。 在該情況下某個 taskmanager 因為網路異常出現了心跳超時,此時為了保證資料一致性,Flink 任務會發生 failover 並且從上次的 checkpoint 恢復,但是如果允許少量的資料丟失且為了保證任務的持續輸出,可以開啟區域性恢復,此時任務只會重啟該 taskmanager,可以保證訓練的持續性。

  3. 單點重部署異常。如果任務出現了任何原因的故障,導致任務單點恢復的過程中出現了異常導致單點恢復失敗,這種時候就發生了單點重部署異常,該異常無法解決,只能通過將任務 failover 來解決問題,此時可根據任務需要配置從 checkpoint 恢復或者是不恢復持續訓練。

  4. 這裡我著重介紹一下任務 failover 的時候從 checkpoint 恢復的場景:任務 fail 的時候首先執行 save 方法,將當前 PS 的狀態 snapshot 儲存起來,將 Flink 狀態後端的資料也儲存起來,任務恢復的時候執行 load 方法,將 PS 恢復。仔細想可以發現,該操作會造成部分引數的重複訓練 (cp 的時間點和 save 的時間點不一致),希望大家注意。

基於 Alink 的流式訓練 pretrain 策略大致可分為冷啟動、全域性恢復和單點恢復三個模式:

  1. 冷啟動的時候大概是先從 PS 裡面 pull 模型引數和超參資訊,然後初始化 ListState、 MapState、ValueState 等狀態後端,同時初始化 PS 的 scope 和 version 資訊。

  2. 全域性恢復也就是 Flink 預設的 Failover 策略,在該模式下任務首先會 save model,即將 PS 裡面的模型資訊序列化至硬碟上,之後 save flink 任務裡面的狀態後端的資料,然後初始化的時候就不需要再 pull 超參等資訊了,而是直接選擇從狀態後端恢復超參,並且 reload 模型的 引數進行持續的訓練。

  3. singal task recovery,該模式是允許少量的資料丟失且為了保證任務的持續輸出才會採取的,在該模式下任務只會重啟該 tm,可以最大程度的保證訓練任務的穩定持續訓練。

  1. 當前比較流行的 3D 並行、5D 並行架構裡面,資料並行是最基礎也是最重要的一個環節。

  2. Flink 最最基礎的資料分發策略有包括 rebalence、rescale、hash、broadcast 等的多種選擇, 且使用者可以通過實現 streampartitioner 自由的控制資料的分發策略,使用者可以通過 load balance 等自由的實現資料並行來解決資料傾斜帶來的模型引數間互相等待的問題。

  3. 在該模式下,我們打通了 Alink 模式下分散式呼叫 python 方法的通路,可以最大程度的提高資料並行的效率。

  4. 資料並行是忽略流、批的,我們整合 Alink 的 mapper 元件,實現了 train 和 update model variable 批流一體化。

四、Tensorflow on Flink 應用

下面我先介紹一下 Tensorflow on Flink 預估服務和傳統的線上預估鏈路的不同:

  1. 區別於線上預估,實時/離線預估不需要服務一直存在,且 load 到 tm 內部可以大幅節約人力維護和資源成本。

  2. 整個鏈路架構不同導致資料模型、資料處理、模型訓練、模型推理等需要分別 維護不同的系統和程式碼結構。

Tensorflow on Flink 預估服務目前有多個方案,比如:

  • 方案 A:部署一個 rpc 或者是 http server,用 flink 通過 rpc 或者是 http 以 client 的方式去呼叫。

  • 方案 B: 將 Tensorflow 模型 load 到 flink tm 內部,直接呼叫。

其中方案 A 有如下弊端:

  1. rpc 或者是 http server 端需要多餘的維護人力。

  2. 實時/離線預估和線上預估不同的點是該 rpc 或者是 http server 端並不需要一直存在, 它們只需要隨著任務的開始而開始,隨著任務的結束而結束就可以了,一直存在是對資源的浪費,但是如果改成這種架構,那麼無疑會更加的耗費人力維護成本。

  3. 還是上面的架構不統一問題,rpc 或者是 http server 端和實時/離線資料處理往往不是一套系統,這就還是涉及到了之前一直強調的架構不同意問題,不再贅述。

五、規劃

  1. 採用 Flink sql 實現批流一體的模型訓練,爭取使模型訓練更加方便。

  2. Tensorflow Inference on Flink 實現支援大模型,基於 PS 實現動態 embedding 的 儲存:搜尋、推薦等業務場景中存在大量的 id 類特徵,id 類特徵通常採用 embedding 的方式,這些特徵在特定情況下會急劇膨脹,進而吞掉 taskmanager 的大部分記憶體,且原生 tensorflow 的 variable 使用起來會有諸如需要預先指定維度大小,不能支援動態擴容等不便,因此,我們計劃將內嵌的 Parameter Server 替換為我們自研 PS,支援千億規模的分散式 serving。

  3. 將 PS 裡面的 embedding 動態 load 到 taskmanager 的 state 裡面,實現降低對 PS 訪問壓力的需求:Flink 內部通常使用 keyby 操作來將某些固定的 key hash 到不同的 subtask 上,因此我們可以將這些 key 所對應的 embedding 快取到 state 裡面,降低對 PS 的訪問壓力。

六、鳴謝

  1. 首先感謝京東資料與智慧部資料時效 Flink 優化團隊的所有同事的幫助與支援。

  2. 感謝 Alink 社群全部同事的 幫助與支援。

  3. 感謝阿里雲端計算平臺事業部 Flink 生態技術團隊所有同事的幫助與支援。

下面是 Alink 和 flink-ai-extend 的 github 連結,歡迎大家 star。

https://github.com/alibaba/Alink.git

https://github.com/flink-extended/flink-ai-extended.git

FFA 2021 相關推薦

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群~

戳我,檢視原文影片 & 演講PDF~