探究Presto SQL引擎(2)-淺析Join

語言: CN / TW / HK

作者:vivo網際網路技術-Shuai Guangying

在《探究Presto SQL引擎(1)-巧用Antlr》中,我們介紹了Antlr的基本用法以及如何使用Antlr4實現解析SQL查詢CSV資料,更加深入理解Presto查詢引擎支援的SQL語法以及實現思路。

本次帶來的是系列文章的第2篇,本文梳理了Join的原理,以及Join演算法在Presto中的實現思路。通過理論和實踐的結合,可以在理解原理的基礎上,更加深入理解Join演算法在OLAP場景下的工程落地技巧,比如火山模型,列式儲存,批量處理等思想的應用。

一、背景

在業務開發中使用資料庫,通常會有規範不允許過多表的Join。例如阿里巴巴開發手冊中,有如下的規定:

【強制】超過三個表禁止Join。需要Join的欄位,資料型別必須絕對一致;多表關聯查詢時,保證被關聯的欄位需要有索引。說明:即使雙表Join也要注意表索引、SQL效能。

在大資料數倉的建設中,儘管我們有星型結構和雪花結構,但是最終交付業務使用的大多是寬表。

可以看出業務使用資料庫中的一個矛盾點:我們需要Join來提供靈活的關聯操作,但是又要儘量避免多表和大表Join帶來的效能問題。這是為什麼呢?

二、Join的基本原理

在資料庫中Join提供的語義是非常豐富的。簡單總結如下:

圖片

通常理解Join的實現原理,從Cross Join是最好的切入點,也就是所謂的笛卡爾積。對於集合進行笛卡爾積運算,理解非常簡單,就是窮舉兩個集合中元素所有的組合情況。在資料庫中,集合就對應到資料表中的所有行(tuples),集合中的元素就對應到單行(tuple)。所以實現Cross Join的演算法也就呼之欲出了。

實現的程式碼樣例如下:

List<Tuple>  r = newArrayList(
        new Tuple(newArrayList(1,"a")),
        new Tuple(newArrayList(2,"b")));
 
List<Tuple>  s = newArrayList(
        new Tuple(newArrayList(3,"c")),
        new Tuple(newArrayList(4,"d")));
 
int cnt =0;
for(Tuple ri:r){
    for(Tuple si:s){
        Tuple c = new Tuple().merge(ri).merge(si);
        System.out.println(++cnt+": "+ c);
    }
}
/**
 * out:
 1: [1, a, 3, c]
 2: [1, a, 4, d]
 3: [2, b, 3, c]
 4: [2, b, 4, d]
 */

可以看出實現邏輯非常簡單,就是兩個For迴圈巢狀。

2.1 Nested Loop Join演算法

在這個基礎上,實現Inner Join的第一個演算法就順其自然了。非常直白的名稱:Nested Loop,實現關鍵點如下:

圖片

(來源:Join Processing in Relational Databases)

其中,θ操作符可以是:=, !=, <, >, ≤, ≥。

相比笛卡爾積的實現思路,也就是添加了一層if條件的判斷用於過濾滿足條件的組合。

對於Nested Loop演算法,最關鍵的點在於它的執行效率。假如參與Join的兩張表一張量級為1萬,一張量級為10w,那麼進行比較的次數為1w*10w=10億次。在大資料時代,通常一張表資料量都是以億為單位,如果使用Nested Loop Join演算法,那麼Join操作的比較次數直接就是天文數字了。所以Nested Loop Join基本上是作為萬不得已的保底方案。Nested Loop這個框架下,常見的優化措施如下:

  • 小表驅動大表,即資料量較大的集作為於for迴圈的內部迴圈。

  • 一次處理一個數據塊,而不是一條記錄。也就是所謂的Block Nested Loop Join,通過分塊降低IO次數,提升快取命中率。

值得一提的是Nested Loop Join的思想雖然非常樸素,但是天然的具備分散式、並行的能力。這也是為什麼各類NoSQL資料庫中依然保留Nested Loop Join實現的重要一點。雖然單機序列執行慢,但是可以並行化的話,那就是加機器能解決的問題了。

2.2 Sort Merge Join演算法

通過前面的分析可以知道,Nested Loop Join演算法的關鍵問題在於比較次數過多,演算法的複雜度為O(m*n),那麼突破口也得朝著這個點。如果集合中的元素是有序的,比較的次數會大幅度降低,避免很多無意義的比較運算。對於有序的所以Join的第二種實現方式如下所描述:

圖片

(來源:Join Processing in Relational Databases)s)

通過將JOIN操作拆分成Sort和Merge兩個階段實現Join操作的加速。對於Sort階段,是可以提前準備好可以複用的。這樣的思想對於MySQL這類關係型資料庫是非常友好的,這也能解釋阿里巴巴開發手冊中要求關聯的欄位必須建立索引,因為索引保證了資料有序。該演算法時間複雜度為排序開銷O(m_log(m)+n_log(n))+合併開銷O(m+n)。但是通常由於索引保證了資料有序,索引其時間複雜度為O(m+n)。

2.3 Hash Join演算法

Sort Merge Join的思想在落地中有一定的限制。所謂成也蕭何敗蕭何,對於基於Hadoop的數倉而言,保證資料儲存的有序性這個點對於效能影響過大。在海量資料的背景下,維護索引成本是比較大的。而且索引還依賴於使用場景,不可能每個欄位都建一個索引。在資料表關聯的場景是大表關聯小表時,比如:使用者表(大表)--當日訂單表(小表);事實表(大表)–維度表(小表),可以通過空間換時間。回想一下,在基礎的資料結構中,tree結構和Hash結構可謂資料處理的兩大法寶:一個保證資料有序方便實現區間搜尋,一個通過hash函式實現精準命中點對點查詢效率高。

在這樣的背景下,通過將小表Hash化,實現Join的想法也就不足為奇了。

圖片

(來源:Join Processing in Relational Databases)

而且即使一張表在單機環境生成Hash記憶體消耗過大,還可以利用Hash將資料進行切分,實現分散式能力。所以,在Presto中Join演算法通常會選擇Hash Join,該演算法的時間複雜度為O(m+n)。

通過相關資料的學習,可以發現Join演算法的實現原理還是相當簡單的,排序和Hash是資料結構最為基礎的內容。瞭解了Join的基本思想,如何落地實踐出來呢?畢竟talk is cheap。在專案中實現Join之前,需要一些鋪墊知識。通常來說核心演算法是皇冠上的明珠,但是僅有明珠是不夠的還需要皇冠作為底座。

三、Join工程化前置條件

3.1 SQL處理架構-火山模型

在將Join演算法落地前,需要先了解一下資料庫處理資料的基本架構。在理解架構的基礎上,才能將Join演算法放置到合適的位置。在前面系列文章中探討了基於antlr實現SQL語句的解析。可以發現SQL語法支援的操作型別非常豐富:查詢表(TableScan),過濾資料(Filter),排序(Order),限制(Limit),欄位進行運算(Project), 聚合(Group),關聯(Join)等。為了實現上述的能力,需要一個具備並行化能力且可擴充套件的架構。

1994年Goetz Graefe在論文《Volcano-An Extensible and Parallel Query Evaluation System》提出了一個架構設計思想,這就是大名鼎鼎的火山模型,也稱為迭代模型。火山模型其實包含了檔案系統和查詢處理兩個部分,這裡我們重點關注查詢處理的設計思想。架構圖如下:

圖片

(來源:《Balancing vectorized execution with bandwidth-optimized storage》)

簡單解讀一下:

**職責分離:**將不同操作獨立成一個的Operator,Operator採用open-next-close的迭代器模式。

例如對於SQL 。

SELECT Id, Name, Age, (Age - 30) * 50 AS Bonus
FROM People
WHERE Age > 30

對應到Scan, Select, Project三個Operator,資料互動通過next()函式實現。上述的理論在Presto中可以對應起來,例如Presto中幾個常用的Operator, 基本上是見名知意:

圖片

動態組裝:Operator基於SQL語句的解析實現動態組裝,多個Operator形成一個管道(pipeline)。

例如:print和predicate兩個operator形成一個管道:

圖片

(來源: 《Volcano-An Extensible and Parallel Query Evaluation System》)

在火山模型的基礎上,Presto吸收了資料庫領域的其他思想,對基礎的火山模型進行了優化改造,主要體現在如下幾點:

  1. Operator資料處理優化成一次一個Page,而不是一次行(也稱為tuple)。

  2. Page的儲存採用列式結構。即相同的列封裝到一個Block中。

批量處理結合列式儲存奠定了向量化計算的基礎。這也是資料庫領域的優化方向。

3.2 批量處理和列式儲存

在研讀Presto原始碼時,幾乎到處都可以看到Page/Block的身影。所以理解Page/Block背後的思想是理解Presto實現機制的基礎。有相關書籍和文件講解Page/Block的概念,但是由於這些概念是跟其他概念混在一起呈現,導致一時間不容易理解。

筆者認為Type-Block-Page三者放在一起,更容易理解。我們使用資料庫,通常需要定義表,欄位名稱,欄位型別。在傳統的DBMS中,通常是按行儲存資料,通常結構如下:

圖片

(來源:《資料庫系統實現》)

但是通常OLAP場景不需要讀取所有的欄位,基於這樣的場景,就衍生出來了列式儲存。就是我們看到的如下結構:

圖片

(來源:《Presto技術內幕》)

即每個欄位對應一個Block, 多個Block的切面才是一條記錄,也就是所謂的行,在一些論文中稱為tuple。通過對比可以清楚看出Presto中,Page就是典型了列式儲存的實現。所以在Presto中,每個Type必然會關聯到一種Block。例如:bigint型別就對應著LongArrayBlockBuilder,varchar型別對應著VariableWidthBlock。

理解了原理,操作Page/Block就變得非常簡單了,簡單的demo程式碼如下:

import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
import com.google.common.collect.Lists;
import io.airlift.slice.Slice;
 
import java.util.List;
 
import static io.airlift.slice.Slices.utf8Slice;
 
/**
 * PageBlockDemo
 *
 * @version 1.0
 * @since 2021/6/22 19:26
 */
public class PageBlockDemo {
 
    private static Page buildPage(List<Type> types,List<Object[]> dataSet){
        PageBuilder pageBuilder = new PageBuilder(types);
        // 封裝成Page
        for(Object[] row:dataSet){
            // 完成一行
            pageBuilder.declarePosition();
            for (int column = 0; column < types.size(); column++) {
                BlockBuilder out =  pageBuilder.getBlockBuilder(column);
 
                Object colVal = row[column];
                if(colVal == null){
                    out.appendNull();
                }else{
                    Type type = types.get(column);
                    Class<?> javaType = type.getJavaType();
                    if(javaType == long.class){
                        type.writeLong(out,(long)colVal);
                    }else if(javaType == Slice.class){
                        type.writeSlice(out, utf8Slice((String)colVal));
                    }else{
                        throw new UnsupportedOperationException("not implemented");
                    }
                }
            }
        }
        // 生成Page
        Page page = pageBuilder.build();
        pageBuilder.reset();
        return page;
    }
 
    private static void readColumn(List<Type> types,Page page){
        // 從Page中讀取列
        for(int column=0;column<types.size();column++){
            Block block = page.getBlock(column);
            Type type = types.get(column);
            Class<?> javaType = type.getJavaType();
 
            System.out.print("column["+type.getDisplayName()+"]>>");
            List<Object> colList = Lists.newArrayList();
            for(int pos=0;pos<block.getPositionCount();pos++){
                if(javaType == long.class){
                    colList.add(block.getLong(pos));
                }else if(javaType == Slice.class){
                    colList.add(block.getSlice(pos,0,block.getSliceLength(pos)).toStringUtf8());
                }else{
                    throw new UnsupportedOperationException("not implemented");
                }
            }
            System.out.println(colList);
        }
    }
 
    public static void main(String[] args) {
        /**
         * 假設有兩個欄位,一個欄位型別為int, 一個欄位型別為varchar
         */
        List<Type> types = Lists.newArrayList(BigintType.BIGINT, VarcharType.VARCHAR);
 
        // 按行儲存
        List<Object[]> dataSet = Lists.newArrayList(
                new Object[]{1L,"aa"},
                new Object[]{2L,"ba"},
                new Object[]{3L,"cc"},
                new Object[]{4L,"dd"});
 
        Page page = buildPage(types, dataSet);
 
        readColumn(types,page);
 
    }
}
// 執行結果:
//column[bigint]>>[1, 2, 3, 4]
//column[varchar]>>[aa, ba, cc, dd]

將資料封裝成Page在各個Operator中流轉,一方面避免了物件的序列化和反序列化成本,另一方面相比tuple的方式降低了函式呼叫的開銷。這跟集裝箱運貨降低運輸成本的思想是類似的。

四、Join演算法的工程實踐

理解了Join的核心演算法和基礎架構,結合前文中對antlr實現SQL表示式的解析以及實現where條件過濾,我們已經具備了實現Join的基礎條件。接下來簡單講述一下Join演算法的落地流程。首先在語法層面需要支援Join的語法,由於本文目的在於研究演算法實現流程,而不在於實現完整的Join功能,因此我們暫且先考慮支援兩張表單欄位的等值Join語法。

首先在語法上需要支援Join, 基於antlr語法的定義關鍵點如下:

querySpecification
    : SELECT  selectItem (',' selectItem)*
      (FROM relation (',' relation)*)?
      (WHERE where=booleanExpression)?
    ;
 
selectItem
    : expression  #selectSingle
    ;
 
relation
    : left=relation
      (
        joinType JOIN rightRelation=relation joinCriteria
      )                                           #joinRelation
    | sampledRelation                             #relationDefault
    ;
 
joinType
    : INNER?
    ;
 
joinCriteria
    : ON booleanExpression
    ;

上述的語法定義將Join的關鍵要素拆解得非常清晰:Join的左表, Join的型別,Join關鍵詞, Join的右表, Join的關聯條件。例如,通常我們最簡單的Join語句用例如下(借用presto的tpch資料來源):

select t2.custkey, t2.phone, t1.orderkey from orders t1 inner join customer t2 on t1.custkey=t2.custkey limit 10;

對應著語法和SQL語句用例,可以看到在將Join演算法落地,還需要考慮如下細節點:

  • 檢測SQL語句,確保SQL語句符合語法要求。

  • 梳理表的別名和欄位的對應關係,確保查詢的欄位和表能夠對應起來,Join條件的欄位型別能夠匹配。

  • Join演算法的選取,是HashJoin還是NestedLoopJoin還是SortMergeJoin?

  • 哪個表是build表,哪個表是probe表?

  • Join條件的判斷如何實現?

  • 整個查詢涉及到Operator如何組裝,以實現最終結果的輸出?

我們回顧一下SQL執行的關鍵流程:

圖片

(來源: Query Execution Flow Architecture (SQL Server))

基於上面的流程,問題其實已經有了答案。

  • **Parser:**藉助antlr的能力即可實現SQL語法的檢測。

  • **Binding:**基於SQL語句生成AST,利用元資料檢測欄位和表的對映關係以及Join條件的欄位型別。

  • **Planner:**基於AST生成查詢計劃。

  • **Executor:**基於查詢計劃生成對應的Operator並執行。

以NestedLoop Join演算法為例,瞭解一下Presto的實現思路。對於NestedLoopJoin Join演算法的落地,在Presto中其實是拆解為兩個階段:組合階段和過濾階段。在實現JoinOperator時,只需負責兩個表資料的笛卡爾積組合即可。核心程式碼如下:

// NestedLoopPageBuilder中實現兩個Page計算笛卡爾積的處理邏輯,這裡RunLengthEncodedBlock用於一個元素複製,典型地笛卡爾積計算中需要將一列元素從1行復製成多行。
@Override
public Page next()
{
    if (!hasNext()) {
        throw new NoSuchElementException();
    }
 
    if (noColumnShortcutResult >= 0) {
        rowIndex = maxRowIndex;
        return new Page(noColumnShortcutResult);
    }
 
    rowIndex++;
 
    // Create an array of blocks for all columns in both pages.
    Block[] blocks = new Block[numberOfProbeColumns + numberOfBuildColumns];
 
    // Make sure we always put the probe data on the left and build data on the right.
    int indexForRleBlocks = buildPageLarger ? 0 : numberOfProbeColumns;
    int indexForPageBlocks = buildPageLarger ? numberOfProbeColumns : 0;
 
    // For the page with less rows, create RLE blocks and add them to the blocks array
    for (int i = 0; i < smallPage.getChannelCount(); i++) {
        Block block = smallPage.getBlock(i).getSingleValueBlock(rowIndex);
        blocks[indexForRleBlocks] = new RunLengthEncodedBlock(block, largePage.getPositionCount());
        indexForRleBlocks++;
    }
 
    // Put the page with more rows in the blocks array
    for (int i = 0; i < largePage.getChannelCount(); i++) {
        blocks[indexForPageBlocks + i] = largePage.getBlock(i);
    }
 
    return new Page(largePage.getPositionCount(), blocks);
}

五、小結

本文簡單梳理了Join的基本演算法以及在Presto中實現的基本框架,並以NestedLoop Join演算法為例,演示了在Presto中的實現核心點。可以看出相比原始的演算法描述,Presto的工程落地是截然不同: 不僅支援了所有的Join語義,而且實現了分散式能力。這其中有架構層面的思考,也有效能層面的思考,非常值得探索跟研究。就Join演算法,可以探索的點還有很多,比如多表Join的順序選取,大表跟小表Join的演算法優化,Semi Join的演算法優化,Join演算法資料傾斜的問題等等,可謂路漫漫其修遠兮,將在後續系列文章中繼續分析探索。

六、參考資料

  1. Presto原始碼

  2. Join Processing in Relational Databases

  3. Volcano-An Extensible and Parallel Query Evaluation System