Presto 實踐 | Presto 在B站的實踐

語言: CN / TW / HK

1

架構

1.1 B站SQL On Hadoop 整體架構

在介紹Presto在B站的實踐之前,先從整體來看看SQL在B站的使用情況,在B站的離線平臺,核心由三大計算引擎Presto、Spark、Hive以及分散式儲存系統HDFS和排程系統Yarn組成。

如下架構圖所示,我們的ADHOC、BI、DQC以及資料探查等服務都是通過自研的Dispatcher路由服務來進行統一SQL排程,Dispatcher會結合查詢語句的語法特徵,庫表讀HDFS的資料量以及引擎的負載情況等因素動態決定選擇最合適的計算引擎執行,如果是Hive的語法需要用Presto執行的,目前利用Linkedin開源的coral對語句進行轉化,如果執行的SQL失敗了會做引擎自動降級,降低使用者使用門檻;排程平臺支援Spark、Hive、Presto cli的方式提交ETL作業到Yarn進行排程;Presto gateway服務負責對Presto進行多叢集管理和路由;我們通過Kyuubi來提供多租戶能力,對不同部門管理各自的Spark Engine,提供adhoc查詢;目前各大引擎Hive、Spark、Presto以及HDFS都接入了Ranger來進行許可權認證,HDFS通過路徑來控制權限,計算引擎通過庫/表來控制權限,並且我們通過一套Policy來實現表, column masking和row filter的許可權控制;部分元件比如Presto、Alluxio、Kyuubi、Presto Gateway、Dispatcher, 包括混部Yarn叢集都已經通過公司k8s平臺統一部署排程。

1.2 Query查詢情況

目前在Adhoc查詢場景,Presto引擎佔比接近一半。ETL常見主要還是Spark和Hive,隨著我們不斷的對Hive作業遷移到spark,ETL作業spark佔比達到64%。

2

Presto應用

Presto是由facebook 開源的分散式的MPP(Massive Parallel Processing)架構的SQL查詢引擎。基於全記憶體計算(部分運算元資料也可通過session 配置spill到本地磁碟),並且採用流式pipeline的方式處理資料使其能夠節省記憶體的同時,更快的響應查詢。

相對Hive、Spark引擎,Presto存在不少優勢:

1. shuffle資料不落地

2.流式任務執行而不是按stage級別執行

3.split為執行緒級別的排程

4.資料來源外掛化

這使Presto 特別適合互動式跨源資料查詢,Presto也並不是完美的,比如因為其流式pipeline執行方式的設計,使其喪失了task級別的recovery機制,所以Presto目前不是特別適合用來做大規模的ETL查詢,當然目前社群也在通過對presto進行各種優化來使其適應更大規模的查詢,比如Presto Ulimited和Presto on Spark專案。

2.1  使用場景

在B站,Presto主要承擔了ADHOC查詢、BI查詢、DQC(資料校驗,包括跨資料來源校驗)、AI ETL作業、資料探查等場景.

2.2 叢集規模

目前Presto總共7個叢集,分佈在2個機房,最大單叢集節點400+,總節點數在1000+。

2.3 業務增長

目前我們叢集每日查詢數16w左右,每日查詢HDFS資料量10PB左右,目前相比2020年初日查詢數增長10倍。

2.4 Presto架構

目前我們是基於PrestoSQL-330(現在改名叫Trino)版本進行二次開發和優化的,我們所有叢集接入到公司Caster釋出系統,由k8s進行排程管理,包括jmx的採集、監控dashboard、告警,極大的簡化了我們運維的成本。整體架構如下圖:

目前我們所有的presto查詢,包括Cli、JDBC、PyHive都是直接提交到Presto-gateway,由gateway來負責路由。

gateway改造:

1. 支援多coordinator排程,相同query只能排程到一個叢集的一個coordinator。

2. 探測coordinator的狀態,如果不Active,則踢出排程列表,給無損釋出提供可能。

3. 支援按使用者/作業ID來選擇機房排程,同時我們還會對Query通過Parser解析依賴的表和分割槽,根據哪個機房流量讀取大,將Query排程到哪個叢集。

4. 探測coordinator的負載,主要包括記憶體、作業是否堵住,支援跨叢集負載均衡排程。

5. 提取了Query特徵,相同特徵Query提交我們會有一系列攔截措施。

presto改造:

1. coordinator支援多活,解決了coordinator的單點問題。

2. coordinator支援按業務來進行排程,不同業務排程到不同的Worker節點,同時為了增加叢集利用率,我們支援按時間跨Label排程, 比如凌晨為adhoc和bi查詢的低峰,但卻為dqc的高峰,這個時候dqc能夠跨Label使用其他Label的計算資源。

2.5 叢集執行情況

目前adhoc叢集執行百分位如下圖所示:

3

穩定性改進

3.1 Coordinator多活改造

Presto 是典型的主從架構,Coordinator作為主節點,其存在單點問題,當主節點掛了之後,整個叢集不能對外提供服務,為了增加叢集的穩定性和可靠性,我們對Presto服務發現以及資源全域性化做了改造,使coordinator可以支援橫向擴充套件。架構圖如下:

1. 因為Coordinator雖然能夠支援橫向擴充套件,但是它並不是無狀態化的,所以我們對gateway進行了改造,一條query提交過來之後,針對這個叢集,如果是多活,則隨機選擇一個coordinator,並且將該query和coordinator的mapping保持到redis,之後該query的所有請求都會保持一致。

2.coordinator啟動的時候會通過加全域性鎖的方式,嘗試將自己節點ip和埠寫入State Store服務,然後啟動DiscoveryServer服務。

3.各節點的ServiceInventory獲取上述寫入到State Store中的節點資訊來作為Discovery Service,所有節點都會向該地址傳送announce,DiscoveryServer會進行儲存,然後DiscoveryNodeManager通過GET請求到‘/v1/service‘便能拿到所有節點資訊。

4.為了保持多Coordinator具有整個叢集的全域性資源資訊,每個Coordinator會將自己的query和resource group的資訊寫入State Store,同時會從State Store中不斷的讀取並更新自己節點上的資源資訊,這能保證各個Coordinator都有全域性的資源使用情況,避免了過度排程導致叢集負載過高而不穩定。

3.2 Label改造

大家都知道,Presto 在資源隔離方面做的並不好,coordinator的resource group 只能在使用者提交查詢的時候進行限制,比如超過query數,記憶體使用超過比例,cpu使用超過quota則新提交的查詢會進到queued佇列中,worker端雖然有MultilevelSplitQueue 來對執行時間長的task進行排程限制,但是並不能做到很好的資源隔離。

在經過多次因adhoc大查詢影響報表查詢之後,同時又不想拆分叢集(運維成本增加),所以我們對presto進行了改造,改造思路比較簡單:

1.開發一個服務,負責將已經劃好label的配置檔案load進記憶體,並實時檢測檔案是否更新,如有更新重新load。

2.DiscoveryNodeManager 通過服務發現拿到所有Node之後,將label資訊寫進InternalNode中。

3.NodeSelector 構建NodeMap的時候也就有了節點label資訊了。

4.客戶端根據不同業務將label資訊傳遞到coordinator,排程的時候根據label去get到相應的節點即可。

3.3 實時懲罰

Label的改造能隔離業務之間互相影響,但是並不能解決相同業務Label下受大查詢的影響,另外社群版本的cpu/記憶體限制只能限制新提交的語句,已經在執行的語句會不受cpu/記憶體限制,所以我們開發了一套實時懲罰的機制。

架構圖如下,通過實時收集到各個query的cpu使用情況,基於resource group配置的cpu quota資訊,對超過quota的resource group,直接向所有worker節點下發懲罰訊息,worker收到訊息後,會停止對該resource group的task進行排程,等到該resource group使用資源低於quota後,再通知worker重新對task進行排程執行。

目前的使用場景是,ETL叢集如果有大量大查詢同時提交,就會出現叢集不穩定,比如某個worker被打掛,針對大查詢,presto gateway會在第二次提交到集群后,自動路由到slow resource group,針對該resource group我們開啟了懲罰機制,避免過多大查詢影響整個叢集的穩定性。同時我們的懲罰機制是否開啟,以及懲罰quota大小都支援動態更新,隨時可以調整配置不需要重啟叢集。

懲罰演算法虛擬碼如下:

成員變數:(以GroupA舉例)

  • punishCpuLimit :GroupA所配置的cpu算力上限

  • usagePerSecond:GroupA實時統計到的每秒所使用的cpu消耗

  • cSum:GroupA累計消耗的cpu總和

long cSum = lastCSum + usagePerSecond;

if (cSum <= punishCpuLimit) {

cSum = 0;

} else if (cSum >= 2 * punishCpuLimit) {

// 這邊記錄當前resource group 需要懲罰

cSum = cSum - punishCpuLimit;

} else if (punishCpuLimit < cSum && cSum < 2 * punishCpuLimit) {

cSum = cSum - punishCpuLimit;

}

程式碼做了如下改造:

1. ResourceGroup中除了通過running query收集原有cpu time資訊,我們還收集了schedule time和running driver指標供懲罰選擇。

2. 如下圖所示,worker端我們實現Grouped2MultilevelSplitQueue物件,該物件維護了一個resource group和MultilevelQueue的mapping,並且會接受處理coordinator的懲罰資訊。

3. coordinator端實現了一個Punish Service,用來實時計算各resource group是否超過了quota設定,如果超過,則下發懲罰資訊給所有worker節點

3.4 查詢限制

我們在presto gateway中開發了一系列規則來對使用者查詢行為以及bad sql進行限制,策略包括:

1.  對查詢語句進行了特徵提取,包括去除註釋,替換表示式的具體值為萬用字元,取md5為該查詢的特徵值,方便進行相同特徵查詢限制。

2. 針對INSUFFICIENT_RESOURCES型別超記憶體的查詢,第二次查詢直接攔截不讓提交,因為再次提交依然會失敗,浪費叢集資源。

3. 讀HDFS超過30TB的查詢第一次會在執行時被kill掉,第二次提交會被gateway檢測到後直接攔截。

4. 短時間大量提交的查詢會進行攔截限流,比如1分鐘提交超過30條相同特徵的query。

5. 回刷資料任務統一排程到一個獨立的resource group,避免影響正常ETL/ADHOC任務。

6. 針對worker oom killer的kill掉的查詢,如果其佔用記憶體超過一定閾值,那麼之後該特徵query都會排程到slow resource group進行限制。

3.5 其他改造

1. worker端開發了oom killer服務

不斷的從MemoryMXBean拿記憶體使用情況,當worker堆使用超過一定百分比,並且持續超過一定時間,就開始選擇佔用最大記憶體的task kill掉。

2.監控告警

通過presto暴露的jmx,然後將資訊採集吐到grafana,可以很方便的監控到叢集的一些關鍵資訊,並且基於這些資訊做了一些告警。

4

可用性改進

4.1 支援隱式轉換

Hive和Spark預設就支援隱式資料型別的轉換,比如query select 1 = '1' hive能正確返回true,而presto直接報語法錯誤,我們通過在ExpressionAnalyzer中對邏輯表示式和算術表示式進行了判斷,如果左右表示式不一致,同時能夠相容的話,直接通過加cast進行型別強轉。

hive> select 1 = '1';

true

Time taken: 3.1 seconds, Fetched: 1 row(s)


presto> select 1 = '1';

Query 20220301_114217_08911_b5gjq failed: line 1:10: '=' cannot be applied to integer, varchar(1)

select 1 = '1'


presto> set session implicit_conversion=true;

SET SESSION

presto> select 1 = '1';

_col0

-------

true

(1 row)

4.2 相容HIVE UDF

我們相容了Hive自帶的UDF和GenericUDF, 並且如果在Presto自帶以及hive-apache中沒有的UDF,會嘗試從hive metastore去獲取一下是否存在該Function,如果存在,則將UDF所在的jar包download到本地,然後通過classloader進行load。

1. UDF入參和出參轉換,Hive GenericUDF入參為DeferredObject,需要根據Presto引數型別進行相應的轉換,比如Presto的VARCHAR則需要通過Slice的toString轉換成String型別,返回結果為ObjectInspector,不同的返回型別需要轉換成Presto相應的資料型別,比如是StringObjectInspector則需要封裝到Slice中。

2. 通過codegen方式將HiveUDF呼叫方法生成到MethodHandle中。

3. 因Hive UDF未考慮併發問題,所以存線上程安全問題,構建的GenericUDF需要通過ThreadLocal來隔離。

4.為了防止各個UDF依賴不同版本的jar導致衝突,通過對每個UDF的jar new一個新classLoader進行隔離,該classLoader的parent為Hive plugin ClassLoader(已經載入了Hive-exec相關類)。

presto> select b_security_mask_email('[email protected]',0);

_col0

------------------

1*3@bilibili.com

(1 row)

4.3 支援insert overwrite table/ directory語法

Presto原生要支援Overwrite語義需要在insert into語句中設定'insert_existing_partitions_behavior' session引數來控制,為了保持和hive語法的一致性,我們通過修改Presto的語法檔案, 使其先支援接受Insert overwrite table語法,然後在遍歷AST樹時,遇到InsertOverwrite節點則生成Insert節點,同時將overwrite含義一路透傳到worker,修改其Insert語義為overwrite, 同時也支援hive的動態和靜態分割槽寫法。

因為adhoc系統針對大查詢的結果下載功能,通過將使用者sql修改為insert overwrite directory ‘location’ select語法,將結果儲存到hdfs,然後通過下載中心提供給使用者匯出,hive和spark是支援的,我們也對presto進行了改造支援。

presto> insert overwrite table tmp_db.tmp_table select '1' as a, '2' as b;

INSERT OVERWRITE: 1 row


presto> insert overwrite directory "/tmp/xxx/insert1" select value.features from ai.xxxTable limit 10;

rows

------

10

(1 row)

4.4 相容Hive Ranger Plugin

Ranger在2.0版本開始支援Presto plugin,我們基於Ranger1.2版本做了不少優化,升級的需求不大,所以我們在1.2版本的Ranger中加入了Presto的plugin,同時2.0版本的Ranger是基於3段式來進行賦權,而我們大部分的許可權需求還都在hive,所以我們對plugin進行了一些改造,使其相容了Ranger Hive賦權policy,也就是說通過對Hive plugin賦權一次,presto和hive、spark引擎共用policy,目前庫,表,row-level filtering和column masking都支援。

4.5 支援Hint語法

我們在語法定義層面做了hint的實現,支援常見session引數通過寫在sql hint上進行配置,比如join型別的選擇,query執行時間,是否關閉cache讀,是否開啟spill to disk等。

/*+ query_max_execution_time= '1h', scale_writers=true*/

SELECT clo_1, col_2 FROM xxxx WHERE log_data='20211019'

4.6 支援having alias、group by alias語法

針對如下查詢,因為percent是一個alias欄位,presto查詢會報錯,而hive和spark是支援該語法的,我們通過拿到node的SelectItems進行對比,並替換alias欄位資訊。

presto> select log_date, sum(job_percent) as percent from test.test_report group by log_date having percent > 0;

log_date | percent

----------+-----------------------

20211231 | 0.03625346663870051

4.7 其他改造

  1. 基於Linkedin開源的Coral支援讀Hive檢視。

  2. 支援動態載入和更新Resource group。

  3. 支援多資料來源聯合查詢,資料來源包括Kafka, JDBC, Tidb,Clickhouse,Iceberg,Hudi,  ES,其中JDBC connector支援按splitField自動切分成多個Split並行讀表。

  4. 基於HDFS的共享JAR包和配置,做到動態新增Catalog,無需重啟叢集。

  5. 在Web ui中展示了Query queued具體原因。

  6. 語句結束後將QueryInfo序列化寫入HDFS,實現了Job History服務,更長時間保留語句資訊,方便對出問題語句進行問題定位。

  7. 叢集實現無損釋出,Presto worker程序通過監聽釋出系統kill -15訊號,然後將自身狀態置為非ACTIVE,不接受新任務,等所有任務結束再退出程序。

  8. 實現了和Hive一樣的點邊式的欄位級血緣和運算元影響關係,細化了血緣模型。

5

效能提升

5.1 Presto on alluxio

通過收集presto的血緣資訊,我們發現少數表會被反覆讀取,根據表最近7天訪問的平均值作為熱度,從下圖可以發現,很多表一天被訪問好幾百次。

基於這樣一個事實,因為本身Presto和HDFS是存算分離的架構,加上HDFS經常會存在slow rpc,或者熱點Datanode情況。所以我們決定使用Alluxio來快取這部分熱資料,使Presto提升查詢效率的同時,也減少了HDFS的壓力,減少了受HDFS的影響。

架構圖如上,通過將Presto的血緣吐出到kafka,然後對血緣進行分析,比如如下血緣資訊,只需要對json進行解析就能拿到查詢的表,以及讀了哪些分割槽。

我們也做了以下事情來確保熱表資料被Presto識別,並且自動轉換到Alluxio中讀取:

1. 消費血緣資料,按叢集解析到分割槽級別訪問資訊並落地到Tidb。

2. 開發cache tag管理服務,主要用來對分割槽進行打tag(tag儲存在hms中的Partition Parameters),並且通過分割槽訪問情況,計算其TTL,對於超過TTL的分割槽會進行untag,並且從alluxio中刪除路徑。

如下圖所示,如果這個分割槽對哪個叢集是熱表,那麼只需通過cache_tag來控制哪個叢集應該從Alluxio讀資料。

3. 開發cache invalidate服務主要為了保證hdfs和alluxio的資料一致性,該服務會監聽Hive meta event,分割槽更新則刪除alluxio中的分割槽路徑,同時對於已經打tag的表,該服務還監聽add partition事件,然後給新增分割槽打tag,並且通過alluxio的distributed load 向Alluxio JobMaster傳送請求,load檔案到alluxio worker。

alluxio自身可以通過下面引數來控制是否每次和底層HDFS元資料是否一致,但是為了不受偶爾NN慢rpc影響,我們通過上述服務來保證資料的一致性,目前Presto adhoc叢集已經接入了HDFS的Observer NN,在RPC讀請求延遲方面得到了很大的改善,可以考慮直接通過alluxio來保證資料的一致性。

alluxio.user.file.metadata.sync.interval=0

alluxio.user.file.metadata.load.type=ALWAYS

4. Presto這邊做的改造就比較簡單,在load split的地方拿到分割槽的Parameters,如果含有cache_tag的資訊,並且如果和當前叢集是吻合的,那麼將HDFS的路徑改成Alluxio的地址,真正建立連線時候還會檢測一次Alluxio是否連通,如果有問題,會繼續降級讀HDFS。

效果如下:

通過Presto的TPC-DS benchmark,基本上平均能夠達到20-30%左右的效能提升,同時被打了tag的分割槽查詢更加穩定,如下圖所示,HDFS經常會有幾十秒的讀RPC延遲,從Alluxio的liststatus rpc時間來看(耗時低於10ms),訪問到熱分割槽的rpc請求更穩定,也更快。目前我們BI報表有30%的分割槽已經被打上了tag,未來計劃打上更多的熱分割槽tag。

5.2 多機房架構

隨著B站業務高速發展,資料量和作業數增長也變得越來越快,機房機位快速消耗,容量達到上限後會阻塞業務的發展。一個機房資源既然有限,那我們擴充套件為多個機房,引入異地第二機房部署Hadoop和Presto叢集,  但多機房面臨的問題一個是跨機房資料互動頻寬資源有限,存在瓶頸,一個是網路抖動造成的服務SLA會有影響。在此背景下我們設計了Presto的多機房架構,對原有的架構進行改造,保證從使用者視角仍然是一個機房。使用者側統一接入Presto gateway,每個機房我們都獨立部署一套Presto叢集,這樣Presto內部shuffle資料就不會跨機房產生流量。對於Hive外部資料來源讀取,分兩種場景,ETL場景下由於我們做了基於單元化思路的資料和業務遷移,將高內聚的業務和資料遷移到第二機房,所以作業通過Presto gateway時會自動按照使用者或者作業ID的mapping關係路由到對應機房的叢集。adhoc場景下,臨時產生的需求,一般無法預測流量,我們做法是在Presto gateway中解析出語句需要訪問的表和分割槽路徑,並從Namenode proxy中獲取到路徑所在的機房位置和資料大小資訊進行計算,預估出作業放到各機房後所產生的跨機房流量,以節省跨機房頻寬為目標,再綜合每個叢集的實際負載情況來決定將作業排程到哪個機房。比如:

  • 訪問單張表:排程到資料所在機房

  • 訪問多張表:

a. 多表在同一機房,作業路由到資料所在機房

b. 多表在不同機房,路由到資料量較大的表所在機房,較小的表限流讀

此外我們也做了兩個優化,一個是計算下推優化,利用Presto的Connector多源查詢能力,實現了跨IDC connector,將第二機房叢集視為一個connector,在訪問多表不同機房的場景下將SQL做改寫,子查詢計算邏輯儘可能下推到第二機房叢集進行部分計算處理,再和主機房計算結果進行合併,以減少跨機房流量頻寬。另一個我們也通過血緣分析出跨機房讀熱分割槽,提前載入到本地alluxio進行快取,儘量避免下次跨機房訪問。

5.3 Query result cache

我們之前根據query的md5統計了一下,每天有超過萬條查詢是重複的查詢,如果這部分查詢的結果能夠快取起來,那麼直接將結果返回給客戶端,不僅可以減少叢集壓力也可以提升查詢速度。

對Query的結果做快取,最大的挑戰就是保證使用者查詢的是最新的資料,否則就出現資料質量問題了。

架構如上圖:

1.為了能夠獲取query查詢的表以及分割槽資訊(這部分資訊將用來作為快取的key),我們將邏輯寫在Coordinator,在Coordinator 做完LogicalPlan之後,拿到查詢的表資訊(包括分割槽資訊),然後再加上query本身計算md5作為key,然後從根據key值從redis中查詢看看是否存在快取,如果存在,則將QueryStateMachine置為Cached狀態。這裡再解釋一下為什麼需要獲取查詢的表和分割槽資訊,比如這條sql:select * from db.table where log_date > '20220101', 那麼這條query今天和明天分別執行,讀到的分割槽數是不一樣的。當然我們最近也在準備將這部分邏輯前置到gateway,在gateway中對query進行部分元資料分析,拿到分割槽級別資訊。

2.在獲取結果的邏輯中,加入了快取結果儲存和讀取的邏輯,在儲存快取結果的同時,也會將上述分析拿到的分割槽、列型別資訊和query的mapping關係也儲存起來。

3.同時還開發了快取失效服務,監聽查詢依賴的表分割槽是否有更新,如果有則直接刪除快取。

如下圖所示,兩條一樣的query,第一次執行需要7s,第二次執行只需要300ms,並且從split來看沒有觸發排程。目前每天有5k條Query能夠得到快取加速。

5.4 Raptorx 

Raptorx是Prestodb通過資料快取進行查詢加速的專案(https://github.com/prestodb/presto/issues/13205),得益於軟親和性的排程策略,一個Split或者檔案會通過一致性Hash演算法排程到相同的Worker節點,第一次訪問的時候按照檔案需要讀的offset和length,以細粒度Page(通常1MB)為單位從HDFS快取在本地磁碟,第二次訪問的時候,直接從本地節點的快取訪問,而不需要再遠端讀取資料,因為採用了一致性Hash演算法所以儘可能降低了節點擴縮容時候對現有節點快取命中率的影響,如果分片Hash完選擇的第一臺節點由於負載過高不宜分配,會自動順延降級到後一臺節點排程,如果後一臺節點負載也很高,則繼續降級排程策略進行隨機排程,同時關閉本次查詢從快取讀的開關,這樣檔案最多物理快取在兩臺節點。同時得益於Split或者檔案能排程到相同機器,那麼針對ORC或者Parquet的一些檔案meta資訊,比如orc檔案的file footer,stripe statistics, row group index資訊等都可以快取到worker程序JVM記憶體中,無需再從HDFS讀取,也有不錯的快取命中率。

上面也提到,我們其實上線了Alluxio叢集來快取資料,那為什麼還需要引入raptorx呢,raptorx相對於Alluxio叢集模式有幾大好處:

  • Raptorx基於Alluxio local cache,是Page級別(預設1MB)的快取,而叢集模式必須快取整個檔案,通常使用者經常訪問的資料集中在某張表的幾個列,而列存格式中同一列資料是緊湊存放一起的,細粒度快取只需要快取某些常用列的資料,不需要整個檔案快取,減少快取管理開銷。

  • 上面提到,可以針對orc和parquet快取檔案和stripe、row group等的meta資訊,近一步提升查詢效能。

  • 本地資料管理以Library方式嵌入到Presto worker程序中,不受Alluxio叢集穩定性影響。

我們backport了Prestodb Raptorx相關的patch,並且做了一些其他改造:

1. 社群通過session來控制一個query級別的local cache,如果該query 開啟local cache,那麼query依賴的所有表的所有分割槽資料都會進SSD,粒度不夠精細,我們是根據分割槽是否被標記成熱分割槽,然後只會將熱分割槽進行軟親和性排程。

2. Raptorx中的hive metastore versioned cache是基於FB的內部版本,要使用這塊功能需要對hive metastore改造thrift api暴露出分割槽和表的版本資訊,我們利用Table和Partition的lastDDLTime來作為version,解決meta版本不一致問題,及時失效meta快取並重新載入。

3. 對orc和parquet都支援了檔案元資料的快取,並根據hive檔案的lastModifiedTime及時失效過期快取。

4. 基於alluxio local cache進行了改造,支援基於檔案的lastModifiedTime來判斷資料是否失效,並及時清理過期page。

5. 因為alluxio local cache目前只支援掛載一塊磁碟,實現了基於剩餘空間的VolumeChoosingPolicy來對多塊磁碟進行儲存管理。

6. 每次Presto worker啟動後必須恢復完所有page後才開始對外提供服務,這樣儘可能保證Page的快取命中率。

我們拿了一些query進行測試,如下圖所示,部分query能夠得到數倍效能提升。

PS:如果有用到viewfs來做hdfs的federation,那麼應該會遇到一些問題,大家可以參考:

https://github.com/prestodb/presto/pull/17390

https://github.com/prestodb/presto/pull/17365

https://github.com/prestodb/presto/pull/17367

5.5 支援struct 欄位型別下推

trino在高版本(334,https://github.com/trinodb/trino/pull/2672) 支援struct欄位型別的下推優化,包括project和filter的下推,我們AI團隊經常會用到巢狀資料型別,這個優化能夠給查詢帶來不小的提升,我們將整個功能backport到我們自己的版本,如下圖所示,有和沒有deference下推,執行計劃的project和filter有巨大的差別,實際測試下來有的sql shuffle資料量能夠達到幾十上百倍的減少,查詢效能也能夠提升數倍。

測試語句:

SELECT

A.ip,

B.info.mid

FROM

tmp_bdp.tmp_struct_test A

JOIN tmp_bdp.tmp_struct_test B on A.ip = B.ip

其中info是struct型別,包含9個String型別欄位,執行效果見下圖,Scan input size和shuffle size大幅度減少

目前我們只支援struct欄位型別下推,還無法做到map和array型別的下推,而我們線上存在不少array中巢狀struct的資料型別,大部分sql通過unnest來對array進行展開,之後目標是繼續深入研究針對array和map的下推支援。

5.6 JDK版本從8升級到zulu JDK 11

我們一開始想在升級JDK同時將垃圾收集器切換到ZGC來降低單次 GC的時間,提升叢集整體效能。benchmark測下來也確實ZGC效果最好,但是因為JDK11的zgc沒有class unloading功能,導致presto codegen出來的大量class無法回收導致metaspace洩漏,所以升級了JDK11依然使用G1收集器。

升級後,JDK11 g1收集器gc吞吐量是98%,相比JDK8有2個點的提升。同時JDK11提供了一些新的監控和診斷工具,比如JFR能幫助我們後續進一步分析JVM執行效能和定位問題。

JDK11 ZGC收集器效能指標:

JDK8 G1收集器關鍵效能指標:

JDK11 G1收集器關鍵效能指標:

5.7 支援動態過濾

動態過濾是指作業在執行時動態生成過濾器的功能(簡稱Dynamic Filter),適用於高選擇性Join場景,以此減少IO以及後續的計算量。目前,trino高版本已支援動態過濾,我們借鑑了trino高版本的動態過濾,實現了BroadcastJoin,Dynamic Partition Pruning以及Partitioned Join。

整體架構:

具體改造如下:

  1. Coordinator端新增PredicatePushDown優化器下推DynamicFilter資訊。

  2. Worker新增“Collect”運算元,通過PageSourceProvider下推DynamicFilter到原始檔讀取。

  3. Worker新增上報DynamicFilter資訊。

  4. Coordinator新增DynamicFilterService,對Worker彙報上來的DynamicFilter資訊做整合,再將整合後的DF資訊下發到各個worker做過濾。

改造效果:

  1. 對特定Dynamic Partition Pruning的SQL,效果明顯:

左表讀資料量從6.36T減少到358GB

2. benchmark效果:

其中,benchmark中效果好的Query主要由以下兩個因素決定:

  1. Query型別為高選擇性join場景。

  2. 底層檔案的過濾效能,例如orc檔案寫入的時候如果有sort by欄位,stripe過濾時會更高效;或者底層orc檔案如果開啟了bloomFilter,也會提高過濾效果。

總結:

  1. 動態過濾適用於Hive Connector,支援OrcFile/Parquet,下推到資料來源。

  2. 支援分割槽表的分割槽裁剪,非分割槽表的行過濾。

  3. 支援Join策略包括Broadcast Join/Dynamic Partition Pruning/Partitioned Join。

  4. 支援Join語法包括InnerJoin/SemiJoin。

5.8 其他改進

  1. 針對小檔案產生split數過多的問題,將它們合併成一個大split進行排程,減少split數和排程壓力。

  2. HDFS讀請求接入Observer Namenode,減少因Active Namenode slow rpc造成的影響。

  3. 開啟FileStatus cache,  減少對NN的RPC訪問壓力。

  4. 針對大查詢,開啟spill功能,將page寫入本地磁碟,緩解記憶體壓力。

  5. Coordinator側查詢執行計劃快取,減少生成和優化plan的時間。

  6. 多個stage的語句自動轉換成phased排程執行方式,降低叢集壓力。

  7. 當Hive分割槽或者表的Stats資訊不準的情況下,比如row_count為0,但size不為0,Presto在做Join選擇的時候會優先選擇Broadcast join,如果是一張大表,那麼整個查詢效率非常低,我們在CBO計算模組中,如果row_count為0,那麼我們通過拿分割槽的datasize資訊乘以一個預設的膨脹比來作為該表或分割槽的scan資料量,然後通過該值來進行CBO的計算,確保更準確的選擇更合適的Join型別。

6

未來規劃

  1. Presto叢集支援HPA, 低峰時自動對Presto Worker節點進行優雅縮容給到Yarn混部叢集,從而提升機器利用率,達到降本增效目的。

  2. 啟發式索引,在讀資料前提前過濾Split,orc的檔案和stripe,減少讀資料量。

  3. 支援自動物化檢視,根據使用者常見的語句,自動建立和重新整理物化檢視,無需使用者操作和管理開銷,查詢時改寫語句複用先前物化的資料。

  4. 複雜資料型別Array/Map讀優化。

  5. 基於HBO,ETL大查詢自動路由到Presto on spark,緩解Presto叢集壓力,提升作業成功率。

我們會和業界同行和開源社群保持密切技術交流,在服務好內部使用者作業的同時,也會積極反饋社群,共建社群生態。