Shuttle:高可用 高效能 Spark Remote Shuffle Service
01 開篇
大資料計算的興起,源於Google的MapReduce論文,MapReduce的原理很簡單,其流程核心則是Map和Reduce兩階段資料交換,也即Shuffle。
Shuffle對大資料計算影響很大,從公開的資料:Facebook[1]、LinkedIn[2]、阿里[3] 等公司的資料看,Shuffle影響的任務和任務計算時間上都有較高佔比。從OPPO的線上任務看,68%的Spark任務都有Shuffle計算。
大資料計算引擎的技術演進,一直離不開對Shuffle的優化,無論是從執行計劃方面優化,儘量避免Shuffle運算元還是各種Shuffle機制的演進,都是為了儘量縮短Shuffle的耗時。
Shuffle不僅影響作業執行效率,對計算穩定性也有較大影響,大資料開發的同學一般都有這樣的經歷:莫名的 Shuffle Fetch Fail 錯誤,甚至任務會因此頻繁失敗,不得不優化任務計算邏輯。
02 背景
Shuffle成為大資料計算效率和穩定性的關鍵因素的原因是什麼?
我們認為主要有兩點:
1、磁碟的碎片讀寫,Spill多次寫磁碟和Reduce只拉取部分Partition資料,影響效率
2、Reduce 讀取Map端本地資料,需要 MxR 次遠端網路讀,影響穩定性
MapReduce shuffle示意圖[4]
Shuffle技術演進,主線也是沿著解決上面兩個問題推進。比較有里程碑意義的有兩個方向:
ESS:External Shuffle Service,ESS 原理是Map 任務在計算節點本地將相同Partition 資料合併到一起;
RSS:Remote Shuffle Service,RSS 原理是Map 任務將相同Partition資料Push到遠端的RSS,RSS將同一Partition的資料合併。
ESS vs RSS 示意圖
ESS 和 RSS 都是為了解決前面我們提到的碎片讀寫和RPC連線過多的問題,ESS是緩解了這種情況,沒有RSS解決的徹底。
Spark社群提供了Remote Shuffle Service的介面,各家公司可以自己實現自己的RSS。所以,近兩年在Spark 平臺的RSS技術方案如雨後春筍,紛紛公開亮相。
03 相關工作
我們先看一下各家的解決方案,目前公開資料和原始碼的方案主要有:
Uber的RSS [5]:2020年開源,底層儲存基於本地磁碟,Shuffle Server提供讀寫資料功能,對效能有一定的影響,另外,開源時間比較早,但維護較少。
騰訊的FireStorm [6]:2021年11月開源,底層儲存使用HDFS,對穩定性以及效能優化設計考慮較少。
阿里雲 EMR-RSS [7]:2022年1月開源,底層儲存基於本地磁碟,對本地IO做了深入的優化,不過這種基於本地儲存的Shuffle Service,有著天然的限制。
LinkedIn MagNet [2]:MagNet嚴格來說不算真正意義的RSS,只能算是 Push Based 的Shuffle。MagNet在Spark原生Shuffle資料落盤的同時把資料Push到遠端NodeManager的ESS上,同一份資料,會落盤兩次,這樣其實會增加叢集的IO壓力。不過,MagNet已經合入到Spark3.2版本,鑑於此,MagNet的Shuffle才做了這樣的設計。
04 OPPO解決方案-Shuttle
(1)整體架構
首先,介紹一下 Shuttle 的整體架構:
Shuttle 架構圖
Shuttle 主要由兩個角色組成,ShuffleMaster和ShuffleWorker。
ShuffleMaster負責管理ShuffleWorker的狀態,向任務分發可用的ShuffleWorker。
ShuffleWorker負責接收ShuffleWriter傳送的資料,並將同一Partition的資料聚合,寫入分散式儲存。
為保障Master高可用,一個叢集有兩個Master,一個Active和一個Backup Master。
如圖所示,ActiveCluster和StandbyCluster分別有兩個Master。
為什麼會有Active和Standby兩個Cluster ?這也是為了服務的穩定性考慮,主要用於熱升級,下面會詳細介紹。
(2)架構設計考量
我們在設計一個分散式的Shuffle Service系統的時候,從下面幾個方面考慮:
1)資料正確性
資料正確性是生命線,Shuffle資料在Remote Shuffle Service系統走一圈,能否保障資料不出問題?
我們通過Checksum機制保障資料的正確性。每一條寫入Shuttle的資料,都會計算一個Checksum值,最後讀資料的時候同樣對讀取的每一條資料計算Checksum,最後對比Checksum,保證每條資料都被正確讀到且只被讀一次。
2)穩定性
穩定性是分散式系統的基石,在分散式系統中,出現各種問題是必然。
穩定性的保障,是一個系統性的問題,不是某一個Feature或者設計能解決所有穩定性問題,我們從以下幾個方面討論Shuttle的穩定性建設:
A、節點/任務管控
ShuffleMaster和ShuffleWorker在管控方面都有自己的機制。
ShuffleMaster對節點/任務管控的功能主要有:
節點自愈:ShuffleWorker通過心跳向ShuffleMaster上報自身的“健康”資訊。心跳超時或者“健康”資訊異常,ShuffleMaster會暫停向該節點分配新的任務資料流量,Worker節點恢復“健康”後,再向改節點分配任務。
負載均衡:Spark 任務向ShuffleMaster請求可用的ShuffleWorker,Master根據叢集負載決定分配哪些ShuffleWorker;同時,分配Worker的演算法實現是外掛式的,可以定製多種不同的分配策略。
異常攔截:對於使用者短時間提交的大量相同任務,ShuffleMaster會主動攔截,避免影響叢集整體穩定性。
ShuffleWorker流控機制,當任務資料量突增場景下,流控保障Worker的穩定性。流控機制主要從兩方面限制:
記憶體量:ShuffleWorker程序使用總記憶體超過閾值即發生流控
連線數:同時向ShuffleWorker傳送資料的連線數,超過閾值即發生流控
B、多機切換
Map向ShuffleWorker傳送資料,會有多個ShuffleWorker可供選擇,當某個Worker出問題(比如Worker發生流控,或節點掉線),可以切換到備選Worker繼續傳送。
如圖所示,ShuffleWriter在向ShuffleWorker A傳送資料的時候,A節點出現故障,ShuffleWriter切換到B節點繼續傳送資料。
C、分散式儲存
Shuttle採用分散式檔案系統作為儲存底座。
在分散式儲存技術如此發達的今天,我們不需要花費過多精力優化儲存。
專業的事情交給專業的“人”來做,這樣的好處主要有:
1、降低Shuttle系統本身的複雜度,提升自身穩定性
2、分散式檔案系統自身具有良好的穩定性,擴充套件性,負載均衡等優勢
3、適配多種分散式檔案儲存,選擇多樣化,充分利用不同系統優勢
4、使得ShuffleWorker解耦本地儲存能力,存算分離,更易於雲上部署
業界主流的分散式檔案系統,本身對讀寫效能都做了充分的優化。
另外,我們大量使用了公司儲存團隊自研的分散式檔案系統CubeFS[8],CubeFS針對Shuffle場景做了定製化的優化,簡單介紹一下CubeFS的優勢:
CubeFS架構圖
CubeFS是CNCF新一代雲原生分散式儲存產品,相容S3、HDFS、POSIX多種接入協議,提供多副本和糾刪碼兩種儲存引擎,支援多租戶、多AZ部署。
CubeFS創新性採用存算分離架構,提供可擴充套件的元資料服務,低成本的模式可配的糾刪碼引擎,自適應多級快取特性,使得CubeFS在穩定性、擴充套件性、效能與成本、可運維性等方面均表現優秀;對多種接入協議的原生支援,與容器相容性好,拓寬了CubeFS產品生態;CubeFS已經被用於OPPO各個核心業務,如大資料儲存,大資料shuffle、人工智慧、ElasticSearch、MySQL、資料備份等,有力支撐各類業務資料海量儲存需求。
D、熱升級
ShuffleService一旦上線,會為大量任務提供shuffle 服務,不能停服,同時,系統的升級迭代會不斷需要重啟服務。為此,系統必須具備熱升級的能力。
Shuttle有兩種熱升級模式:
1、滾動升級:通過ShuffleMaster逐一加黑-重啟ShuffleWorker。
這種方式針對小規模系統還可行,對於規模比較大的ShuffleService系統,可以考慮第二種模式。
2、叢集切換:ShuffleWorker程序繫結機器IP和埠,一臺機器可以部署多個Worker程序,因此我們在線上同一批機器部署兩套ShuffleService,升級的時候可以直接整體切換服務。
上線以來,經歷線上多次升級變更,無一例因為升級導致的失敗case。
3)效能優化
A、非同步傳輸
資料傳輸和訊息處理,均使用Netty非同步處理機制,對比同步處理機制,效能有明顯優勢。同時,訊息採用Pb格式,提升訊息序列化和反序列化效能。
B、併發讀寫
ShuffleWriter和Reader對於資料的讀寫均採用多執行緒併發處理,在Reader端使用RingBuffer作為底層儲存的緩衝,讀過程非同步化。
C、定製執行緒池
ShuffleWorker會併發處理不同的Map傳送的資料,使用Java原生執行緒池會引入過多的同步機制,影響處理資料速度。為此,我們定製執行緒池,確保同一Partition的資料交由單一執行緒處理,顯著降低同步操作,提升處理速度。
不僅如此,為優化資料傳輸效率,我們根據網路MTU定製資料包大小,精益求精。
4)擴充套件性
A、多叢集路由
ShuffleMaster可配置任務路由規則,多個叢集線上服務,隨時可以切換流量。在叢集出現異常,任務可以選擇切換到正常的叢集。
B、多儲存共存
目前Shuttle支援HDFS、CubeFS、Alluxio、S3等分散式儲存系統,多種儲存可以同時線上提供服務,無論是雲上還是自建叢集,均可應對。
同時,Shuttle設計就考慮到Spark3.x的AQE特性支援,我們線上同時執行著支援Spark2.4和Spark3.1.2版本的Shuttle。
04 業界相關技術對比
針對穩定性,資料正確性保障,效能優化方面,我們跟業界相關工作做了對比。
專案名稱 |
特點 |
優缺點 |
Uber RSS |
獨立的Service 本機儲存 Shuffle Server多副本複製 Shuffle Server承擔讀寫 |
無Checksum 本機磁碟限制 無Master管控 |
MegNet |
非獨立Service 保持原生Shuffle落盤 同時盡力Push到遠端 |
跟原生Spark繫結緊密(已經Merge 到社群版本) 磁碟IO翻倍 無Checksum 無Master管控 |
騰訊 FireStorm |
獨立Service 分散式儲存(僅HDFS) |
使用HDFS儲存 Checksum保障正確性 無Master管控 多執行緒同步訊息傳送 Writer堆內快取資料 |
阿里 EMR-RSS |
獨立Service 本機儲存 |
無Checksum 有Master管控 對本地儲存有優化 Writer堆外記憶體快取 |
Shuttle |
獨立Service 分散式儲存 多維穩定性保障 |
多儲存適配 Checksum保障正確性 Master管控叢集 Writer堆外記憶體快取 併發讀寫,Netty非同步傳輸 |
Shuttle在穩定性和效能優化方面做了很多考量,系統上線後一直提供穩定服務,期間多次升級,無一任務因此失敗,下面會介紹一下我們的效能測試效果。
05 測試效果
文章[3]中,EMR-RSS已經跟其他的開源產品做了詳細的對比測試,且在效能上有明顯的優勢,所以,我們直接跟EMR-RSS對比測試。
(1)測試環境
硬體環境:20臺物理機
機器配置:24塊HDD,記憶體384GB,cpu 48核心。
軟體配置:
Shuttle使用HDFS儲存,均使用預設配置
EMR-RSS使用本地儲存,配置使用所有磁碟。rss.shuffle.writer.mode配置為sort(預設為hash)
測試任務:TeraSort Spark任務
靜態資源分配,Executor 800,分割槽數1000,其他使用預設配置。
(2)測試結果
EMR-RSS 1Tb TeraSort:
Shuttle 1Tb TeraSort:
EMR-RSS 5Tb TeraSort:
Shuttle 5Tb TeraSort:
注:不同規格任務執行時間,兩種技術方案分別執行5次求平局值對比
整體看,Shuttle和EMR-RSS對比TeraSort任務在幾個不同規模資料量上有4%-8%的效能提升。
(3)測試分析
Shuttle的讀資料明顯快,分析原因如下:
1、Shuttle讀資料從HDFS讀取,不佔用ShuffleWorker程序資源;
2、Shuttle讀資料方式是非同步流水線方式。
但是,我們也看到Shuttle在寫資料要比EMR-RSS慢,分析原因如下:
1、Shuttle 的流控機制,在每次傳送資料包會先獲取一次令牌,多一次網路互動。
2、Shuttle的Checksum機制,在每個分割槽資料傳送完畢後,會多發一個Checksum包,且最後的Checksum包是同步方式通訊。
由上分析,Shuttle在保障穩定性和資料正確性上做了一些效能取捨。但是,由於讀資料的 速度更快,不僅彌補了寫資料導致的效能Gap,整體效能還是有提升。
(4)線上效果
目前,OPPO集團大資料計算任務30%的Shuffle資料已經接入 Shuttle,效果最好的大任務執行效率提升50%+;整體效果資料見下圖:
指標名稱 |
下降幅度 |
平均e2e時間下降 |
29.5% |
平均成本下降 |
24% |
task執行總時間下降 |
22% |
平均fetch fail task數下降 |
68->0 |
06 未來展望
為了讓 Shuttle 能夠影響更多的計算,我們決定將 Shuttle 專案開源[9]。
對於技術演進方向,我們計劃從三個方向進行:
1、接入更多的計算引擎,比如 Flink、Trino等。
2、依託現有的分散式儲存,優化底層儲存,適應Shuffle場景的特殊需求。
3、提供更多的計算服務,不侷限於Remote Shuffle服務。
附錄
[1] Haoyu Zhang, Brian Cho, Ergin Seyfe. Riffle: Optimized Shuffle Service for Large-Scale Data Analytics. ACM 2018
[2] Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020
[3] 阿里雲EMR Remote Shuffle Service在小米的實踐.
http://mp.weixin.qq.com/s/xdBmKkKL4nW7EEFnMDxXYQ
[4] 《Hadoop權威指南》
[5] Ubser Spark RSS:
http://github.com/uber/RemoteShuffleService
[6] 騰訊Spark RSS FireStorm:
http://github.com/Tencent/Firestorm
[7] 阿里雲 Spark RSS:
http://github.com/alibaba/RemoteShuffleService
[8] CubeFS:
http://github.com/cubeFS/cubefs
[9] Shuttle:
http://github.com/oppo-bigdata/shuttle
作者簡介
David Fu OPPO大資料計算平臺架構師
負責大資料計算平臺技術演進設計開發,曾供職於阿里雲,去哪兒網大資料平臺,擁有10年大資料架構,開發經驗
本文分享自微信公眾號 - OPPO數智技術(OPPO_tech)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。
- OPPO資料湖統一儲存技術實踐
- Quarkus-雲原生時代Java的曙光?
- Quarkus-雲原生時代Java的曙光?
- Shuttle:高可用 高效能 Spark Remote Shuffle Service
- Shuttle:高可用 高效能 Spark Remote Shuffle Service
- OPPO自研雲原生分散式任務排程平臺
- OPPO自研雲原生分散式任務排程平臺
- GTC 2022:GPU推理加速在OPPO NLP場景的優化落地
- OPPO雲資料庫訪問服務技術揭祕
- 全鏈路非同步Rest客戶端 ESA RestClient
- 全鏈路非同步Rest客戶端 ESA RestClient
- OPPO唐黎:零程式碼技能平臺技術實踐探索!
- MySQL 分散式事務的“路”與“坑”
- PendingIntent重定向:一種針對安卓系統和流行App的通用提權方法——BlackHat EU 2021議題詳解(上)
- AI算力加速之道
- AI算力加速之道
- 5分鐘瞭解ScyllaDB
- 5分鐘瞭解ScyllaDB
- ORTC與SIP融合通訊服務架構
- ORTC與SIP融合通訊服務架構