Presto的一些基本概念

語言: CN / TW / HK

模型

Presto 是 Facebook 開源的 MPP (Massive Parallel Processing) SQL 引擎,其理念來源於一個叫 Volcano 的並行資料庫,該資料庫提出了一個並行執行 SQL 的模型,核心思想就是 Operator Model 和 Iterator Model。

  • Operator Model
    即通過各種 Operator 組成一棵樹,樹的根節點負責結果輸出,樹的葉子節點是各種 TableScan。這棵樹被稱作 Plan(執行計劃),資料庫裡又被細分為邏輯執行計劃和物理執行計劃。這棵樹是由 SQL 經過詞法、語法分析及語義分析後,生成一個 AST(Abstract Syntax Tree),一般經過 Visitor 模式遍歷後生成。原始資料通過葉子節點 TableScan 讀取資料,然後經過各個 Operator的計算,包括(TableScan、Project、Filter、Exchange、Agg、Join、TaskOutput等)產出結果。
  • Iterator Model
    顧名思義就是一個遞迴迭代過程,Plan 樹的各節點有三個狀態,Open、GetNext及Close。從根節點 Open 開始遞迴呼叫 GetNext 獲取資料,即父節點遞迴呼叫子節點介面直到沒有結果為止,然後Close。

概念

Stage

MPP的理念就是能儘量細粒度的將 SQL 並行執行,以一個 SQL 2個表 JOIN 後 Agg 為例,那麼每個表都可以單獨並行執行去 Scan 資料(互不影響),然後進行 Join 和 Agg。所以執行計劃(Plan)將執行 PlanFragment,即將一個樹分塊變為各個子樹,每個子樹可以並行的在多臺機器上執行,這個 Fragment 被稱為 Stage。

Presto根據 Stage 的用途,分為四種stage:

  • Coordinator_Only:一般表示 DDL,DML 的 Stage
  • Single:用於聚合子 stages 資料,並最終將資料輸出給終端使用者。比如每個查詢中的根節點(Root Stage)
  • Fixed:用於接收子 Stage 產生的資料,並在叢集中對這些資料進行聚合或分組計算
  • Source:連線資料來源,從資料來源讀取資料

我們以簡單的SQL查詢為例,SQL為select id from table limit 1; 這個SQL簡單來說,就幹了2件事,一是Scan資料,另外是Limit,而這2件事,可以並行執行,所以如圖所示,其分為2個Stage:

Stage 1 為Scan資料和Limit,這裡Limit是下推優化。Stage 0為最終結果輸出。

同時Presto UI裡可以看到每個Stage詳細資訊,以及每個Stage需要的Task數(可以認為Worker數),如圖所示:

Exchange

連線不同的 Stage,用於不同 Stage 之間的資料互動。資料的互動有一些Operator實現,比如資料是Hash分發還是完全Replicate等。從上圖可以看到Stage 1 和Stage 0 需要互動,通過Exchange實現。

Task

Stage 有多個 Task 組成。Stage 並不會執行,其實個抽象的概念,其只是負責管理 Task 和封裝建模。Stage 實際執行的是Task。每個Task處理一個或者多個Split,每個Task被分配到每臺機器上執行。每個Task都有對應的輸入和輸出。同一個Stage下的Task是個並行的概念,做的事情是相同的。

如下圖所示,我們可以看到每個Tasks的相信資訊,其中0.x表示Stage 0,1.x表示Stage 1,同時我們也可以看到每個Task執行花費的時間,讀取的資料大小以及每個Task處理的Split數目。一個Stage包含一個或多個Task,每個Task做的事情是一樣的,所以每個Stage的花費時間由最慢的Task決定,比如Scan HDFS資料,可能會因為某些Data Node阻塞導致Task阻塞。

Driver

Task 被分解成一個或者多個 Driver,並行執行多個 Driver 的方式來實現 Task 的並行執行。Driver 是作用於一個 Split 的一系列 Operator 的集合。一個 Driver 處理一個 Split,產生輸出由 Task 收集並傳遞給下游的 Stage 中的一個 Task。一個 Driver 擁有一個輸入和輸出。

Operator

Operator 表示對一個 Split 的一種操作。比如過濾、轉換等。 一個 Operator 一次讀取一個 Split 的資料,將 Operator 所表示的計算、操作作用於 Split 的資料上,產生輸出。每個 Operator 會以 Page 為最小處理單位分別讀取輸入資料和產生輸出資料。Operator 每次只讀取一個 Page,輸出產生一個 Page。

Split

一個分片表示大的資料集合中的一個小子集,與 MapReduce 中的 Split 概念類似。對於Hive中的表,一個Split就是HDFS檔案的一個分片。根據檔案格式是否分片(如ORC,Parquet),該Split可能是一個Block的大小,也可能是整個檔案。

Page

Presto 中處理的最小資料單元。一個 Page 物件包括多個 Block 物件,而每個 Block 物件是一個位元組陣列,儲存一個欄位的若干行。多個 Block 的橫切的一行表示真實的一行資料。一個 Page 最大 1MB,最多 16 * 1024 行資料

Pipeline

Stage 裡有很多 Operator,這些 Operator 可能並行度是不一樣的,比如 Scan 資料並行就很大,但是最後聚合資料,並行一般為1。所以 PlanFragment 又會被切分為若干 Pipeline,每個 Pipeline 由一組 Operator 組成,這些 Operator 被設定同樣的並行度。Pipeline 之間會通過 LocalExchangeOperator 來傳遞資料。

在 Presto UI 上我們可以看到 Pipeline資訊,如下圖所示,Stage 0 主要是將 Exchange 的資料,做最後的 limit,所以其可以細分為 2 個步驟,LocalExchangeOperator 及 LimitOperator,這2個動作的並行度是不一樣的,Exchange 可以多個執行緒去做,而 Limit 只需要一個執行緒。從圖中我們可以看到 Driver 和 Operator 資訊,其中 Driver 的數目就是這個 Pipeline 的並行度。

因為後續會陸續介紹 Presto 的一些執行流程,為了防止被一些概念繞暈,所以本文主要是對 Presto 的一些概念和專有名詞做了一些科普和解釋。

參考資料

  • 《Presto技術內幕》
  • 《Presto基本概念》