TiFlash 原始碼閱讀(八)TiFlash 表示式的實現與設計

語言: CN / TW / HK

TiFlash TiDB 的分析引擎,是 TiDB HTAP 形態的關鍵元件,TiFlash 原始碼閱讀系列文章將從原始碼層面介紹 TiFlash 的內部實現。 在上一期的原始碼閱讀中,大家應該對 TiFlash Proxy 模組的實現,即 TiFlash 副本是如何被新增以 及獲取資料 有了一定的瞭解。

本文主要介紹的是 TiFlash 表示式的實現與設計, 系統性地介紹了 TiFlash 表示式的基本概念,包括表示式體系,標量函式、聚合函式等,以期望讀者能夠對 TiFlash 的表示式計算有一個初步的瞭解。

本文作者:黃海升,TiFlash 研發工程師

表示式概要

表示式是承載 SQL 大部分邏輯的一個重要部分。SQL 中的表示式和程式語言中的表示式並沒有差異。表示式可以大致分為函式、常量、列引用。如 select a + 1 from table 中的  a + 1 是一個表示式,其中  + 是函式, 1  是常量, a  是列引用。

在 SQL 中,表示式會歸屬在不同的運算元裡執行,以  select a + b from tests.t where c > 0   為例,大家可以從下圖看到不同的表示式歸屬在哪些運算元裡。

表示式在 SQL 中如何劃分出來,並且歸屬在哪些運算元裡,是由一套語法規則決定的。下圖是 MySQL 8.0 Parser 的語法規則簡圖,裡面大號粗體的是運算元識別符號,後面跟著的小號欄位是歸屬這個運算元的表示式。

在瞭解了什麼是表示式之後,我們來了解一下表達式在 TiFlash 裡執行的情況。

在 TiDB HTAP 的體系裡,TiFlash 的表示式是由 TiDB 下推給 TiFlash 執行的。首先我們來回顧下 TiDB 計算下推 TiFlash 的流程。

TiDB 接收 MySQL Client 傳送的 sql,經由 Parser 和 Optimizer 解析成運算元,在之後將運算元下推到 TiFlash 裡執行。與此同時,運算元內部的表示式也會跟隨一起下推到 TiFlash 裡執行。

如下圖所示,如果某個運算元帶有 TiFlash 不支援的函式,就會導致一連串的運算元都無法下推到 TiFlash 裡執行。運算元內部的表示式都可以下推執行,是運算元下推的必要條件。

在運算元和表示式下推到 TiFlash 後,TiFlash 會用向量化執行引擎來執行這些運算元和表示式。在談到 TiFlash 的向量化執行引擎之前,我們先來講一下執行引擎的一個經典模型 Volcano Model。

Volcano Model 源自 1994 年的論文 Volcano-An Extensible and Parallel Query Evaluation System 。Volcano Model 將 SQL 分解為若干個獨立的 Operator,Operator 串聯成一棵 Operator Tree。

如上圖所示,從最源頭的 Table Scan Operator 開始,一行一行地讀取資料,Operator 處理後,傳給上游 Operator。最終 Operator Tree 一行一行地輸出結果。

下面是對 Operator 介面的一個簡單的虛擬碼描述。

struct Operator
{

Row next()
{
Row row = child.next();
....
return row;
}
}

Volcano Model 提供了一個非常簡潔的執行模型,在工程上也非常容易實現。但是 Volcano Model 在現代編譯器和硬體下執行得慢。在後續幾年了誕生了對 Volcano Model 的兩個改進方案,Codegen 和 Vectorized。TiFlash 就是使用的 Vectorized 即向量化執行。

向量化執行與 Volcano Model 基本一致,區別在於 Block by Block。

Block 是若干 Row 組合在一起的資料塊,Block 內部按 Column 儲存資料。

這樣設計的好處有幾個:

  • 虛擬函式呼叫的開銷會被減小為 1 / (Block Size)。Volcano Model 中的 Operator 和表示式通常都是用多型來實現的,在其中就會引入虛擬函式呼叫。每次 Operator Tree 和內部的表示式被呼叫就是一系列的虛擬函式呼叫,在資料量大的情況下,虛擬函式開銷甚至會成為一個不可忽視的點。Block by Block 可以讓 Operator Tree 和內部的表示式的一次呼叫處理多行而不是一行資料,從而均攤了虛擬函式開銷。

  • Cache Friendly。把會被連續處理的資料放在一個數據塊裡,提高在 Cache 上的空間區域性性。

TiFlash 表示式體系 ExpressionActions

除了向量化執行外,TiFlash 在表示式執行上還有一套獨立的執行體系 ExpressionActions ,不同於 TiDB 源自 Volcano Model 的  ExpressionTree

這兩個表示式體系在邏輯語意上是一致的,僅僅在執行過程上有差別。

如上圖所示,同樣的表示式在 TiDB 和 TiFlash 裡會分別在不同的表示式體系裡執行。

接下來以 (a + b) * (a + b) 為例子來講述一下兩個表示式體系執行的差異。

首先從 TiDB ExpressionTree 講起。 (a + b) * (a + b)  會被分解成一棵 Expression Tree,每一個 Expression 都是一個節點。 從 Column Expression 和 Literal Expression 開始讀取資料,遍歷整棵 Expression Tree,最終得出表示式結果。

如下圖所示,沿著圖中的箭頭方向,就可以從 Input 計算得出 (a + b) * (a + b) 的結果 Output。

在圖中大家可以發現 (a + b) 這個子樹出現了兩次,也就意味著  (a + b) 本身執行了兩次,那麼可不可以複用  (a + b) 的計算結果,如下圖所示連線?哪怕這樣就不是一棵樹了。

事實上是可以的,這也是 TiFlash ExpressionActions 的設計初衷: 中間計算結果複用。

在 TiFlash ExpressionActions 下,中間計算的臨時結果會作為 Column 會被寫到 Block 裡,同時我們會通過 Column Name 獲取 Block 中對應的 Column。

如上圖所示,沿右圖箭頭方向遍歷 Expression Action,即可得出 (a + b) * (a + b) 的計算結果。可以看到,同樣是  (a + b) * (a + b) ,TiFlash  ExpressionActions 裡的  (a + b)  只計算了一次。 下面是  ExpressionActions   的執行分解圖,從左到右。 大家可以對照一下分解圖,大概瞭解一下  ExpressionActions   的執行過程。

左右滑動檢視

接下來我們深入一下程式碼,從程式碼層面來了解一下  ExpressionActions   究竟做了些什麼。

class ExpressionActions
{

public:
void execute(Block & block) const
{
for (const auto & action : actions)
action.execute(action);
}

void add(const ExpressionAction & action);

void finalize(const Names & output_columns);

private:
std::vector<ExpressionAction> actions;
}

以上是  ExpressionActions  介面的簡化程式碼。 有三個主要方法:

  • execute

  • ExpressionAction::execute   的包裝。 用於執行表示式。

  • add

  • 用於外部組裝出一個 ExpressionActions

  • ExpressionActions  會維護一個 Sample Block,在 Add Action 的過程中 Sample Block 會不斷更新,模擬實際 Block 的變化情況。

  • 在 Add 的過程中,重複的 Action 會被跳過。重複 Action 的判斷條件是,該 Action 的執行結果是否已經出現在 Sample Block 裡了。

  • finalize

  • 分析 Block 內的 Column 引用情況,在合適的位置插入 Remove Action 來移除無用的 Column。

  • 在 Column 引用數歸 0 的時候,就會插入對應 Column 的 Remove Action。

  • 下圖是 (a + b) * (a + b) 的引用數分析和 Remove Action 插入情況。

ExpressionAction   是  ExpressionActions   內部的執行單元。

struct ExpressionAction
{

Type type;


void execute(Block & block) const
{
switch (type)
{
case: APPLY_FUNCTION:

case: REMOVE_COLUMN:

case: ADD_COLUMN:

}
}
}

ExpressionAction  有不同的 Type 用於對 Block 進行不同的處理

  • REMOVE_COLUMN

  • 即前文所講的 Remove Action

  • ADD_COLUMN

  • 用於執行 Literal  Expression,在 Block 插入一個 Const Column。

    • ADD_COLUMN  會在 Block 中插入一個 Column。 對於 Literal,插入的會是  ColumnConst ,即常量 Column。

    • block.insert({added_column->cloneResized(block.rows()), result_type, result_name});

  • APPLY_FUNCTION

  • 用於執行 Function Expression,由 ExpressionAction 持有的  IFunction   執行

  • APPLY_FUNCTION  會讀取 Block 中的 Argument Columns 傳給  IFunction  做計算。 計算出結果 Column 後,插入到 Block 中。

Column Expression 沒有對應的 Action Type,直接執行 Block::getPositionByName 獲取 Column 在 Block 裡的下標。但是從 TiDB 獲得的 Column Expression 計算得出的並不是 Column 在 Block 中的 Column Name,而是 Column 在 TiDB Schema 中的下標。所以 TiFlash 會維護 TiDB Schema ( std::vector<String> ) 來橋接 TiDB Column Index 和 TiFlash Column Name,如下圖所示。

標量函式在 TiFlash 中的編譯與執行

ExpressionAction  的 Type 為   APPLY_FUNCTION 時, ExpressionAction 內部會持有 IFunction 。對 ExpressionAction::execute 的呼叫都會轉發給 IFunction 執行。 IFunction 是 TiFlash 向量化標量函式的基類。

首先我們從函式在 TiFlash 中的編譯講起。 tipb 是 TiDB 與 TiFlash 之間的序列化協議,下圖的 tipb::Expr 等同是 TiDB 裡的 Expression。

對於傳入的一個 tipb::Expr ,首先分門別類,按照 Column,Literal,Function 分別處理。

如果是 Function,首先處理 Function 的所有引數。引數本身也是 tipb::Expr ,所以也會按照對應的 tipb::Expr 處理流程處理。在處理完 Function 的所有引數後,就可以去構建 IFunction 本身,然後塞入 ExpressionAction ,返回處理結果。

TiDB 對函式的標識是 tipb::ScalarFuncSig ,而 TiFlash 使用 Function Name 作為函式的標識。在 TiFlash 裡,我們會用對映表的形式將  tipb::ScalarFuncSig 對映成 Function Name。再根據 Function Name 找到對應的 TiFlash Function Builder。

對於視窗函式、聚合函式、distinct 聚合函式、標量函式都有各自的對映表。

const std::unordered_map<tipb::ExprType, String> window_func_map({
{tipb::ExprType::Rank, "rank"},
...
})
;

const std::unordered_map<tipb::ExprType, String> agg_func_map({
{tipb::ExprType::Count, "count"},
...
})
;

const std::unordered_map<tipb::ExprType, String> distinct_agg_func_map({
{tipb::ExprType::Count, "countDistinct"},
...
})
;

const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
{tipb::ScalarFuncSig::CastIntAsInt, "tidb_cast"},
...
})
;

拿到 Function Name 之後,我們就可以去找到 Function 對應的 Function Builder 去 Build 出  IFunction   實現。 TiFlash 有兩類 Function Builder: Default Function Builder 和 Special Function Builder。 前者是用於處理大多數 Function,後者處理某些特殊情況。

String DAGExpressionAnalyzerHelper::buildFunction(
DAGExpressionAnalyzer * analyzer,
const tipb::Expr & expr,
const ExpressionActionsPtr & actions)

{
const String & func_name = getFunctionName(expr);
if (function_builder_map.count(func_name) != 0)
{
return function_builder_map[func_name](analyzer, expr, actions);
}
else
{
return buildDefaultFunction(analyzer, expr, actions);
}
}

Default Function Builder 的處理本身很簡單,就是先處理所有的函式入參,然後呼叫  applyFunction 生成對應的  IFunction 實現。

String DAGExpressionAnalyzerHelper::buildDefaultFunction(
DAGExpressionAnalyzer * analyzer,
const tipb::Expr & expr,
const ExpressionActionsPtr & actions)

{
const String & func_name = getFunctionName(expr);
Names argument_names;
for (const auto & child : expr.children())
{
String name = analyzer->getActions(child, actions);
argument_names.push_back(name);
}
return analyzer->applyFunction(func_name, argument_names, actions, getCollatorFromExpr(expr));
}

對某些函式,有些特殊處理,這時就會使用到 Special Function Builder。

特殊處理的 Function Builder 會放在表裡,遇到對應的函式,會轉發過來。

下面是一些函式的特殊處理對映。

FunctionBuilderMap DAGExpressionAnalyzerHelper::function_builder_map(
{...
{"ifNull", DAGExpressionAnalyzerHelper::buildIfNullFunction},
{"multiIf", DAGExpressionAnalyzerHelper::buildMultiIfFunction},
...
{"bitAnd", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
{"bitOr", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
{"bitXor", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
{"bitNot", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
{"bitShiftLeft", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
{"bitShiftRight", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
...
})
;

一個特殊的處理的情況是複用函式實現。某些函式可以由另一個函式來代理執行,比如

leftUTF8(str, len) = substrUTF8(str, 1, len) 。如果讓 substrUTF8 來代理執行 leftUTF8 ,那麼就可以省掉 leftUTF8 本身的開發實現工作。下面是 leftUTF8(str, len) = substrUTF8(str, 1, len) 的代理實現程式碼。為 substrUTF8 生成第二個引數 1 後,將 leftUTF8 的兩個引數傳入 substrUTF8substrUTF8 就可以代理 leftUTF8 執行。

String DAGExpressionAnalyzerHelper::buildLeftUTF8Function(
DAGExpressionAnalyzer * analyzer,
const tipb::Expr & expr,
const ExpressionActionsPtr & actions)

{
const String & func_name = "substringUTF8";
Names argument_names;

// the first parameter: str
String str = analyzer->getActions(expr.children()[0], actions, false);
argument_names.push_back(str);

// the second parameter: const(1)
auto const_one = constructInt64LiteralTiExpr(1);
auto col_const_one = analyzer->getActions(const_one, actions, false);
argument_names.push_back(col_const_one);

// the third parameter: len
String name = analyzer->getActions(expr.children()[1], actions, false);
argument_names.push_back(name);

return analyzer->applyFunction(func_name, argument_names, actions, getCollatorFromExpr(expr));
}

接下來我們來看一下 IFunction   本身的介面。

class IFunction
{

public:
virtual String getName() const = 0;

virtual DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const;

virtual void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const;
};

IFunction  有三個主要方法

  • getName : 返回 Function 的 Name,Name 是作為 TiFlash 向量化函式的唯一標識來使用。

  • getReturnTypeImpl : 負責做向量化函式的型別推導,因為輸入引數資料型別的變化可能會導致輸出資料型別變化。

  • executeImpl : 負責向量化函式的執行邏輯,這也是一個向量化函式的主體部分。 一個 TiFlash 向量化函式夠不夠"向量化",夠不夠快也就看這裡了。

接下來以 jsonLength(string) 為例子,講一下向量化函式的執行

  1. 從 Block 中獲取 Json Column

  2. 建立同等大小的 Json Length Column

  3. Foreach Json Column,獲取每一個行的 Json

  4. 呼叫 GetJsonLength(Json) 獲取 Json Length,將結果插入 Json Length Column 中的對應位置。

  5. 將 Json Length Column 插入到 Block 中,完成單次計算

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
{
// 1. 獲取 json column,json column 本身是 String 型別,所以 json column 用的是 ColumnString 這個 column 實現
const ColumnPtr column = block.getByPosition(arguments[0]).column;
if (const auto * col = checkAndGetColumn<ColumnString>(column.get()))
{
// 2.建立 json len column, json len 本身是 UInt64 型別,用的是 ColumnUInt64 這個 column 實現
auto col_res = ColumnUInt64::create();
typename ColumnUInt64::Container & vec_col_res = col_res->getData();
{
// 3. 遍歷 json column,ColumnString 提供了一些裸操作內部 string 的方法,可以提高效率
const auto & data = col->getChars();
const auto & offsets = col->getOffsets();
const size_t size = offsets.size();
vec_col_res.resize(size);

ColumnString::Offset prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
// 4. 呼叫 GetJsonLength,計算出 json 的 length,插入到 json len column 中。
std::string_view sv(reinterpret_cast<const char *>(&data[prev_offset]), offsets[i] - prev_offset - 1);
vec_col_res[i] = GetJsonLength(sv);
prev_offset = offsets[i];
}
}
// 5. 將 json len column 插入到 Block 中,完成單次計算。
block.getByPosition(result).column = std::move(col_res);
}
else
throw Exception(fmt::format("Illegal column {} of argument of function {}", column->getName(), getName()), ErrorCodes::ILLEGAL_COLUMN);
}

前段時間 TiFlash 有個社群活動,號召大家來參與 TiFlash 函式下推的工作。

以上關於標量函式的內容在社群活動的兩篇文章 TiFlash 函式下推必知必會 和  手把手教你實現 TiFlash 向量化函式 都有包含,大家可以通過這兩篇文章瞭解更多關於 TiFlash 標量函式的內容。也歡迎小夥伴們也參與到 TiFlash 函式下推的工作中來。

聚合函式在 TiFlash 中的編譯與執行

聚合函式不同於標量函式的點在於輸入 m 行資料輸出 n 行資料,且有 m >= n。

所以聚合函式不會使用 ExpressionActions 和  IFunction ,而是有獨立的執行體系。

聚合函式本身由 Aggregate 運算元來執行, Aggregate 運算元負責管理聚合函式的執行步驟,執行併發度等等。

Aggregate   運算元的執行有兩部分

  • 前者 ExpressionActions 用於執行標量函式的部分。

  • 後者 Aggregator 用於執行聚合函式的部分。

如下圖所示,對於 select max(a+b)ExpressionActions 執行 a+bAggregator 執行 max

下面詳細來說一說 Aggregator 是如何執行聚合函式的。首先 Aggregator 會多執行緒從 Input 讀取 Block,呼叫 executeOnBlock 寫入 Thread Local AggregatedDataVariantsAggregatedDataVariants 內部會儲存當前執行緒的部分聚合計算結果。在 executeOnBlock 階段完成後, Aggregator 會呼叫 mergeAndConvertBlocks 將多個 AggregatedDataVariants 合併成一個,輸出最終聚合的結果給 Output。

Aggregator::executeOnBlock   會對每一個 Key 都會建立一個 Aggregate Data。

輸入的 Row 會根據 Key 找到對應的 Aggregate Data, 呼叫 IAggregateFunction::add ,更新 Agg Function 儲存在 Aggregate Data 裡的聚合結果。

如下圖所示,Row1 和 Row4 經由 Aggregate Function 計算後,更新 Key1 儲存的 Aggregate Data;Row2 經由 Aggregate Function 計算後,更新 Key2 儲存的 Aggregate Data;Row3 和 Row5 經由 Aggregate Function 計算後,更新 Key3 儲存的 Aggregate Data。

executeOnBlock 完成後,每個執行緒都會有一個獨立的 Aggregate Data。

mergeAndConvertBlocks 階段會把其他 Aggregate Data 都合併到 Aggregate Data0 上面。

IAggregateFunction::merge  用於執行把所有執行緒計算的部分聚合結果聚合成一個最終聚合結果。

如下圖所示,Aggregate Data 在合併的時候,同一個 Key 的資料會合併到一起,與其他 Key 互不干擾。

IAggregateFunction   是聚合函式的實現基類。 接下來我們來看一下  IAggregateFunction   本身的介面。

class IAggregateFunction
{

public:
String getName() const = 0;

DataTypePtr getReturnType() const = 0;

void add(AggregateDataPtr __restrict place, const IColumn ** arg_columns, size_t row_num, Arena * arena) const = 0;

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const = 0;
};
  • add 用於計算聚合結果並更新到 place

  • merge 用於將 rhs 裡的聚合結果合併到 place

  • insertResultInto 用於將 place 裡儲存的聚合結果輸出成 Block

接下來我們以 Sum 這個聚合函式為例子來看一下聚合函式的執行過程。

template <typename T>
using AggregateFunctionSumSimple =
AggregateFunctionSum<T, typename NearestFieldType<T>::Type, AggregateFunctionSumData<typename NearestFieldType<T>::Type>>;

template <typename T, typename TResult, typename Data>
class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>
{
public:
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).add(column.getData()[row_num]);
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
};

template <typename T>
struct AggregateFunctionSumData
{

T sum{};

template <typename U>
void add(U value) { AggregateFunctionSumAddImpl<T>::add(sum, value); }

void merge(const AggregateFunctionSumData & rhs) { AggregateFunctionSumAddImpl<T>::add(sum, rhs.sum); }
}

template <typename T>
struct AggregateFunctionSumAddImpl
{

static void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(T & lhs, const T & rhs) { lhs += rhs; }
};
  • 實際的 Sum 實現帶有很多優化,這裡選擇最簡化的實現

  • 對於 Sum,AggregateData 的實現是 AggregateFunctionSumData ,內部維護一個 T Sum 儲存聚合結果

  • 對於 Sum, addmerge 都是執行 += 操作。

下圖是對 Sum 執行過程的一個簡化描述圖。

add 階段, Aggregator 會從 Input 讀取資料,執行 sum += input ,更新 Aggregate Data 裡的聚合結果。在 add 完成後,會執行 sum0 += sum1sum0 += sum2 ,將聚合結果合併成一個。最終輸出 sum0 作為最終結果。

:bulb:  點選文末 【閱讀原文】 立即下載試用 TiDB!