VLDB'20 Magnet: 領英Spark Shuffle解決方案

語言: CN / TW / HK

本文 Magnet: Push-based Shuffle Service for Large-scale Data Processing 介紹的是領英(LinkedIn)的Shuffle 服務系統 magnet。

領英(LinkedIn)對Apache Spark進行了大規模商用,用於內部的機器學習平臺、商業分析系統等等。每天都有PB級別的資料跑在上萬節點構成的Spark 叢集上。這其中,Spark Shuffle 造成了嚴重的效能瓶頸;Shuffle 的效能問題也讓叢集難以 scale out。 magnet 解決了shuffle 導致的scalability問題,並且讓Spark叢集端到端處理效能提升了30%。

前置知識:Spark官方優化

這篇文章是關於領英內部的Spark shuffle優化方案的。如果對於Spark shuffle本身沒有深入瞭解,其實很難Get到文章的點,畢竟本文的shuffle優化不是一個孤立的問題,而是一脈相承自Spark 官方的優化歷程。

從官方時間線來看,Spark shuffle 的演進至少有以下幾個milestone:

  1. 初版使用hash-based shuffle,特點是當DAG複雜時,檔案數量會爆炸;
  2. Spark 0.8 引入File Consolidation機制,根據並行度來池化檔案,減少檔案數量;
  3. Spark 1.1 引入sort-based shuffle,同一個map task只輸出一個shuffle檔案+index檔案,該shuffle方式在後續版本中成為預設方式;
  4. Spark 1.4 Tungsten-Sort Based Shuffle 來優化shuffle時排序的效能

現在一個Spark DAG拉起來了:

上游的map task需要把資料傳輸給下游的reduce task,每個reduce task 資料集對應一個partition id,我們需要把每個map task輸出的資料根據partition id 分散和傳送出去。

Spark 0.8以前的解決方式是基於雜湊:現在上游有M個map task,下游有R個reduce task,首先在map task所在節點上建立 M * R個本地檔案,map task 輸出的資料通過hash確定對應輸出的檔案位置,寫入到本地檔案中;每個reduce task 通過Driver 得知所需檔案的位置並逐個拉取。如下圖所示:

這種shuffle方式很快出現了問題:當DAG非常複雜,map和reduce task的數量激增,shuffle所需的檔案數量會爆炸;單個shuffle檔案不會很大,海量的小檔案寫入其實是隨機IO,這也導致了IO效能很低。

從Spark 0.8開始,官方引入了File Consolidation機制來優化hash shuffle。假設節點上有C個cpu 核數,每個Task 佔用T個核,那麼並行度為 C / T;假設reduce task數目為R,那麼只需為每個並行度池化一個檔案組,其中包含R個檔案用於資料輸出,檔案池總大小為C / T * R。shuffle過程如下圖所示:

File Consolidation 機制讓shuffle檔案數量下降了1~2個數量級,但這還不夠,Spark 1.1引入了sort-based shuffle,直接把檔案數量降低到了常量級別(並行度 * 2)。

現在上游有M個map task,每個task佔用T個核數,共有C個cpu核數;map task 將資料寫入AppendOnlyMap並根據{partitionId, hash(key)) 進行排序,排序結果能保證屬於同一個partition id的資料在物理儲存上保持相鄰,稱作 block 。最終我們可以把排序結果儲存在一個檔案中,並且用一個index檔案來記錄每個block在檔案中的偏移位置。

Spill技術廣泛運用在大資料引擎中,用於應對記憶體不足的情況;Spark sort-base shuffle也利用了Spill Sort來完成輸出結果的排序。

整個過程如下圖所示:

Spark 1.4 引入了 Tungsten-Sort Based Shuffle,其實是對sort-based shuffle的效能優化,包括:

  1. 利用sun.misc.Unsafe直接操作原始資料,不需要事先反序列化;
  2. 使用cache-efficient的排序資料,每個sort key 佔用8bytes,排序效率更高;
  3. 由於排序不需要反序列化,spill效率更高了。原先需要deserialize->compare->serialize->spill,現在可以直接將原始位元組資料spill到磁碟。

領英Shuffle解決方案

和官方版本一樣,領英使用Spark sort-based方式來進行shuffle。在每天PB級別的workload上,領英工程師發現了這樣幾個問題:

1. 磁碟IO問題

上節介紹Spark sort-based shuffle提到,同一個Task輸出兩個檔案,包括資料檔案shuffle file,由一個個Block組成;index file,標識block在檔案中的位置。下游Executor獲取資料位置後,根據Index拉取block資料。

下圖展示了領英某天真實的block統計資料,可以看到block大多數容量為KB級別,並且block size越小時,shuffle拉取資料的延遲就越高,這其中有兩個主要原因:

  1. 出於價效比考慮,主要使用的是機械硬碟,海量小檔案讀會使得IO效率低下;
  2. 當shuffle資料量線性增長時,伴隨的block數量往往會成平方型增長,論文 Riffle: optimized shuffle service for large-scale data analytics 已有證明。

2. 連線可靠性問題

由於上游多個Executor共享同一個Spark ESS(spark external shuffle service),下游同一個Executor內共享一個連線,假設ESS服務數量為S,下游Executor數量為E,那麼一次shuffle需要維護的連線數量為S * E。

在一次複雜的分析任務裡,S和E的數量都能達到1000級別,一次需要維護數百萬條連線,這就對連線的可靠性帶來了巨大的挑戰;一旦某個下游Executor與上游ESS連線異常斷開,整個stage中的reduce task都需要失敗重試。

Magnet shuffle service

Magnet Shuffle Service是領英引入的一種spark外部shuffle服務,用於優化磁碟io效率,減少ESS連線失敗,提升連線可靠性,解決資料傾斜和task stragglers,並且不會帶來過多的cpu和記憶體消耗。

magnet主要結構和流程:

  1. Spark driver元件,協調整體的shuffle操作
  2. map任務的shuffle過程,增加了一個額外的操作push-merge,將資料推到遠端shuffle服務上;
  3. magnet shuffle service是一個強化版的ESS。隸屬於同一個shuffle partition的block,會在遠端傳輸到magnet 後被merge到一個檔案中;
  4. reduce任務從magnet shuffle service 接收合併好的shuffle資料,不同reduce任務可以共享shuffle資料來提升shuffle傳輸效率。

幾個關鍵特性:

  • push-merge shuffle KB級別小檔案的隨機讀 -> MB級別檔案順序讀
    push操作與map任務解耦,Push失敗不會導致map任務重試
  • Best-effort approach magnet可以接受block push失敗,reduce任務既可以消費合併好的block資料,也可以接受未經合併的小檔案。
  • Stragglers/Data Skews Mitigation 可用來減少資料傾斜和task stragglers(即某個慢任務拖慢整個任務的執行時間)

Push-Merge

push-merge的根本目的是減少reduce側的隨機IO,在Magnet上把小檔案block合併後,reduce task可以讀取連續儲存的、大小在MB級別的檔案,這樣一來,隨機讀取就可以變成連續磁碟空間上的IO。

push-merge的基本單位是 chunk ,map task輸出block後,首先要將block以演算法1的方式分配到chunk中去。

簡單解釋一下演算法1,其實演算法1講的就是block攢批到chunk,chunk長度超限之後又push到magent上的過程。針對每個reduce 任務以及對應的block資料 (編號為i):

  1. 當chunk長度沒有超過限制L,將block (長度為 )append到chunk中,更新chunk長度
  2. 當chunk長度超過限制L,將chunk推到編號為k的Magnet機器上,之後寫入新的block進去(重新初始化);
  3. 時,把chunk推到編號為k的Magnet機器上,並且把編號k+1,這裡需要理解一下, 就是第i個任務所對應的Magnet機器編號,R/n是每臺Magnet機器所對應的reduce task數量。因此這裡需要更新一下機器編號,把chunk傳送到下一臺機器上。

這裡還有一個隱患,根據演算法1,所有map task都按照partition id = 1, 2 ... R的順序來構造和push chunk,這會導致Magnet上資源的熱點和嚴重的爭用衝突。(例如,一次push的所有block都屬於PartitionId=1,那麼在magnet機器上進行merge時,需要排隊等待partitionId=1上的檔案鎖)因此,設計者將chunk按照編號進行了隨機化處理,來避免所有map task按照相同次序push chunk。

Magnet機器上需要維護一些重要的元資訊,包括:

  1. bitmap : 儲存已merge的mapper id,防止重複merge
  2. position offset : 如果本次block沒有正常merge,可以恢復到上一個block的位置
  3. currentMapId :標識當前正在append的block,保證不同mapper 的block能依次append

提升連線可靠性

magnet shuffle服務通過Best-effort的方式來解決海量連線可靠性低的問題。在該體系上,所有連線異常都是non-fatal的,可以理解為每個環節上的連線斷開或異常,都有一個對應的備選和兜底方案:

  • 如果Map task輸出的Block沒有成功Push到magnet上,並且反覆重試仍然失敗,則reduce task直接從ESS上拉取原始block資料;
  • 如果magnet上的block因為重複或者衝突等原因,沒有正常完成merge的過程,則reduce task直接拉取未完成merge的block;
  • 如果reduce拉取已經merge好的block失敗,則會直接拉取merge前的原始block。

Spark Driver維護了MergeStatus,包含已經merge的block的資訊;MapStatus,代表所有map task,即未完成merge的block資訊。通過維護MergeStatus和MapStatus,可以完成上述的備選方案,達到Best-effort的效果。

解決Task Straggler問題

task straggler問題也是一個非常有趣的點。下圖(a)代表觀測到的map task和reduce task的執行時間,橫軸為時間線;圖(b) 代表隱藏在背後的真實任務執行情況。

解釋一下圖(b),藍色代表map task執行時間,綠色代表push-merge時間,紅色代表真實的reduce task執行時間。可以看到有一個map task的push-merge執行非常緩慢,導致整個reduce task需要等待block merge完成,才能開始執行。

為了解決這個問題,magnet服務設定了push-merge超時時間,如果block沒有在超時時間內完成push-merge,magnet服務會停止繼續接受block,提前讓reduce task開始執行;而未完成push-merge的block,根據上節中提到的Best-effort方案,reduce task會從MapStatus中獲取狀態與位置資訊,直接拉取沒有merge的block資料。圖(c)展示了在task straggler問題解決後,reduce task完成時間大大提前了。

Data Skew 問題

在Spark shuffle過程中,如果某個partition的shuffle資料量遠高於其他partition,則會出現資料傾斜(data skew)問題。 data skew 不是magnet特有的問題,而是在Spark上已經有成熟解決方案,即大名鼎鼎的 adaptive query execution

magnet需要適配Spark 的adaptive execution特性,同時防止一個magnet服務上因data skew而導致有 100GB / 1TB級別的資料需要merge。為此,針對上文的演算法1,做出了一些改進,具體辦法是,通過限制 size超過閾值的block被併入到chunk中;如果超過閾值,則會利用上節中的Best-effort方案,直接拉取未完成merge的block資料。而普通的、未有data skew情況的block,則會走正常的push-merge流程。

參考文獻

[1] Shen M, Zhou Y, Singh C. Magnet: push-based shuffle service for large-scale data processing[J]. Proceedings of the VLDB Endowment, 2020, 13(12): 3382-3395.

[2] Zhang H, Cho B, Seyfe E, et al. Riffle: optimized shuffle service for large-scale data analytics[C]//Proceedings of the Thirteenth EuroSys Conference. 2018: 1-15.

[3] Spark Architecture: Shuffle Spark Architecture: Shuffle

[4] Spark sql adaptive execution at 100 tb. https:// software.intel.com/en-u s/articles/spark-sql-adaptive-execution-at-100-tb (Retrieved 02/20/2020).