POLARIS: The Distributed SQL Engine in Azure Synapse

語言: CN / TW / HK

這篇paper介紹了Microsoft Azure雲在去年年底新發布的一款湖倉一體的大數據分析產品Synapse,而這篇paper介紹的就是他的分佈式查詢處理引擎Polaris。

背景

傳統的數據分析業務是基於關係型數倉產品(greenplum/exadata...)執行一些reporting/BI類的workload,但隨着近些年數據在來源和容量上的爆炸,傳統數倉很難滿足新形勢下的業務需求。而數據湖則正是針對數據的快速ingesting而設計,可以存儲海量的非結構化數據。

最新的發展趨勢則是兩者的融合,將結構化+非結構化數據結合在一起,客户可以在不移動底層數據的情況下執行原有的分析查詢,甚至是一些新型的data science/maching learning的大規模計算,而對用户提供統一的SQL工具鏈,這是用户最希望看到的形式,也形成了lakehouse這個大數據發展中的新賽道。看看snowflake的股價和databricks的融資額,這個新趨勢的影響力已經不言而喻。

Microsoft作為雲供應商和數據庫大廠,自然不會放過這個機會,Synapse是它的湖倉一體產品,和相對於snowflake等公司,它有着自有云平台 + 沉積多年的數據庫研發經驗這兩個先天優勢。

這篇paper中介紹的Polaris作為Synapse的查詢處理層,它實現了Synapse的兩個主要的設計目標:

  1. 融合結構化與非結構化數據,對海量異構數據實現統一且高效的處理
  2. 利用雲上基礎設施,實現 計算與狀態的分離 ,從而提供靈活的服務模式

其實已經有悠久的歷史了,它的前身就是SQL Server的PDW,後來到雲上稱為了SQL DW Service,paper中也提到基於原有技術做了很多改進和重構,關於PDW可以看下之前的文章:

技術特點

為了實現結合雲原生 + 湖倉融合的能力,Polaris做了一些技術上的創新,這裏先大概列出來,後面會依次介紹:

  • Cell data abstraction

為了能夠對下層的異構數據直接在原地做統一的處理,Polaris對來自不同storage system的不同data format的數據,統一抽象為"data cell"這個邏輯上的數據塊概念,有了這層抽象就可以屏蔽掉底層存儲的異構特點,統一處理邏輯。

  • Scale out + Scale up結合

Microsoft在數據庫上算是出了名的不喜歡重複造輪子,他們儘可能會利用已有的技術來做適配改造,在Polaris的設計中也充分的體現了這一點,每個計算節點本質都是單機的SQL Server,內部會複用其改造後的優化器 + 執行器,同時也會複用PDW中的分佈式優化器,這樣可以充分利用SQL Server積累多年的技術優勢。

在單節點內儘可能獲取最優的執行效率,而多節點間則由Polaris新的執行引擎負責管理調度,保證大數據量查詢的擴展性、穩定性和高效性。

  • 全局resource-aware的調度

通過實現跨query的細粒度的、感知資源負載的調度框架,可以從全局角度實現對資源的高效利用,並控制多query的併發查詢。

  • 靈活的服務模型

基於以上提到的多個能力,Polaris可以提供多種服務形態(類似於snowflake),包括

  1. server-less 節點數可以從 0 -> N動態調整,根據實際負載情況
  2. reserved pool 節點數從 min -> max動態調整,但不會完全銷燬,預留了固定資源
  3. multiple reserved pools,允許多個pool同時允許,提供多租户 + 彈性的能力
  • 多level的存儲層級

利用SQL Server在2012年就實現的Resilient bufferpool功能,可以將buffer pool擴展到本地的SSD中,從而形成了memory -> SSD -> Remote Storage的3層存儲層級,儘量利用數據的本地化提升性能。

整體架構

  • 計算/狀態分離

瞭解snowflake的同學可能都知道,它主打的幾大優勢中,最為核心的就是:充分利用雲上基礎設施在成本、彈性上的優勢。為了做到真正的彈性,首先要做到的就是 計算與狀態的分離。

可能大家更多瞭解的是計存分離,計算狀態分離是指什麼?我們先來看下歷史:

在最早期的on-premise數倉產品中,一般都採用了share-nothing的架構,每個節點既包含了數據,也包含了數據處理中所有的狀態信息:事務狀態(transaction log),數據分佈元信息,這樣的集羣要改變topo是比較困難的,涉及到數據的遷移,metadata的重置等。

隨着雲的發展,雲原生的分佈式數據庫提出了計算/存儲分離的理念,這是一個很自然的發展趨勢,本質上數據和計算就是兩件正交的事情,兩者的發生頻率、生命週期、成本模型不同。通過把數據和計算分開,可以從2個層次各自管理,各自彈性+高可用。但是從上圖可以看到,雖然數據分離了,執行的很多狀態仍然在計算層:

  1. 以事務為例,如果一個節點fail了,上面的事務狀態會丟失掉,全局事務也只能失效掉,想立即轉移到其他節點再執行這個分支事務是很難做到的。
  2. 如果是query,每個節點保持一部分metadata,並綁定了要處理的數據。節點失效,這種綁定關係無法立即高效轉移到其他節點中,query只能也失效掉。

這些都阻止了查詢實現真正的容錯和彈性,對於大數據分析來説,涉及的計算節點往往很多,失效的概率更高,解決不了容錯問題執行效率就會大大降低。

第3種,計算節點上只有cache,其他的狀態都持久化在外部存儲中(例如snowflake把事務狀態和metadata保存在FoundationDB中),而cache本身是可以重建的,因此可以説計算節點實現了真正的無狀態。好處是不言而喻的,如果一個node失效,從持久化狀態中完全可以判斷該node負責了哪些數據執行到了哪種狀態,就可以有策略做 partial retry ,避免了大查詢整體失敗,提升了執行的容錯能力。

  • Polaris整體架構

上圖中展示了兩個Polaris Pools共享同一個持久化元數據存儲以及底層的海量數據存儲。可以看到架構上和snowflake是基本一致的:

Polaris Pool <-> VirtualWareHouse

Centralized Service <-> Cloud Service

底層異構數據存儲 <-> AWS S3

  1. 每個Pool內是無狀態的,數據在底層,metadata + transaction log在Centralized Service中
  2. 在抽象+共享的數據視圖下可以有多個pools存在,使得多租户 / 多workload成為可能

每個pool內部由一系列的微服務(VM/K8S)組成,各自具有不同的角色,從上到下依次是

  • SQL Server Front End 接收用户query,完成部分優化生成Memo結構,併為data cell綁定metadata (distribution/partition...)
  • Distributed Query Processor(DQP),負責分佈式優化 + 執行 + 執行管理 + workload管理
  • 大量Compute Servers完成實際的計算,每個Server包含2個服務:
    • Execution Service(ES),負責執行中的狀態管理
    • SQL Server負責實際的執行
  • 各個服務之間包含兩個通道:
    • data channel 負責數據傳輸
    • control flow channel 負責執行狀態、信令傳輸

Data Abstraction

底層的數據可以在不同的storage中,具有不同的data format,但上層是無法對這些異構因素一一考慮的,為了做到統一處理,引入了"data cell"這個乾淨的抽象。

在Polaris中,底層的data set邏輯上被視為data cells的集合。data cell被分配到不同的compute node上實現並行計算,每個data cell是self-contained的,包含有描述的meta + statistics。Polaris自身的DQP不關心data cell內部細節,對於cell內部的解析+處理完全由每個計算節點的SQL Server計算引擎完成。

  • data cell概念

作為數據的抽象,data cell包含了數據兩方面的屬性:

  1. hash-distribution,系統定義的function,會根據用户指定的distribution key,將數據對象分發到不同的計算節點中,Polaris會盡可能將data cell分發到大量的nodes中實現充分的並行計算。對於很多算子(equal join/group by),可以利用分佈的特性來避免數據shuffle。
  2. partitioning,用户定義的function,可以根據partition key,將數據劃分為若干分區,主要目的是為了做partition pruning,通過在partition key上的range/equality謂詞剪枝掉儘可能多的cells。

如上所示,data set從兩個維度上做了拆分,物理上cell可以分成一些group來方便就近獲取。有了data cell的抽象後,整個異構的存儲層就可以和計算層隔離開:

Polaris只需要關心data cells就可以了,屏蔽了下層眾多的存儲系統(事務型/分析型)。

  • 彈性的query processing

query的執行可能涉及大量節點,node fail的概率會顯著升高,此外為了實現彈性,集羣topo會隨着workload的改變而調整也會是常態。因此要求Polaris能夠根據集羣topo的變化,動態的將data cell分配到node上(當然為了實現這一點,將所有state保存在外部是前提條件),彈性一般會提現在如下幾種場景

  1. Auto-Scale

DQP可能會根據workload情況拉起新的node或釋放老的node,在這個過程中,已經運行的query不發生變化,而新的query將感知新的topo來分配執行task(task後續介紹)。可見動態調整的粒度在query level

如上圖所示,當負載很高時,可以動態拉起一些nodes,隨着新query的到來,負載將逐步均衡。

2. Resilience to Node Failures

如果出現node failure,DQP通過其基於 層級狀態機 的調度機制可以感知到,並把失敗的task轉移到其他存活節點重新執行,這個能力對於保證大規模查詢穩定執行是非常重要的。(關於層級狀態機後面會講到)

可以看到,上圖中節點4掛掉後,負載會均攤到剩餘存活節點中。

3. Skewed Computations

在架構圖中我們看到DQP和ES之間是有control channel的,基於這個channel實現了feedback loop機制,跟蹤在node上執行task的狀態,如果node負載過重可以做一些re-balance的操作,調度走一些task,如果這樣也無法解決hot spot問題,就會觸發auto-scale,加入更多節點實現負載均攤。

上圖可看到黃色node已經overload了,可以將task調度出去來減輕其負擔。

4. Affinitizing Tasks to Compute

由於SQL Server的Resilient bufferpool能力,其上的local SSD也稱為了cache的一部分,這部分cache會盡可能的被利用,來避免從remote storage上重新拉數據,DQP會根據cache中數據的分佈情況,將task分發到要計算的data cache上去,但對cache的具體管理算法paper裏沒有提到。

查詢優化

Polaris中的查詢優化基於SQL Server PDW演進而來,整體的優化流程和計劃形態基本保持一致,優化過程分為2個階段:

  1. 單個SQL Server Frond End從metadata存儲中獲取cell相關信息,並利用單機SQL Server的Cascades QO生成單節點的search space(MEMO)
  2. MEMO被髮送到DQP組件,DQP基於MEMO做bottom-up enumeration,枚舉每個算子可能的分佈式執行方式,比基於輸出的物理屬性劃分等價類,逐步向上傳遞局部最優解:

具體細節仍然可參考這篇文章:

Polaris的paper中主要講解了第2階段也就是分佈式優化的一些內容,我們也具體來看下:

在基於基礎的Memo結構去枚舉所有的分佈式執行計劃時,Polaris仍然利用了Cascades中的physical property這個概念來表示數據(data cell)的物理屬性(有序、分佈。。),對於分佈式優化來説,分佈屬性是尤其重要的,paper中描述了幾種可能的分佈方式:

  1. h(c): 基於hash function對於data object在c這個屬性列上做hash distribution,生成的分佈方式描述為:
  2. B(...),所有data object被廣播到所有節點上,記為
  3. data object非hash distribution的方式劃分到節點上,例如round-robin,記為

這些分佈屬性在枚舉中有兩方面的作用

  1. 作為Property Requirement,保證數據分佈與執行方式匹配,從而可以得到正確的執行結果,例如考慮兩表P Q join on P.a = Q.b,那麼仔細考慮下可能的並行執行方式,無非一下幾種:
  • P在join key a上hash分佈,且Q在join key b上hash分佈
  • P在所有節點上廣播,這是Q可以採用任意分發方式
  • Q在所有節點上廣播,這是P可以採用任意分發方式

如果不符合以上模式,例如P/Q均在所有節點上隨機分佈,那麼很明顯我們無法得到正確join結果,這種正確性模式的保證,就是Property Requirement,描述為:

那麼在枚舉過程中,如果對一個算子的輸入,無法滿足其Property Requirement,則不予考慮。

2. 作為Interesting Property,DQP採用了System-R思路的bottom-up枚舉,由於有了上面提到的Requirement,每個算子的輸出物理屬性將受到父算子的執行語義的約束,因此Requirement也就是構成了Interesting Property.

下圖是兩表P join Q on P.a = Q.b的一個枚舉示意:

枚舉是bottom-up的,對於P/Q兩表,其初始的分佈為 ,考慮到Property Requirement(Interesting Property),可以各自枚舉出可以被接受的分佈方式。

到join算子時,會對兩邊輸入的分佈方式進行組合,並根據算子語義判斷各種組合是否可以匹配,具有相同輸出物理屬性的多種組合中選擇代價最低的,作為這個算子的局部最優解集合,再傳遞到上層算子,本例中,每種可能的interesting property中選擇一個winner:

並行優化完成後,計劃被描述為由physical operator組成的DAG。這裏之所以稱為DAG,是由於DQP採用了push的執行模式。和其他的分佈式計劃一樣,算子樹按照exchange(Polaris稱為Data Move Enforcer)節點進行切分為若干slice(Polaris稱為task template),task template描述一個計劃片段,會在若干個compute node上 實例化 為具體的task並行執行。

Polaris與Presto/Impala等計算引擎不同的是,task template之間採用了阻塞型的push,數據從child template發送到parent template後,會先 持久化 到目標node的本地SSD中,數據發送完畢後,上層template才能夠執行,這樣在template之間形成了Precedence constraints(前序約束),如下圖所示:

左邊是我們常見的物理算子樹,右圖則是根據exchange切分後的DAG圖,每個紅框內是構成一個template,是一個可以完整地在多個節點上並行執行的計劃片段。存在輸入關係的template之間形成了前序約束。

這種執行模型複用了PDW原有的方式,它的最大優點就是便於做 partial retry ,此外每個task template將被重新翻譯回T-SQL,下發到計算節點的SQL Server實例中,這樣利用中間物化結果中精確的統計信息,在節點內重新優化,可能生成潛在更優的局部執行計劃,加速查詢執行。

query-level執行調度

執行調度是Polaris最為創新也最具有工程複雜度的部分,包括單個query內的調度算法以及跨query的整體調度模式,下面分別介紹。

上一節已經描述,在並行優化完成後,會生成query task DAG這樣的計劃描述,具體的調度策略採用了一種稱為 hierarchical composition of finite state machines 的模型,我們就稱為層級狀態機吧,先解釋下這個模型

  • 執行中的各個部件被統一為entity這麼個概念,包括query DAG,task templates和task。
  • leaf task template當可以執行時,被實例化為對應的tasks集合,這些tasks實體“組成”了所屬的task template
  • non-leaf task template有前序約束的其他task templates,這些約束templates"組成"了這個task template

上面提到的這種“組成”關係也被稱為dependencies。

每個entity都有對應的狀態機描述其執行中的狀態以及相應的跳轉,這些信息會被記錄到持久化存儲中作為log保存。具體的狀態包括:

  • Simple state : 表示task template或者task是success / failed / ready
  • Composite state : 表示task template正在run / blocked,Composite state的特定是它向其他state的跳轉是由其dependencies的執行情況來決定的。

用一個具體例子來説明:

上圖是P join Q join R的執行計劃的DAG圖以及執行中的層級狀態機的轉換

  1. 初始時,query DAG這個entity處於ready狀態,當被調度時,它進入run狀態並實例化它的dependency也就是task template T1(P join Q join R)這個entity
  2. Task template T1被實例化後處於blocked狀態因為它還有dependencies T2 + T3,這兩個entity將被實例化
  3. T2 / T3可以直接進入ready狀態因為它們不再有依賴的task templates,然後放入到調度隊列中
  4. 當從隊列中取出進入run狀態後,T2/T3將會實例化對應的task,task進入run狀態執行
  5. 所有task都執行成功為success狀態後,success會傳遞到T2/T3的task templates,T2/T3也就變為success狀態並傳遞到parent T1
  6. T1發現所有dependencies的狀態都為success時,就會解除blocked狀態進入ready狀態從而進入調度隊列,後續處理與4一致
  7. 如果task執行失敗進入failed狀態,failed也會傳遞到上層,會觸發失敗類型的分析,引入以下兩種情況:
    1. failed是可恢復的(node failure),該task將會調度到其他node上做retry
    2. failed是不可恢復的,failure狀態會傳遞到上層task template,task template這個entity會被標記為failed,由其parent template在整個template粒度上,嘗試retry

整個錯誤的重試和傳播機制如下圖3 -> 4所示:

整個狀態機的粗略流轉如下(忽略了不少情況):

個人感覺採用這種狀態機的模型以及抽象出entity這個概念,主要有以下幾個好處:

  1. 統一對執行錯誤的處理邏輯 + log記錄邏輯,大多數邏輯對所有entity都適用
  2. 可以保證前序約束一定被滿足(blocked -> run)
  3. 可以做partial retry,在task / task template兩個粒度上做重試,保證執行的穩定性
  4. 可以做基於log的重放,便於debug或者pause -> resume這種靈活的執行方式

workload-level執行調度

在一個workload內,一定有不止一個query在併發執行,可能是幾千個light weight的query,或者是少量非常複雜的大查詢,上一節描述了一個query內部如何調度,現在看下query之間的調度策略。

Polaris維護了所有active query的一個global view,記為workload graph,實際上就是所有task DAG的集合。

在workload graph中的每個task都有響應的resource demand,可以用resource vector這個d-維向量來描述task對於各類資源的需求情況,resource demand是有task template中各個算子的計算和輸入data決定的,同樣每個compute node被建模為d-維的資源桶,這樣整個跨query的執行調度問題就變成了:

如何調度具有d-維資源需求的task集合,使其在滿足前序約束 + 滿足資源需求的前提下,以最短時間執行完成。

具體算法邏輯粗略來看並不複雜:

scheduler週期性被喚醒後,查看workload graph中所有處於ready狀態的task template到調度隊列中,調度策略就是以task template為單元實施的,可以有不同的policy:

  • FIFO
  • Sorted by resource demand (min to max / max to min)
  • Sorted by proximity to the root,這個表示説越接近完成的template越優先被調度

當選中下一個要調度的template後,scheduler會檢查其實例化的每個task的resource demand是否能夠被目標node所滿足,如果可以則各個task直接進入run狀態,否則要 等待 目標node資源充足後,task template才可以執行。

這種資源調度策略有以下優勢:

  • 以task template作為執行的調度單元,相比於query level來説更細粒度,更有利於資源的充分利用
  • 在將task分配到目標node時,基本原則是極可能最大程度利用該node的資源,避免over-provisioning造成浪費
  • 在調度中可以基於workload情況動態調整調度的policy,實現更大靈活性,例如:

一開始調度隊列中是{T1,T2,T3,T4,T5}這些templates,綠圈中的值代表他們的resource demand,採用了max to min的policy,因此調度順序是{T1,T2,T4,T5,T3},當開始執行後,假設T3無法fit而處於等待,所以前4個template進入run狀態,後續T4,T5執行完成,scheduler在隊列中看到的是{T3,T8},此時workload manager如果發現系統的disk resource比較緊張(之前template執行完成發生落盤),它可以切換policy為proximity to the root,來儘快釋放佔用的disk resource,此時調度順序就變為了{T8,T3}。但很遺憾這種動態的調整策略paper中沒有詳述。

  • 基於demand vs. available的資源情況,實現back pressure,當可用資源不足時,後續query將被拒絕

總結

這篇paper介紹了Polaris的整體架構和一些查詢優化 + 執行調度的技術方案,由於講的粒度還是比較粗的,可能只能對總體框架+設計思路上有些啟發,不過從paper中我們確實可以得到一些takeaway:

  • 湖倉一體是大數據分析發展的新賽道,目前snowflake/databricks/各大雲廠商已經擠了進去,大大小小做AP的廠商也在發力
  • 隨着AI/data science的發展,schema-less數據越來越重要
  • 計算與狀態分離 + 雲基礎設施,發揮成本+彈性的最大優勢
  • query-level的容錯能力非常重要
  • 為保證可預期的性能,workload-level的資源協調是難點
  • 複用有優勢的競爭力,避免不必要的新輪子