Spark 實踐 | 蘋果工程師分享如何運維超大規模 Spark on Kubernetes 叢集

語言: CN / TW / HK

我們公司使用Spark on YARN作為最主要的大資料計算引擎,目前跑在AWS的EMR叢集上。但是自從去年所有的Web應用都已經從EC2遷移到了k8s叢集上來以後,每日的資料計算任務也遷移到k8s叢集就被提上了日程。本週正好看到蘋果的工程師在今年的Data+AI Summit上分享蘋果在運維超大規模Spark on Kubernetes叢集時學到的經驗教訓,覺得蠻有幫助,所以翻譯整理出了這篇文章。

原視訊在Youtube,地址:https://www.youtube.com/watch?v=LVlOv0snXog

(注:由於筆者對k8s不太熟悉,因此文中可能會有部分錯誤,歡迎各位讀者指正)

蘋果的大資料需求和平臺架構

蘋果大資料團隊的Leader:Aaruna Godthi首先分享了蘋果大資料平臺的應用場景以及架構的演變歷史。

大資料平臺整體架構

可以看到,蘋果的大資料平臺主要服務四類使用者:資料工程師,資料科學家,ML工程師,資料分析師。業務需求主要包括資料建模,演算法訓練,BI等。

從技術視角來看,蘋果的大資料平臺使用Spark和Flink作為計算引擎,Airflow作為任務排程系統,而底層使用Iceberg作為儲存格式,Ad-hoc查詢則使用Trino。

根據Aaruna Godthi的描述,蘋果的大資料平臺也是最近才遷移到k8s之上,之前的架構基於的編排系統是mesos。

基於mesos的架構

注意圖中的 Spark Orchestrator ,這是蘋果自主研發的一個系統,用於接受來自多個系統的Spark任務提交,並且根據系統負載,任務優先順序,SLA等級要求,對Spark任務的執行順序,優先順序,分配的資源等進行管理控制。 在Spark on Kubernetes架構裡有點類似於YARN的Capacity Scheduler承擔的職責。 後文提到的很多排程策略,也都是實現在這一系統中。

在Spark on Mesos叢集運行了好多年以後,蘋果的大資料團隊對其進行了改造,將任務從mesos遷移到了k8s上來。這次分享的就是在遷移過程中他們遇到的一些困難和他們的解決辦法。

新平臺遇到的挑戰和解決方案

首先展示一下遷移完成後的新平臺架構圖

遷移到k8s後的架構

可以看到Spark Orchestrator依然保留了下來。而在k8s叢集這一側,除了叢集固有的API Server和Scheduler,還有一個標黃的元件: Spark Operator 。這是由Google開源的Kubernetes Operator,可以在Github上找到。

對於新叢集的任務規模,Aaruna Godthi說 現在叢集每天處理38萬個任務,共會產生500萬個Executor 。可見任務量還是比較巨大的!

接下來由Zhou Jiang分享了從Spark on Mesos遷移到Spark on Kubernetes過程中遇到的各種挑戰。

etcd被撐爆

在首次嘗試遷移到k8s時遇到的第一個問題是 etcd在不到10分鐘的時間裡就被撐爆了

原因很快就被定位到,是因為同時產生的event過多,而且每次更新都會產生一個新的版本,所以導致資料量過大,結果就迅速撐爆了etcd。

蘋果的工程師解決這一問題採用了以下的改進措施:

  1. 調大etcd的容量。

  2. 採用更激進的compaction策略。

  3. 把resource和event的儲存分離,單獨儲存event。

Partially Running導致的資源死鎖

當使用k8s來排程Spark任務時,有時候會出現這樣一種情況:假設叢集總共支援600個executor。任務A需要400個executor來執行,現在已經申請到300個executor,正在申請剩下的100個executor;而任務B也需要400個executor執行,也已經申請到300個executor,也在申請100個executor。 這樣整個叢集就陷入了一種死鎖狀態 ,無論任務A還是任務B,都得不到足夠的資源執行下去。這種申請到部分資源的狀態就被稱為Partially Running狀態。

Partially Running的情況

為了解決這個問題,蘋果的工程師做了兩件事情:

  1. operator實現批量地申請executor。

  2. 當一定時間內無法申請到足夠的executor時,operator會殺掉application,並等待一段時間後重試。

這些改造沒有開源,而是實現在了蘋果內部的Spark Operator裡面。它的配置項及其含義如下。

properties:
spark.executor.instances: 400
spark.apple.executors.min.threshold.ratio: 0.8 // requires at least 80% of total executors
spark.apple.executors.startup.timeout: 60000 // terminate if cannot get enough executors after 10 min
spark.apple.backoff.duration.on.failure.ms: 30000 // backoff 5 min before attempting restart

監控任務的資源利用率並提出建議

Spark任務需要使用者在任務配置裡指定任務所需的executor數量,最大記憶體等。雖然從底層實現的角度來看是一件必要的事情,但其實對使用者來說這是個兩難選擇:如果資源給得太少,任務會因為資源不足而被kill掉;如果資源給得太多,又有很多浪費。 如果想給出一個相對精確的數量,使用者又缺少這方面的資料 。最終往往只能儘可能地多配一點資源,畢竟哪怕資源浪費一點,也比任務失敗強。

蘋果解決這個問題的思路是, 註冊一些Spark的listner用於收集記憶體、磁碟用量、shuffle資料量等資訊,收集整理成報表向用戶展示,並根據這些統計資訊向用戶提出一個推薦配置。

最下面一欄就是系統的推薦配置

個人覺得這是一個非常好的功能。我們公司目前也在著手提高任務整體資源利用率的工作,這個功能對於提高資源的利用率應該會很有幫助。

節點回收導致shuffle output files lost

在Spark 2.4以上,一般會開啟Dynamic Allocation功能。Dynamic Allocation功能會把已經計算完成的executor歸還給scheduler,從而減少pod閒置的時間,提高整體的資源利用率。

但是pod回收會導致一個副作用,就是儲存在這個pod上的shuffle output file也被一併回收了,會影響下游task的執行,不得不重新計算一遍這個rdd。

於是蘋果提出了兩個辦法來解決這個問題。

一是使用SPARK-20624新增的shuffle tracking功能。具體的是以下幾個配置項。

// enable state tracking
spark.dynamicAllocation.shuffleTracking.enabled
spark.dynamicAllocation.shuffleTracking.timeout
spark.dynamicAllocation.cachedExecutorIdleTimeout

或者也可以使用external shuffle storage(不同於external shuffle service),例如支援常用的各種物件儲存,如s3,gcs等,可以將他們暴露成pvc,然後掛載到pod裡面。

// or external shuffle storage
spark.shuffle.externalStorage.enabled
spark.shuffle.externalStorage.backend
spark.shuffle.externalStorage.bucket

這種做法很有參考價值。我們公司目前跑在EMR上的Spark叢集,就經常會因為instance pool庫存緊張而被強制回收了spot instance,引起shuffle output files lost。後續可以考慮使用external shuffle storage。

History Server的儲存

相信運維過Spark叢集的朋友,都對event log又愛又恨。因為Spark的event log能夠完整還原任務的執行情況,對於排查故障特別是優化任務效能很有幫助。但是event log的數量又特別大,很容易就把HDFS的容量佔滿,導致各種問題。

蘋果也遇到類似的問題。他們的Spark on Kubernetes叢集的History Server應該是儲存在叢集的PV上,所以有一定的容量限制。為了把長期的event log儲存下來,蘋果工程師的做法是擷取超過24小時的log,把這些檔案儲存在s3等物件儲存之上。這樣可以解決叢集的儲存空間有限的問題。

總結

最後總結下我從這個分享中學到的一些東西:

  • Spark on Kubernetes不可避免地需要一個Spark Orchestrator負責資源的協調工作,否則任務的優先順序,資源隔離,Gang Scheduling等功能都是缺失的。但是目前沒有開源解決方案能提供這樣的功能,可能需要自己開發。

  • 可以用Spark Listener功能收集Spark任務的各種效能指標,讓使用者知道跑一個任務大致需要多少資源,從而更合理地申請資源。

  • 可以用s3等外部儲存作為Spark的Shuffle Storage,減少因節點被回收導致shuffle資料需要重算的機率。

如果覺得這篇文章對你有所幫助,

請點一下 在看 ,是對我的肯定和支援~