Shuttle:高可用 高效能 Spark Remote Shuffle Service

語言: CN / TW / HK

01 開篇

大資料計算的興起,源於Google的MapReduce論文,MapReduce的原理很簡單,其流程核心則是Map和Reduce兩階段資料交換,也即Shuffle。

Shuffle對大資料計算影響很大,從公開的資料:Facebook[1]、LinkedIn[2]、阿里[3] 等公司的資料看,Shuffle影響的任務和任務計算時間上都有較高佔比。從OPPO的線上任務看,68%的Spark任務都有Shuffle計算。

大資料計算引擎的技術演進,一直離不開對Shuffle的優化,無論是從執行計劃方面優化,儘量避免Shuffle運算元還是各種Shuffle機制的演進,都是為了儘量縮短Shuffle的耗時。

Shuffle不僅影響作業執行效率,對計算穩定性也有較大影響,大資料開發的同學一般都有這樣的經歷:莫名的 Shuffle Fetch Fail 錯誤,甚至任務會因此頻繁失敗,不得不優化任務計算邏輯。

02 背景

Shuffle成為大資料計算效率和穩定性的關鍵因素的原因是什麼?

我們認為主要有兩點:

1、磁碟的碎片讀寫,Spill多次寫磁碟和Reduce只拉取部分Partition資料,影響效率

2、Reduce 讀取Map端本地資料,需要 MxR 次遠端網路讀,影響穩定性

 MapReduce shuffle示意圖[4]

Shuffle技術演進,主線也是沿著解決上面兩個問題推進。比較有里程碑意義的有兩個方向:     

ESS:External Shuffle Service,ESS 原理是Map 任務在計算節點本地將相同Partition  資料合併到一起;

RSS:Remote Shuffle Service,RSS 原理是Map 任務將相同Partition資料Push到遠端的RSS,RSS將同一Partition的資料合併。

ESS vs RSS  示意圖

ESS 和 RSS 都是為了解決前面我們提到的碎片讀寫和RPC連線過多的問題,ESS是緩解了這種情況,沒有RSS解決的徹底。

Spark社群提供了Remote Shuffle Service的介面,各家公司可以自己實現自己的RSS。所以,近兩年在Spark 平臺的RSS技術方案如雨後春筍,紛紛公開亮相。

03 相關工作

我們先看一下各家的解決方案,目前公開資料和原始碼的方案主要有:

Uber的RSS [5]:2020年開源,底層儲存基於本地磁碟,Shuffle Server提供讀寫資料功能,對效能有一定的影響,另外,開源時間比較早,但維護較少。

騰訊的FireStorm [6]:2021年11月開源,底層儲存使用HDFS,對穩定性以及效能優化設計考慮較少。

阿里雲 EMR-RSS [7]:2022年1月開源,底層儲存基於本地磁碟,對本地IO做了深入的優化,不過這種基於本地儲存的Shuffle Service,有著天然的限制。

LinkedIn MagNet [2]:MagNet嚴格來說不算真正意義的RSS,只能算是 Push Based 的Shuffle。MagNet在Spark原生Shuffle資料落盤的同時把資料Push到遠端NodeManager的ESS上,同一份資料,會落盤兩次,這樣其實會增加叢集的IO壓力。不過,MagNet已經合入到Spark3.2版本,鑑於此,MagNet的Shuffle才做了這樣的設計。

04 OPPO解決方案-Shuttle

(1)整體架構

首先,介紹一下 Shuttle 的整體架構:

Shuttle 架構圖

Shuttle 主要由兩個角色組成,ShuffleMaster和ShuffleWorker。

ShuffleMaster負責管理ShuffleWorker的狀態,向任務分發可用的ShuffleWorker。

ShuffleWorker負責接收ShuffleWriter傳送的資料,並將同一Partition的資料聚合,寫入分散式儲存。

為保障Master高可用,一個叢集有兩個Master,一個Active和一個Backup Master。

如圖所示,ActiveCluster和StandbyCluster分別有兩個Master。

為什麼會有Active和Standby兩個Cluster ?這也是為了服務的穩定性考慮,主要用於熱升級,下面會詳細介紹。

(2)架構設計考量

我們在設計一個分散式的Shuffle Service系統的時候,從下面幾個方面考慮:

1)資料正確性

資料正確性是生命線,Shuffle資料在Remote Shuffle Service系統走一圈,能否保障資料不出問題?

我們通過Checksum機制保障資料的正確性。每一條寫入Shuttle的資料,都會計算一個Checksum值,最後讀資料的時候同樣對讀取的每一條資料計算Checksum,最後對比Checksum,保證每條資料都被正確讀到且只被讀一次。

2)穩定性

穩定性是分散式系統的基石,在分散式系統中,出現各種問題是必然。

穩定性的保障,是一個系統性的問題,不是某一個Feature或者設計能解決所有穩定性問題,我們從以下幾個方面討論Shuttle的穩定性建設:

A、節點/任務管控

ShuffleMaster和ShuffleWorker在管控方面都有自己的機制。

ShuffleMaster對節點/任務管控的功能主要有:

節點自愈:ShuffleWorker通過心跳向ShuffleMaster上報自身的“健康”資訊。心跳超時或者“健康”資訊異常,ShuffleMaster會暫停向該節點分配新的任務資料流量,Worker節點恢復“健康”後,再向改節點分配任務。

負載均衡:Spark 任務向ShuffleMaster請求可用的ShuffleWorker,Master根據叢集負載決定分配哪些ShuffleWorker;同時,分配Worker的演算法實現是外掛式的,可以定製多種不同的分配策略。

異常攔截:對於使用者短時間提交的大量相同任務,ShuffleMaster會主動攔截,避免影響叢集整體穩定性。

ShuffleWorker流控機制,當任務資料量突增場景下,流控保障Worker的穩定性。流控機制主要從兩方面限制:

記憶體量:ShuffleWorker程序使用總記憶體超過閾值即發生流控

連線數:同時向ShuffleWorker傳送資料的連線數,超過閾值即發生流控

B、多機切換

Map向ShuffleWorker傳送資料,會有多個ShuffleWorker可供選擇,當某個Worker出問題(比如Worker發生流控,或節點掉線),可以切換到備選Worker繼續傳送。

如圖所示,ShuffleWriter在向ShuffleWorker A傳送資料的時候,A節點出現故障,ShuffleWriter切換到B節點繼續傳送資料。

C、分散式儲存

Shuttle採用分散式檔案系統作為儲存底座。

在分散式儲存技術如此發達的今天,我們不需要花費過多精力優化儲存。

專業的事情交給專業的“人”來做,這樣的好處主要有:

1、降低Shuttle系統本身的複雜度,提升自身穩定性

2、分散式檔案系統自身具有良好的穩定性,擴充套件性,負載均衡等優勢

3、適配多種分散式檔案儲存,選擇多樣化,充分利用不同系統優勢

4、使得ShuffleWorker解耦本地儲存能力,存算分離,更易於雲上部署

業界主流的分散式檔案系統,本身對讀寫效能都做了充分的優化。

另外,我們大量使用了公司儲存團隊自研的分散式檔案系統CubeFS[8],CubeFS針對Shuffle場景做了定製化的優化,簡單介紹一下CubeFS的優勢:

CubeFS架構圖

CubeFS是CNCF新一代雲原生分散式儲存產品,相容S3、HDFS、POSIX多種接入協議,提供多副本和糾刪碼兩種儲存引擎,支援多租戶、多AZ部署。

CubeFS創新性採用存算分離架構,提供可擴充套件的元資料服務,低成本的模式可配的糾刪碼引擎,自適應多級快取特性,使得CubeFS在穩定性、擴充套件性、效能與成本、可運維性等方面均表現優秀;對多種接入協議的原生支援,與容器相容性好,拓寬了CubeFS產品生態;CubeFS已經被用於OPPO各個核心業務,如大資料儲存,大資料shuffle、人工智慧、ElasticSearch、MySQL、資料備份等,有力支撐各類業務資料海量儲存需求。

D、熱升級

ShuffleService一旦上線,會為大量任務提供shuffle 服務,不能停服,同時,系統的升級迭代會不斷需要重啟服務。為此,系統必須具備熱升級的能力。

Shuttle有兩種熱升級模式:

1、滾動升級:通過ShuffleMaster逐一加黑-重啟ShuffleWorker。

這種方式針對小規模系統還可行,對於規模比較大的ShuffleService系統,可以考慮第二種模式。

2、叢集切換:ShuffleWorker程序繫結機器IP和埠,一臺機器可以部署多個Worker程序,因此我們在線上同一批機器部署兩套ShuffleService,升級的時候可以直接整體切換服務。

上線以來,經歷線上多次升級變更,無一例因為升級導致的失敗case。

3)效能優化

A、非同步傳輸

資料傳輸和訊息處理,均使用Netty非同步處理機制,對比同步處理機制,效能有明顯優勢。同時,訊息採用Pb格式,提升訊息序列化和反序列化效能。

B、併發讀寫

ShuffleWriter和Reader對於資料的讀寫均採用多執行緒併發處理,在Reader端使用RingBuffer作為底層儲存的緩衝,讀過程非同步化。

C、定製執行緒池

ShuffleWorker會併發處理不同的Map傳送的資料,使用Java原生執行緒池會引入過多的同步機制,影響處理資料速度。為此,我們定製執行緒池,確保同一Partition的資料交由單一執行緒處理,顯著降低同步操作,提升處理速度。

不僅如此,為優化資料傳輸效率,我們根據網路MTU定製資料包大小,精益求精。

4)擴充套件性

A、多叢集路由

ShuffleMaster可配置任務路由規則,多個叢集線上服務,隨時可以切換流量。在叢集出現異常,任務可以選擇切換到正常的叢集。

B、多儲存共存

目前Shuttle支援HDFS、CubeFS、Alluxio、S3等分散式儲存系統,多種儲存可以同時線上提供服務,無論是雲上還是自建叢集,均可應對。

同時,Shuttle設計就考慮到Spark3.x的AQE特性支援,我們線上同時執行著支援Spark2.4和Spark3.1.2版本的Shuttle。

04 業界相關技術對比

針對穩定性,資料正確性保障,效能優化方面,我們跟業界相關工作做了對比。

專案名稱 

特點

優缺點

Uber

RSS

獨立的Service

本機儲存

Shuffle Server多副本複製

Shuffle Server承擔讀寫

無Checksum

本機磁碟限制

無Master管控

LinkedIn

MegNet

非獨立Service

保持原生Shuffle落盤

同時盡力Push到遠端

跟原生Spark繫結緊密(已經Merge 到社群版本)

磁碟IO翻倍

無Checksum

無Master管控

騰訊

FireStorm

獨立Service

分散式儲存(僅HDFS)

使用HDFS儲存

Checksum保障正確性

無Master管控

多執行緒同步訊息傳送

Writer堆內快取資料

阿里

EMR-RSS


獨立Service

本機儲存

無Checksum

有Master管控

對本地儲存有優化

Writer堆外記憶體快取

Shuttle

獨立Service

分散式儲存

多維穩定性保障

多儲存適配

Checksum保障正確性

Master管控叢集

Writer堆外記憶體快取

併發讀寫,Netty非同步傳輸

Shuttle在穩定性和效能優化方面做了很多考量,系統上線後一直提供穩定服務,期間多次升級,無一任務因此失敗,下面會介紹一下我們的效能測試效果。

05 測試效果

文章[3]中,EMR-RSS已經跟其他的開源產品做了詳細的對比測試,且在效能上有明顯的優勢,所以,我們直接跟EMR-RSS對比測試。

(1)測試環境

硬體環境:20臺物理機

機器配置:24塊HDD,記憶體384GB,cpu 48核心。

軟體配置:

Shuttle使用HDFS儲存,均使用預設配置

EMR-RSS使用本地儲存,配置使用所有磁碟。rss.shuffle.writer.mode配置為sort(預設為hash)

測試任務:TeraSort Spark任務

靜態資源分配,Executor 800,分割槽數1000,其他使用預設配置。

(2)測試結果

EMR-RSS 1Tb TeraSort:

Shuttle 1Tb TeraSort:

EMR-RSS 5Tb TeraSort:

Shuttle 5Tb TeraSort:

注:不同規格任務執行時間,兩種技術方案分別執行5次求平局值對比

整體看,Shuttle和EMR-RSS對比TeraSort任務在幾個不同規模資料量上有4%-8%的效能提升。

(3)測試分析

Shuttle的讀資料明顯快,分析原因如下:

1、Shuttle讀資料從HDFS讀取,不佔用ShuffleWorker程序資源;

2、Shuttle讀資料方式是非同步流水線方式。

但是,我們也看到Shuttle在寫資料要比EMR-RSS慢,分析原因如下:

1、Shuttle 的流控機制,在每次傳送資料包會先獲取一次令牌,多一次網路互動。

2、Shuttle的Checksum機制,在每個分割槽資料傳送完畢後,會多發一個Checksum包,且最後的Checksum包是同步方式通訊。

由上分析,Shuttle在保障穩定性和資料正確性上做了一些效能取捨。但是,由於讀資料的 速度更快,不僅彌補了寫資料導致的效能Gap,整體效能還是有提升。

(4)線上效果

目前,OPPO集團大資料計算任務30%的Shuffle資料已經接入 Shuttle,效果最好的大任務執行效率提升50%+;整體效果資料見下圖:

指標名稱

下降幅度

平均e2e時間下降

29.5%

平均成本下降

24%

task執行總時間下降

22%

平均fetch fail task數下降

68->0

06 未來展望

為了讓 Shuttle 能夠影響更多的計算,我們決定將 Shuttle 專案開源[9]。

對於技術演進方向,我們計劃從三個方向進行:

1、接入更多的計算引擎,比如 Flink、Trino等。

2、依託現有的分散式儲存,優化底層儲存,適應Shuffle場景的特殊需求。

3、提供更多的計算服務,不侷限於Remote Shuffle服務。

附錄

[1] Haoyu Zhang, Brian Cho, Ergin Seyfe.  Riffle: Optimized Shuffle Service for Large-Scale Data Analytics.  ACM 2018

[2] Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020

[3] 阿里雲EMR Remote Shuffle Service在小米的實踐.

http://mp.weixin.qq.com/s/xdBmKkKL4nW7EEFnMDxXYQ

[4] 《Hadoop權威指南》

[5] Ubser Spark RSS:

http://github.com/uber/RemoteShuffleService

[6] 騰訊Spark RSS FireStorm:

http://github.com/Tencent/Firestorm

[7] 阿里雲 Spark RSS:

http://github.com/alibaba/RemoteShuffleService

[8] CubeFS:

http://github.com/cubeFS/cubefs

[9] Shuttle:

http://github.com/oppo-bigdata/shuttle


作者簡介

David Fu  OPPO大資料計算平臺架構師

負責大資料計算平臺技術演進設計開發,曾供職於阿里雲,去哪兒網大資料平臺,擁有10年大資料架構,開發經驗

本文分享自微信公眾號 - OPPO數智技術(OPPO_tech)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。