從100w核到450w核:位元組跳動超大規模雲原生離線訓練實踐

語言: CN / TW / HK

本文整理自位元組跳動基礎架構研發工程師單既喜在 ArchSummit 全球架構師峰會上的演講,主要介紹位元組跳動離線訓練發展的三個階段和關鍵節點,以及雲原生離線訓練中非常重要的兩個部分——計算排程和資料編排,最後將結合前兩部分分享位元組跳動在實踐中沉澱的4個案例。

作者|單既喜-位元組跳動基礎架構研發工程師

業務背景

雲原生離線訓練框架支撐了位元組跳動內部“推薦”“廣告”“搜尋”等場景,如頭條推薦、抖音影片推薦、穿山甲廣告、千川圖文廣告、抖音搜尋等業務的超大規模深度學習訓練——以上場景的機器學習訓練均是基於 Primus 訓練框架完成。

整個機器學習生態從上到下分為“平臺層”“框架層”“資源層” 3個部分。位元組跳動演算法工程師使用 Reckon 訓練平臺完成了模型編寫、訓練、上線的全部過程。Reckon 訓練平臺中包含基於 TF 深度優化定製的 4 大深度學習框架——Lagrange 框架、Lagrange-Lite、蒲公英、美洲豹,這4個框架均通過 Primus 框架進行託管

在託管觀察中,Primus 作為分散式機器學習排程與資料融合框架,實現了雲原生訓練框架部署、分散式訓練資料讀取的全部過程,Primus 框架以雲原生的方式執行在 YARN 和 Kubernetes 排程系統中,並通過 HDFS、FeatureStore 等方式獲取訓練資料交給 TF Worker 進行訓練

位元組跳動在離線訓練方向的發展歷程

雲原生計算是軟體開發中的一種方法,它利用雲端計算“在現代動態環境(例如公共雲、私有云和混合雲)中構建和執行可擴充套件的應用程式”。通過宣告性程式碼部署的容器、微服務、無伺服器功能和不可變基礎設施等技術是這種架構風格的常見元素。

位元組跳動在雲原生離線訓練方向的發展大概分為三個階段:單角色雲原生訓練 1.0,多角色雲原生訓練 2.0,雲原生訓練 3.0 三個階段。

單角色雲原生訓練 1.0

離線訓練框架 1.0 系統誕生於2015年10月(內部代號 Zion)。

離線訓練 Zion 框架是基於 Hadoop Streaming 架構在深度學習場景下的深度定製,每個訓練作業對應一個 Hadoop YARN 上的 Zion 任務,具有(PS-Worker)架構分散式訓練器、多資料格式多資料來源混合訓練、HDFS 樣本讀取、訓練訓練進度 Checkpoint 功能。

(PS-Worker)架構分散式訓練器基於 Google 的 Tensorflow 框架深度定製,主要採用 Worker-PS 架構進行訓練。此架構分為 PS 端與 Worker 端兩個部分——其中 PS(ParameterServer) 是引數伺服器,主要功能是儲存並更新引數;Worker 是模型訓練器,按訓練資料分片,主要功能是讀資料,對變數求梯度。

離線訓練框架 1.0 對每個模型建立一套 Worker 例項,每個例項 Worker 和預部署在 Mesos 上的服務化 PS 完成通訊、讀取樣本、計算梯度、模型 Dump 的全過程。

離線訓練框架 1.0 於 2019 年進行了系統級重構,新一代離線訓練框架 2.0 增加了“多角色彈性排程”“多角色 Failover 能力”“訓練進度增量 Checkpoint ”等功能,提供“靈活”“高效”“易用”的模型訓練能力。

多角色雲原生訓練 2.0

在 “雲原生訓練 1.0” 實施過程中,我們發現了很多影響系統穩定性、易用性、維護性的問題。

問題1:訓練作業排程集中化問題

位元組跳動所有的離線訓練作業管理都是基於集中式的訓練排程服務(對應開源系統的 TF-Extend)。這個排程服務通過輪訓的方式,完成每個訓練作業的 PS 資源和 YARN 資源申請,如 PS 模型載入、YARN 訓練任務建立、PS 模型儲存等整個訓練宣告週期的各項工作,因此隨著訓練作業的增加,集中式排程出現了效能瓶頸,且排程服務的升級與不穩定等影響了較多的訓練作業執行。

問題2: PS 資源與 Worker 資源匹配問題

離線訓練 1.0 階段,公司所有的 PS 均通過服務化的方式申請使用。採用服務化的方式是為了解決 PS 分片修復、服務擴容、分片 Reshard 等需要複雜運維操作的問題。同時,通過服務化方式也可以實現多個訓練作業 PS 資源共享,提高物理機資源利用率。

但是,隨著業務量的增長,服務化 PS 逐漸暴露出了與訓練 Worker 難匹配的問題:

  • 資源不匹配:新增的訓練物理資源需要分別充值到 PS 服務端並上線,同時充值到 YARN 服務中才能進行訓練;
  • 網路不匹配:需要解決服務化 PS 與 YARN 訓練資源之間的跨機房、跨網段導致的通訊開銷。

離線訓練 2.0 增加了獨佔式的 API Server ,用於提供雲原生分散式排程能力:

  • 伴生式訓練管理 Norbert Driver:將每個核心排程中樞的作業都配備對應的排程大腦,通過宣告式的 API Server 控制每個排程的拓撲——Worker 角色和 PS 角色。
  • 宣告式 API Server:在每個離線訓練 Job 中,都內建了一個獨佔式的 API Server。Norbert 訓練管理 Driver 大腦通過宣告式 API Service,釋出控制訓練拓撲、動態新增資料來源、動態建立角色等訓練需求;Primus 框架 Watch 並響應宣告,完成重新申請容器、重新規劃角色、重新構建 Task 等具體工作。
  • 伴生式 Parameter Server:宣告式 API Server 建立的伴生 PS 角色,實現每套訓練作業專屬自己的 Prameter Server,能夠支援 PS Shard Failover、自動 PS 分片 ReHash、PS 資源彈性擴縮容等能力,徹底解決了服務化 PS 和訓練 Worker 的資源匹配難題。

基於雲原生訓練的 2.0 架構,位元組跳動離線訓練的作業規模從 2020 年至 2022 年,實現了從 150 萬核到 400 萬核的突破,並且 Flink Spark 一起成為公司離線 YARN 叢集的 TOP 計算框架。

雲原生離線訓練 3.0

雲原生訓練 2.0 資源部署在位元組跳動深度定製的離線排程 YARN 叢集中。為進一步實現離線線上資源並池、離線與線上訓練統一,雲原生離線訓練 3.0 基於 Operator 架構增加了對 Kubernetes 執行環境的支援,實現了 YARN+Kubernetes 的雲原生多 Runtime 訓練。

當內部大量資源從 YARN 遷移到 Kubernetes 後,系統不再為每個作業都產生一個 API Server 而是複用 Kubernetes 叢集的全域性 API Server,由 Norbert Driver 向全域性 API Server 釋出訓練需求宣告。

3.0 階段整個離線訓練的框架拓撲可以達到每天 10000 Job 的量級,單最大作業數 4000 個,每天 400 萬 vCore 的總量。框架同時支援 YARN Runtime 與 K8s Runtime 等多種Runtime,目前已經有約 160 萬核的離線訓練作業部署在 Kubernetes 叢集上(佔總訓練量的40%)。

雲原生離線訓練-彈性排程

位元組跳動雲原生離線訓練包含了兩個重要的組成部分——彈性排程和資料編排。

彈性計算排程簡述

雲原生的計算排程體系是通過位元組自研的 Primus Operator 打造實現的,具有以下四個特點:

  • 容器化:在 Kubernetes 和 YARN 上大規模踐行容器化帶來的隔離和環境準備方面的優勢;
  • 彈性 API-Server:通過自研的 API Server 在 Kubernetes 上覆用 API Server 的形式實現彈性作業排程的能力;
  • 多角色+異構:不僅能調動 CPU 還能調動有狀態的 GPU,實現多角色異構架構的能力;
  • 微服務:實現排程 Operator 及神經中樞 Norbert 微服務之間的通訊互聯。

Primus Operator 總體基於開源 Cookie Builder 架構,擁有四個流轉狀態:首先觀察整個 Job 的狀態,然後將狀態 Update 到 Job CRD 的 Status 內,再去檢視使用者/作業需求方的作業拓撲期望,計算需要申請的 POD 資源,最後在 Reconcile 時實現第二步 Update 結果和第三步 Compute 期望值之間的協調,從而完成整個狀態的流轉。

彈性計算排程架構

每個 Job CRD 都有 Spec 和 Status 兩個部分,為了實現多角色排程,我們進一步打造了 CRD 家族。除了上文提到的 Job CRD,每個 Job 會關聯若干個 PrimusRole CRD。同時針對資料部分,我們抽象了 PrimusData CRD。在 PrimusRole CRD 中,每一個角色都對應一個 Role 的 CRD。所有 Primus Job 的拓撲最終被協調出來的結果,就是在 Kubernetes 或者 YARN 中的一個作業框架(如上圖下方)。

我們可以看到,TensorFlow 和 PS Worker 等相關的作業都被創建出來,同時每一個 Job 都有自己的總控中樞,即我們基於 Java 寫的 Primus AM Pod。這個中樞主要負責協調整個過程、記錄訓練進度、提供 UI 展示、記錄歷史過程。基於這樣一個體系,我們完成了 Primus Job 的建立。

Primus Job 建立成功後,當某一個副本失敗時,我們就可以通過排程大腦獲取到當前副本的資訊,每個角色對應的若干副本,多個角色就組成了整個彈性排程的拓撲。

下面來看彈性排程策略到底有多彈性?我們為了彈性排程都解決了哪些問題?

針對原生的 TensorFlow,我們將其分為自研的 Dynamic 策略和針對原生 TensorFlow 的剛性策略:

  • 原生的 Dynamic 策略指角色可以動態地互相服務,可以在任何時刻重啟角色,不要求所有角色重啟之後才能開始訓練;
  • 剛性策略指對於原生的 TensorFlow 需要支援 Work 和 PS 服務的互相發現,所以基於這種策略,在所有角色都申請到資源後統一發送啟動命令,實現 IP 加埠的相互傳遞。

後面我們引入了 Order 策略,以彈性的方式申請 Worker 角色,大大減少等待的週期,避免了在等待過程中造成的資源浪費。

彈性計算排程應用

應用1:訓練 SlowStart 優化模型訓練

針對 Worker 無狀態的 Sailor 角色,我們採用彈性宣告,通過不斷修改 API Server 控制角色內的副本數進行訓練。

在開始階段,我們以兩個副本的方式進行慢速訓練,使模型快速找到區域性最優狀態。

模型趨於穩定後,我們再不斷地擴充套件模型 Worker 的數量,實現大吞吐的模型訓練,從而提高模式訓練的速度。

應用2:Gang 性多角色排程支援

針對有狀態的 Parameter Server 的角色,我們引入了剛性的申請策略:

  • 在 YARN 叢集上,通過修改 YARN 排程器實現了 GangScheduler,支援對 PS 拓撲的資源 Gang 性申請與釋放;
  • 在 Kubernetes 叢集上,通過自研的 Pod Group 實現了 PS 角色的精細化資源管理,同時支援了排程打散、最小 Gang 性數量、排程親和與反親和策略等複雜場景的 PS 排程需求。

應用3:超大規模的在離線混部訓練

  • 混部 Smart Resource: 彈性排程不僅控制角色數量的多少,並且可以控制角色的規格,從而提升叢集利用率,比起通過宣告式的 API 動態修改角色的規格,Smart Resource 將混部資源的利用率從20%提升到了70%;
  • 潮汐/反潮汐策略: 資源利用存在高峰和低谷,針對這一情形,我們應用了面向 API Server 的彈性排程機制——在線上業務低峰時,我們有資源用於訓練,於是我們就拉起更多角色,提高訓練效率;在線上業務的晚高峰時,我們又會把訓練資源縮容到0,把離線訓練的機器學習暫時掛起,出讓資源去支援線上業務,如抖音、頭條的推薦,但此時 Job 還是正在執行的。藉此,我們達到了更好地節省資源和開支、提高資源叢集利用率的目的。

雲原生離線訓練-資料編排

在離線模型訓練中,訓練樣本資料管理、讀取、加工等對模型訓練起到了至關重要的作用。

樣本資料在位元組跳動內部不同場景下存放在不同的系統中——有儲存在 HDFS 中的檔案類樣本資源,也有存在 Kafka 裡的流式訓練樣本資源,還有團隊自研的 Feature Store 樣本資源。

雲原生離線訓練框架(Primus)同時覆蓋多種資料來源的編排,支援不同資料來源在天、小時、分鐘級的編排策略;能夠實現上面提到的三個訓練資源中的交叉組合、過濾、打散、對齊等豐富資料編排能力。

同時,在元資料編排過程中,訓練框架有新資料的感知和增量編排能力。Primus通過持續掃描 HDFS 和 Feature Store 的新增資料進行模型更新,保證訓練效果能夠匹配使用者最新行為。

多資料來源訓練元資料編排

在廣告等 CVR 轉化模型中,天然地需要對同一使用者不同 APP 上的行為資料進行建模並預測。

位元組跳動的演算法工程師依託雲原生離線訓練的資料編排能力,對抖音、頭條和西瓜業務的三個資料來源進行了建模訓練,每個資料來源分別按 00 小時、01 小時、02 小時進行儲存,同時在頭條和西瓜中每個小時都進行一次聚合,最終達到在 00 小時分別消費了頭條、西瓜和抖音的一個數據,而在第 01 小時通過多個數據源的重新排列,避免了模型編排的趨向性,在第 02 小時進一步進行資料來源打散。

這個例子充分展示了我們在 Partition 內 Shuffle,按小時 Group By,以及持續追新的能力。

訓練樣本元資料管理

在樣本元資料分發階段,我們將多個元資料組成了 DataStreamA,在流式階段叫 DataStreamB,這是一個多階段訓練的過程。這兩個 DataStream 都組成了同一個 PrimusData CRD。

  • DataStream 裡的若干個 DataSource 被按天、按小時聚合之後,會通過 Primus AM 實現檔案的切分,切分的力度是按天、按小時聚合之後的原始 HDFS 路徑或者 Feature Store 目錄。切分的結果是若干個 Split 通過心跳的方式下發 Task 到 Executor。
  • 隨著心跳,我們還會每時每刻回發當前訓練中 Task 的消費進度,以方便 FellOver 的時候,我們能夠從斷點當中繼續消費來進行訓練。同時,訓練的進度被 Primus AM 記錄到 HDFS 中,並且持久化,以方便整個 Application 掛掉之後,我們可以從 HDFS 的訓練狀態當中得到恢復。

跨程序資料傳輸實踐

  • 基於匿名管道的資料傳輸: Executor 裡有兩個程序,一個是 TensorFlow Worker,用於從管道里讀取我們通過 HDFS 解析之後的樣本資料;另一個是 Executor JVM 資料程序,進行 HDFS Client 讀取後,將序列化的樣本通過 Linux 匿名管道傳輸給 TensorFlow Worker 程序。

在實踐過程中,我們發現匿名管道天然存在兩個問題:跨程序通訊和多個 Producer 競爭搶鎖,由此也就增加了從使用者態到核心態拷貝的開銷和資源競爭的問題。

  • 高階資料傳輸方式: 如 Domain Socket,我們採用 Producer 和 Worker 通過兩個 TCP Socket 傳輸的方式,避免了多個 Producer 的管道競爭,但這樣依然會存在核心帶的拷貝以及序列化和反序列化的開銷。隨後,我們引入了跨程序之間 Share Memory 機制,做到了多程序管理。最後我們採用 JNI 統一程序機制合併兩個程序,實現了樣本讀取、加工、傳輸全流程 Lib 化,徹底解決了跨程序之間的 IPC 開銷。

案例與最新實踐

上文闡述了我們在資料編排和計算排程方面的積累與沉澱,下面介紹我們將這兩部分組合起來,同時結合業務的需求,在實踐中進行運用的重要案例。

從服務化 PS 到 雲原生 全伴生 PS

在 1.0 階段,我們沒有將 PS 納入到雲原生中,而是採用了服務化的 PS,這種方式存在如下弊端:

  • 需要同機房撮合
  • 資源利用率低
  • 運維與部署難度大
  • 隔離性差(網路、記憶體頻寬、CPU)

於是我們就引入了雲原生化的 PS on YARN / K8s SavePoint,即伴生 Parameter Server 訓練機制,這一演進同時也伴隨著我們的作業規模從 150 萬核到 400 萬核的增長。我們在這一階段實現瞭如下功能:

  • PS 拓撲剛性排程:在 YARN 和 Kubernetes 上都實現了 PS 剛性申請和剛性排程;
  • 服務發現 ( consul -> 自研)、健康檢查:通過自研的宣告式 API Server 實現了服務發現,同時實現了 Parameter Server 的健康檢查、定時檢查以及故障恢復機制;
  • 單作業 + 容器化部署:基於容器化的方式隔離不同的 Parameter Server,避免它們 CPU 利用之間的相互干擾;
  • PS 程序 Numa 隔離:Numa Bind:避免 CPU 跨 Numa 訪問記憶體帶來的效能退化;
  • PS SavePoint 機制:定時記錄 Parameter Server 目前整個拓撲中最新的模型狀態;同時我們為資料也設定了 SavePoint 機制,將兩個檢查點進行對齊和同時恢復,從而實現伴生 Primus Server 訓練的異常恢復;
  • 全鏈路 Incremental Checkpoint:不止 Parameter Server 可以增量 Checkpoint,訓練 Worker 也可以增量 Checkpoint,這就意味著在任何時刻,當一臺 Primus Server 發生故障之後,它只需用增量的方式去恢復這一個單點即可;
  • PS Smart Resource 機制:不斷地壓縮 Parameter Server 宣告規格和它的使用規格之間的 Gap,提高叢集的利用率。

PrimusFlow 訓練資料實時預處理

在模型的調研中一般會面臨的問題是:一個等待和兩個浪費——即 Spark 預處理的等待、模型訓練過程中 Spark 計算的開銷和儲存的開銷。

為此,我們引入了伴生資料預處理的模型訓練機制——PrimusFlow。一方面,它可以支援豐富的資料來源,任何一個被 Spark 預處理的資料來源,都可以被 PrimusFlow 機制處理,我們通過 Spark 讀取,Load 中間狀態進行訓練。另外,PrimusFlow 支援更豐富的調研模式,支援行級 Shuffle,我們可以進行資料預處理,按行或按某個使用者 ID 進行加工處理,以此來提升模型訓練效果。

此外,我們採用多資料流編排,先對 Spark 進行預處理,在下一個階段用上一個 Spark 預處理的結果進行模型訓練,同時在這一階段並行進行第二個單元的 Spark 預處理,由此真正實現了無需等待的單元排程模式。

通過 PrimusFlow 機制,加上行級 Shuffle 的能力,我們在非常多的場景中都取得了模型效果提升 10% 以上的收益,並且在國外很多場景進行了落地。通過上述方式,我們解決了一個等待和兩個浪費的問題。

訓練批流一體架構

在實踐過程中,我們發現批式處理框架也有流式消費的能力。目前的模型建模,一方面是燒腦建模,另一方面是燒卡建模。離線訓練在不斷地燒卡,同時流式訓練過程中也需要燒卡,這主要是因為現有的 Flink 流式訓練消費已經無法滿足晚高峰時抖音推薦裡複雜模型需要的訓練能力,因此就需要增加算力,引入 GPU 資源。但 Flink 並不支援異構角色的 Task Manager,而 Primus 天然就是一個異構多角色的訓練框架。

基於上述需求,我們在 Primus 中加入了流式訓練的能力,打造了多角色異構的流式訓練框架,同時支援 GPU 排程、伴生的 API Server 及故障恢復,同時我們實現了 All to All 的 Shuffle 能力,即引入 Rebalance 機制提高流式訓練的能力。

此外,因為 Primus 框架本身就是批流協同的框架,既能夠支援多角色 GPU 的批訓練,也能夠支援多角色異構的流訓練,在離線訓練完畢之後,能夠直接切換到流式訓練階段去複用同一個 Partition Server,我們以此實現了流批協同和流批一體的目的。

Primus Native 樣本資料傳輸 Library

Primus Native 系統是針對位元組跳動深度學習的資料子系統增強,分別在 Data Master 部分和 Data Executor 部分進行了雲原生改造,發展為更加靈活、更加高效的深度學習資料引擎。

元資料 Meta Manager 編排部分: 我們不僅引入了宣告式 API,也引入了 Python for Java 的 Gateway 架構,這個架構支撐起了 Primus Native 的資料宣告體系。相比於宣告式 API 的資料定義方法,Python Gateway 架構在靈活性+擴充套件性方面更有優勢:

  • 使用者可以更加靈活的利用 Primus Native Python UDF 靈活控制樣本按檔案時間排列、按特定欄位排列等高度自由的樣本檔案編排策略;
  • 實現了 Python 資料和 Java 資料的靈活轉換,訓練器可以更好地獲取當前任務編排和任務消費樣本的詳細資訊,靈活地進行訓練效果評估、抽樣等操作。

訓練 Worker 讀取部分: 我們引入了 SO 化的資料傳輸機制,合併兩個程序到一個訓練程序內部,徹底免除了序列化和反序列化的開銷、使用者態到核心態的資料拷貝,也節省了雲原生環境下單容器內多程序的維護難度。

通過上述優化,針對一個標準推薦訓練任務:

  • 總吞吐量從 3.3GB/s 提升至 13.5GB/s,提升了4倍;
  • 單節點吞吐率從 411MB/s 提升到 1.2GB/s,提升了4倍;
  • CPU 使用率從 2.25 核心提升到 5.25 核心,提升了2.3倍。

總結

綜上所述,我們在本文中闡述了位元組跳動離線訓練發展的三個階段,以及雲原生離線訓練中非常重要的兩個部分——計算排程和資料編排,最後結合前兩部分分享了位元組跳動在實踐中沉澱的重要實踐案例。