Spark系列 - (1) Spark基礎

語言: CN / TW / HK

Spark

1. Spark基礎

Spark是一種基於記憶體的快速、通用、可擴充套件的大資料分析引擎。

下圖是Spark的發展歷史,

1.1 Spark核心模組

  • Spark Core:實現了 Spark 的基本功能,包含任務排程、記憶體管理、錯誤恢復、與儲存 系統互動等模組。Spark Core 中還包含了對彈性分散式資料集(Resilient Distributed DataSet,簡稱 RDD)的 API 定義。

  • Spark SQL:是 Spark 用來操作結構化資料的程式包。通過 Spark SQL,我們可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)來查詢資料。Spark SQL 支援多種資料來源, 比如 Hive 表、Parquet 以及 JSON 等。

  • Spark Streaming:是 Spark 提供的對實時資料進行流式計算的元件。提供了用來操作資料流的 API,並且與 Spark Core 中的 RDD API 高度對應。

  • Spark MLlib:提供常見的機器學習(ML)功能的程式庫。包括分類、迴歸、聚類、協同 過濾等,還提供了模型評估、資料 匯入等額外的支援功能。

  • 叢集管理器:Spark 設計為可以高效地在一個計算節點到數千個計算節點之間伸縮計 算。為了實現這樣的要求,同時獲得最大靈活性,Spark 支援在各種叢集管理器(Cluster Manager)上執行,包括 Hadoop YARN、Apache Mesos,以及 Spark 自帶的一個簡易排程器,叫作獨立排程器。

Spark特點:

1)快:與Hadoop的MapReduce相比,Spark基於記憶體的運算要快100倍以上,基於硬碟的運算也要快10倍以 上。Spark實現了高效的DAG執行引擎,可以通過基於記憶體來高效處理資料流。計算的中間結果是存在於記憶體中 的。

2)易用:Spark支援Java、Python和Scala的API,還支援超過80種高階演算法,使使用者可以快速構建不同的應 用。而且Spark支援互動式的Python和Scala的Shell,可以非常方便地在這些Shell中使用Spark叢集來驗證解決問 題的方法。

3)通用:Spark提供了統一的解決方案。Spark可以用於批處理、互動式查詢(Spark SQL)、實時流處理 (Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同型別的處理都可以在同一個應 用中無縫使用。減少了開發和維護的人力成本和部署平臺的物力成本。

4)相容性:Spark可以非常方便地與其他的開源產品進行融合。比如,Spark可以使用Hadoop的YARN和 Apache Mesos作為它的資源管理和排程器,並且可以處理所有Hadoop支援的資料,包括HDFS、HBase等。這對 於已經部署Hadoop叢集的使用者特別重要,因為不需要做任何資料遷移就可以使用Spark的強大處理能力

1.2 Spark與Hadoop的區別

Hadoop是一種分散式儲存系統,而Spark是一種分散式計算框架。與其說是Spark和Hadoop的區別,倒不如說是Map Reduce和Spark的區別。

下圖展示了兩者最主要的區別,

  • MapReduce慢的原因:頻繁讀寫磁碟導致額外的複製,序列化和磁碟IO開銷。

  • Spark為何快:記憶體計算+DAG(delay scheduling)

| 比較專案 | Map Reduce | Spark | | --- | --- | --- | | 框架 | MapReduce由Map和Reduce兩個階段組成,並且通過Shuffle將兩個階段連線起來。(套用MapReduce模型解決問題,不得不將問題分解為若干個有依賴關係的子問題,每個子問題對應一個MapReduce作業,最終所有作業形成一個DAG) | Spark是通用的DAG框架,可以將多個有依賴關係的作業轉換成一個大的DAG。(核心思想是將Map和Reduce兩個操作進一步拆分為多個元操作,並通過一些控制程式組裝後形成一個大的DAG作業。) | | 中間結果儲存方式(最大區別) | Map 步驟是在不同機器上獨立且同步執行的,它的主要目的是將資料轉換為 key-value 的形式;而 Reduce 步驟是做聚合運算,它也是在不同機器上獨立且同步執行的。Map 和 Reduce 中間夾雜著一步資料移動,也就是 shuffle,這步操作會涉及數量巨大的網路傳輸(network I/O),需要耗費大量的時間。 由於 MapReduce 的框架限制,一個 MapReduce 任務只能包含一次 Map 和一次 Reduce,計算完成之後,MapReduce 會將運算結果寫回到磁碟中(更準確地說是分散式儲存系統)供下次計算使用。這樣的讀寫資料會引起大量的網路傳輸以及磁碟讀寫,極其耗時,而且它們都是沒什麼實際價值的廢操作。| 在Spark中,使用記憶體(記憶體不夠使用本次磁碟)替代了使用HDFS儲存中間結果。對於迭代運算效率更高。 | | 操作模型 | Hadoop只提供了Map和Reduce兩種操作,所有作業都得轉換成Map和Reduce的操作 | Spark提供了多種資料集操作型別,比如transformation包括map,filter,groupByKey等,actions操作包括Count,collect,reduce,lookup等等。 | | 應用場景 | 離線大規模分析處理 | Hadoop適用的場景基本都合適,特別對於迭代計算相比Hadoop有更大的優勢。(只有map操作或者只有一次reduce操作的場景下,兩種效能差別不大。) |

總結:Spark採用更先進的架構,在靈活性、易用性、效能等方面都比MapReduce更有優勢,有取代MapReduce的趨勢,但hdfs和yarn依然有其不可替代的作用。

1.3 Spark的4種執行模式

目前Spark的執行模式主要有以下4種:

  • local:主要用於開發除錯Spark應用程式
  • Standlone:利用Spark自帶的資源管理與排程器執行Spark叢集,採用Master/Slave結構,為解決單點故障,可以採用Zookeeper實現高可靠(High Availability, HA)
  • Apache Mesos:執行在著名的Mesos資源管理框架基礎之上,該叢集執行模式將資源管理管理交給Mesos,Spark只負責執行任務排程和計算
  • Hadoop YARN:叢集執行在Yarn資源管理器上,資源管理交給YARN,Spark只負責進行任務排程和計算。

一個完整的Spark應用程式,在提交叢集執行時,它涉及到如下圖所示的元件:

每個Spark應用都由一個驅動器程式(drive program)來發起叢集上的各種並行操作。驅動器程式包含應用的main函式,驅動器負責建立SparkContext,SparkContext可以與不同種類的叢集資源管理器(Cluster Manager),例如Hadoop YARN,Mesos進行通訊,獲取到叢集進行所需的資源後,SparkContext將得到叢集中工作節點(Worker Node)上對應的Executor(不同的Spark程式有不同的Executor,他們之間是相互獨立的程序,Executor為應用程式提供分散式計算以及資料儲存功能),之後SparkContext將應用程式程式碼傳送到各Executor,最後將任務(Task)分配給executors執行。

  • ClusterManager:在Standalone模式中即為Master節點(主節點),控制整個叢集,監控Worker.在YARN中為ResourceManager
  • Worker:從節點,負責控制計算節點,啟動Executor或Driver。在YARN模式中為NodeManager,負責計算節點的控制。
  • Driver:執行Application的main()函式並建立SparkContect。
  • Executor:執行器,在worker node上執行任務的元件、用於啟動執行緒池執行任務。每個Application擁有獨立的一組Executor。
  • SparkContext:整個應用的上下文,控制應用的生命週期。
  • RDD:Spark的計算單元,一組RDD可形成執行的有向無環圖RDD Graph。
  • DAG Scheduler:根據作業(Job)構建基於Stage的DAG,並提交Stage給TaskScheduler。
  • TaskScheduler:將任務(Task)分發給Executor。
  • SparkEnv:執行緒級別的上下文,儲存執行時的重要元件的引用。

SparkEnv內構建幷包含如下一些重要元件的引用。

1、MapOutPutTracker:負責Shuffle元資訊的儲存。 2、BroadcastManager:負責廣播變數的控制與元資訊的儲存。 3、BlockManager:負責儲存管理、建立和查詢快。 4、MetricsSystem:監控執行時效能指標資訊。 5、SparkConf:負責儲存配置資訊。

接下來介紹的4種執行模式,都遵循了下圖所示的通用執行流程,

任務提交後,都會先啟動Driver程序,隨後Driver程序向叢集管理器註冊應用程式,之後叢集管 理器根據此任務的配置檔案分配Executor並啟動,當Driver所需的資源全部滿足後,Driver開始執行main函式,Spark查詢為懶執行,當執行到action運算元時開始反向推算,根據寬依賴進行 stage的劃分,隨後每一個stage對應一個taskset,taskset中有多個task,根據本地化原則,task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通訊,報告任務執行情況。

1.3.1 Local模式

Local模式就是執行在一臺計算機上的模式,通常就是用於在本機上練手和測試。 它可以通過以下集中方式設定Master。

local: 所有計算都執行在一個執行緒當中,沒有任何平行計算,通常我們在本機執行 一些測試程式碼,或者練手,就用這種模式;

local[K]: 指定使用幾個執行緒來執行計算,比如local[4]就是執行4個Worker執行緒。通 常我們的Cpu有幾個Core,就指定幾個執行緒,最大化利用Cpu的計算能力;

local[*]: 這種模式直接幫你按照Cpu最多Cores來設定執行緒數了。

提交任務的簡易流程如下:

其中最重要的角色就是driver和executor。

Driver(驅動器)

Spark 的驅動器是執行開發程式中的 main 方法的程序。它負責開發人員編寫的用來創 建 SparkContext、建立 RDD,以及進行 RDD 的轉化操作和行動操作程式碼的執行。如果你 是用 spark shell,那麼當你啟動 Spark shell 的時候,系統後臺自啟了一個 Spark 驅動器程 序,就是在 Spark shell 中預載入的一個叫作 sc 的 SparkContext 物件。如果驅動器程式終 止,那麼 Spark 應用也就結束了。主要負責:

1)把使用者程式轉為任務 2)跟蹤 Executor 的執行狀況 3)為執行器節點排程任務 4)UI 展示應用執行狀況

Executor(執行器)

Spark Executor 是一個工作程序,負責在 Spark 作業中執行任務,任務間相互獨立。 Spark 應用啟動時,Executor 節點被同時啟動,並且始終伴隨著整個 Spark 應用的生命周 期而存在。如果有 Executor 節點發生了故障或崩潰,Spark 應用也可以繼續執行,會將出 錯節點上的任務排程到其他 Executor 節點上繼續執行。主要負責:

1)負責執行組成 Spark 應用的任務,並將結果返回給驅動器程序; 2)通過自身的塊管理器(Block Manager)為使用者程式中要求快取的 RDD 提供記憶體式儲存。RDD 是直接快取在 Executor 程序內的,因此任務可以在執行時充分利用快取資料加速運算。

下面以最簡單的wordCount程式為例,說明整個資料流程。

1.3.2 Standalone模式

構建一個由 Master+Slave 構成的 Spark 叢集,Spark 執行在叢集中。整個執行流程如下:

這種模式下,可以利用Zookeeper進行HA配置。

1.3.3 Yarn模式

此模式下,Spark客戶端直接連線Yarn,不需要額外構建Spark叢集,有yarn-client和yarn-cluster兩種模式。主要區別在於:Driver程式的執行節點。

  • yarn-client:Driver 程式執行在客戶端,適用於互動、除錯,希望立即看到 app 的輸出。
  • yarn-cluster:Driver 程式執行在由 RM(ResourceManager)啟動的 AP(APPMaster),適用於生產環境。

1.3.4 Mesos模式

Spark 客戶端直接連線 Mesos;不需要額外構建 Spark 叢集。國內應用比較少,更多的 是運用 yarn 排程。

1.3.5 總結

幾種模式的對比如下:

1.4 Spark重要概念

  • SparkContext 是程式執行排程的核心,由高層排程器DAGScheduler劃分程式的每個階段,底層排程器
  • TaskScheduler劃分每個階段的具體任務。
  • SchedulerBankend 管理整個叢集中為正在執行的程式分配的計算資源Executor。

  • DAG (Directed Acyclic Graph)有向無環圖。Spark實現了DAG計算模型,DAG計算模型是指將一個計算任務按照計算規則分解為 若干子任務,這些子任務之間根據邏輯關係構建成有向無環圖

  • RDD (Resilient Distributed Dataset)彈性分佈資料集。是不可變的、Lazy級別的、粗粒度的 (資料集級別的而不是單個數據級別的)資料集合,包含一個或多個數據分片,即Partition。

  • DAGScheduler 負責高層排程,劃分Stage並生成程式執行的有向無環圖。

  • TaskScheduler 負責具體Stage內部的底層排程,具體Task的排程、容錯等

  • Job (正在執行的叫ActiveJob)是Top-level的工作單元,每個Action運算元都會觸發一次Job,一個Job可能包含一個或多個Stage

  • Stage 是用來計算中間結果的Tasksets。Tasksets中的Task邏輯對於同一RDD內的不同Partition都一樣。Stage在Shuffle的地方產生,此時下一個Stage要用到上一個Stage的全部資料,所以要等上一個Stage全部執行完才能開始。Stage有兩種: ShuffleMapStage和ResultStage,除了最後一個Stage是ResultStage外,其他的Stage都是ShuffleMapStage。 ShuffleMapStage會產生中間結果,以檔案的方式儲存在叢集裡,Stage經常被不同的Job共享,前提是這些Job重用了同一個RDD。

  • Task任務執行的工作單位,每個Task會被髮送到一個節點上,每個Task對應RDD的一個Partition。Taskset劃分的Stage會轉換成一組相關聯的任務集。

  • Transformation和Action Transformation運算元會由DAGScheduler劃分到pipeline中,是Lazy級別的不會觸發任務的執行;Action運算元會觸發Job來執行pipeline中的運算。


後端精進之路.png