10分鐘教你寫一個數據庫

語言: CN / TW / HK

今天教大家藉助一款框架快速實現一個數據庫,這個框架就是Calcite,下面會帶大家通過兩個例子快速教會大家怎麼實現,一個是可以通過 SQL 語句的方式可以直接查詢檔案內容,第二個是模擬 Mysql 查詢功能,以及最後告訴大家怎麼實現 SQL 查詢 Kafka 資料。

Calcite

Calcite 是一個用於優化異構資料來源的查詢處理的可插拔基礎框架(他是一個框架),可以將任意資料(Any data, Anywhere)DML 轉換成基於 SQL 的 DML 引擎,並且我們可以選擇性的使用它的部分功能。

Calcite能幹什麼

  1. 使用 SQL 訪問記憶體中某個資料

  2. 使用 SQL 訪問某個檔案的資料

  3. 跨資料來源的資料訪問、聚合、排序等(例如 Mysql 和 Redis 資料來源中的資料進行join)

當我們需要自建一個數據庫的時候,資料可以為任何格式的,比如text、word、xml、mysql、es、csv、第三方介面資料等等,我們只有資料,我們想讓這些資料支援 SQL 形式動態增刪改查。

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等專案中,資料處理系統都是使用 Calcite 來做 SQL 解析和查詢優化,當然,還有部分用來構建自己的 JDBC driver。

名詞解釋

Token

就是將標準 SQL(可以理解為Mysql)關鍵詞以及關鍵詞之間的字串截取出來,每一個token,會被封裝為一個SqlNodeSqlNode會衍生很多子類,比如Select會被封裝為SqlSelect,當前 SqlNode 也能反解析為 SQL 文字。

RelDataTypeField

某個欄位的名稱和型別資訊

RelDataType

多個 RelDataTypeField 組成了 RelDataType,可以理解為資料行

Table

一個完整的表的資訊

Schema

所有元資料的組合,可以理解為一組 Table 或者庫的概念

開始使用

1. 引入包

java <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> <!-- 目前最新版本 2022-09-10日更新--> <version>1.32.0</version> </dependency>

2. 建立model.json檔案和表結構csv

model.json 裡面主要描述或者說告訴 Calcite 如何建立 Schema,也就是告訴框架怎麼創建出庫。

json { "version": "1.0",//忽略 "defaultSchema": "CSV",//設定預設的schema "schemas": [//可定義多個schema { "name": "CSV",//相當於namespace和上面的defaultSchema的值對應 "type": "custom",//寫死 "factory": "csv.CsvSchemaFactory",//factory的類名必須是你自己實現的factory的包的全路徑 "operand": { //這裡可以傳遞自定義引數,最終會以map的形式傳遞給factory的operand引數 "directory": "csv"//directory代表calcite會在resources下面的csv目錄下面讀取所有的csv檔案,factory建立的Schema會吧這些檔案全部構建成Table,可以理解為讀取資料檔案的根目錄,當然key的名稱也不一定非得用directory,你可以隨意指定 } } ] }

接下來還需要定義一個 csv 檔案,用來定義表結構。

NAME:string,MONEY:string aixiaoxian,10000萬 xiaobai,10000萬 adong,10000萬 maomao,10000萬 xixi,10000萬 zizi,10000萬 wuwu,10000萬 kuku,10000萬

整個專案的結構大概就是這樣:

3. 實現Schema的工廠類

在上述檔案中指定的包路徑下去編寫CsvSchemaFactory 類,實現 SchemaFactory 介面,並且實現裡面唯一的方法create 方法,建立Schema(庫)。

java public class CsvSchemaFactory implements SchemaFactory { /** * parentSchema 父節點,一般為root * name 為model.json中定義的名字 * operand 為model.json中定於的資料,這裡可以傳遞自定義引數 * * @param parentSchema Parent schema * @param name Name of this schema * @param operand The "operand" JSON property * @return */ @Override public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { final String directory = (String) operand.get("directory"); File directoryFile = new File(directory); return new CsvSchema(directoryFile, "scannable"); } }

4. 自定義Schma類

有了 SchemaFactory,接下來需要自定義 Schema 類。

自定義的 Schema 需要實現 Schema 介面,但是直接實現要實現的方法太多,我們去實現官方的 AbstractSchema 類,這樣就只需要實現一個方法就行(如果有其他定製化需求可以實現原生介面)。

核心的邏輯就是createTableMap方法,用於創建出 Table 表。

他會掃描指定的Resource下面的所有 csv 檔案,將每個檔案對映成Table物件,最終以map形式返回,Schema介面的其他幾個方法會用到這個物件。

java //實現這一個方法就行了 @Override protected Map<String, Table> getTableMap() { if (tableMap == null) { tableMap = createTableMap(); } return tableMap; } private Map<String, Table> createTableMap() { // Look for files in the directory ending in ".csv" final Source baseSource = Sources.of(directoryFile); //會自動過濾掉非指定檔案字尾的檔案,我這裡寫的csv File[] files = directoryFile.listFiles((dir, name) -> { final String nameSansGz = trim(name, ".gz"); return nameSansGz.endsWith(".csv"); }); if (files == null) { System.out.println("directory " + directoryFile + " not found"); files = new File[0]; } // Build a map from table name to table; each file becomes a table. final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); for (File file : files) { Source source = Sources.of(file); final Source sourceSansCsv = source.trimOrNull(".csv"); if (sourceSansCsv != null) { final Table table = createTable(source); builder.put(sourceSansCsv.relative(baseSource).path(), table); } } return builder.build(); }

5. 自定義 Table

Schema 有了,並且資料檔案 csv 也對映成 Table 了,一個 csv檔案對應一個 Table

接下來我們去自定義 Table,自定義 Table 的核心是我們要定義欄位的型別和名稱,以及如何讀取 csv檔案。

  1. 先獲取資料型別和名稱,即單表結構,從csv檔案頭中獲取(當前檔案頭需要我們自己定義,包括規則我們也可以定製化)。

```java / * Base class for table that reads CSV files. */ public abstract class CsvTable extends AbstractTable { protected final Source source; protected final @Nullable RelProtoDataType protoRowType; private @Nullable RelDataType rowType; private @Nullable List fieldTypes; ​ / * Creates a CsvTable. / CsvTable(Source source, @Nullable RelProtoDataType protoRowType) { this.source = source; this.protoRowType = protoRowType; } / * 建立一個CsvTable,繼承AbstractTable,需要實現裡面的getRowType方法,此方法就是獲取當前的表結構。 Table的型別有很多種,比如還有檢視型別,AbstractTable類中幫我們預設實現了Table介面的一些方法,比如getJdbcTableType 方法,預設為Table型別,如果有其他定製化需求可直接實現Table介面。 和AbstractSchema很像 / @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { if (protoRowType != null) { return protoRowType.apply(typeFactory); } if (rowType == null) { rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source, null); } return rowType; } ​ /* * Returns the field types of this CSV table. / public List getFieldTypes(RelDataTypeFactory typeFactory) { if (fieldTypes == null) { fieldTypes = new ArrayList<>(); CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source, fieldTypes); } return fieldTypes; }

public static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source, @Nullable List fieldTypes) { final List types = new ArrayList<>(); final List names = new ArrayList<>(); try (CSVReader reader = openCsv(source)) { String[] strings = reader.readNext(); if (strings == null) { strings = new String[]{"EmptyFileHasNoColumns:boolean"}; } for (String string : strings) { final String name; final RelDataType fieldType; //就是簡單的讀取字串冒號前面是名稱,冒號後面是型別 final int colon = string.indexOf(':'); if (colon >= 0) { name = string.substring(0, colon); String typeString = string.substring(colon + 1); Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString); if (decimalMatcher.matches()) { int precision = Integer.parseInt(decimalMatcher.group(1)); int scale = Integer.parseInt(decimalMatcher.group(2)); fieldType = parseDecimalSqlType(typeFactory, precision, scale); } else { switch (typeString) { case "string": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR); break; case "boolean": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN); break; case "byte": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT); break; case "char": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR); break; case "short": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT); break; case "int": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER); break; case "long": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT); break; case "float": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL); break; case "double": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE); break; case "date": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE); break; case "timestamp": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP); break; case "time": fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME); break; default: LOGGER.warn( "Found unknown type: {} in file: {} for column: {}. Will assume the type of " + "column is string.", typeString, source.path(), name); fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR); break; } } } else { // 如果沒定義,預設都是String型別,欄位名稱也是string name = string; fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR); } names.add(name); types.add(fieldType); if (fieldTypes != null) { fieldTypes.add(fieldType); } } } catch (IOException e) { // ignore } if (names.isEmpty()) { names.add("line"); types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR)); } return typeFactory.createStructType(Pair.zip(names, types)); } } ```

  1. 獲取檔案中的資料,上面把Table的表結構欄位名稱和型別都獲取到了以後,就剩最後一步了,獲取檔案中的資料。我們需要自定義一個類,實現 ScannableTable 介面,並且實現裡面唯一的方法 scan 方法,其實本質上就是讀檔案,然後把檔案的每一行的資料和上述獲取的 fileType 進行匹配。

```java @Override public Enumerable scan(DataContext root) { JavaTypeFactory typeFactory = root.getTypeFactory(); final List fieldTypes = getFieldTypes(typeFactory); final List fields = ImmutableIntList.identity(fieldTypes.size()); final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root); return new AbstractEnumerable<@Nullable Object[]>() { @Override public Enumerator<@Nullable Object[]> enumerator() { //返回我們自定義的讀取資料的類 return new CsvEnumerator<>(source, cancelFlag, false, null, CsvEnumerator.arrayConverter(fieldTypes, fields, false)); } }; }

public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream, @Nullable String @Nullable [] filterValues, RowConverter rowConverter) { this.cancelFlag = cancelFlag; this.rowConverter = rowConverter; this.filterValues = filterValues == null ? null : ImmutableNullableList.copyOf(filterValues); try {

        this.reader = openCsv(source);
        //跳過第一行,因為第一行是定義型別和名稱的
        this.reader.readNext(); // skip header row
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

//CsvEnumerator必須實現calcit自己的迭代器,裡面有current、moveNext方法,current是返回當前遊標所在的資料記錄,moveNext是將遊標指向下一個記錄,官網中自己定義了一個型別轉換器,是將csv檔案中的資料轉換成檔案頭指定的型別,這個需要我們自己來實現 @Override public E current() { return castNonNull(current); }

@Override
public boolean moveNext() {
    try {
        outer:
        for (; ; ) {
            if (cancelFlag.get()) {
                return false;
            }
            final String[] strings = reader.readNext();
            if (strings == null) {
                current = null;
                reader.close();
                return false;
            }
            if (filterValues != null) {
                for (int i = 0; i < strings.length; i++) {
                    String filterValue = filterValues.get(i);
                    if (filterValue != null) {
                        if (!filterValue.equals(strings[i])) {
                            continue outer;
                        }
                    }
                }
            }
            current = rowConverter.convertRow(strings);
            return true;
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

    protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
        if (fieldType == null || string == null) {
            return string;
        }
        switch (fieldType.getSqlTypeName()) {
            case BOOLEAN:
                if (string.length() == 0) {
                    return null;
                }
                return Boolean.parseBoolean(string);
            case TINYINT:
                if (string.length() == 0) {
                    return null;
                }
                return Byte.parseByte(string);
            case SMALLINT:
                if (string.length() == 0) {
                    return null;
                }
                return Short.parseShort(string);
            case INTEGER:
                if (string.length() == 0) {
                    return null;
                }
                return Integer.parseInt(string);
            case BIGINT:
                if (string.length() == 0) {
                    return null;
                }
                return Long.parseLong(string);
            case FLOAT:
                if (string.length() == 0) {
                    return null;
                }
                return Float.parseFloat(string);
            case DOUBLE:
                if (string.length() == 0) {
                    return null;
                }
                return Double.parseDouble(string);
            case DECIMAL:
                if (string.length() == 0) {
                    return null;
                }
                return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
            case DATE:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_DATE.parse(string);
                    return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
                } catch (ParseException e) {
                    return null;
                }
            case TIME:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_TIME.parse(string);
                    return (int) date.getTime();
                } catch (ParseException e) {
                    return null;
                }
            case TIMESTAMP:
                if (string.length() == 0) {
                    return null;
                }
                try {
                    Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                    return date.getTime();
                } catch (ParseException e) {
                    return null;
                }
            case VARCHAR:
            default:
                return string;
        }
    }

```

6. 最後

至此我們需要準備的東西:庫、表名稱、欄位名稱、欄位型別都有了,接下來我們去寫我們的 SQL 語句查詢我們的資料檔案。

建立好幾個測試的資料檔案,例如上面專案結構中我建立 2 個 csv 檔案USERINFO.csvASSET.csv,然後建立測試類。

這樣跑起來,就可以通過 SQL 語句的方式直接查詢資料了。

```java public class Test { public static void main(String[] args) throws SQLException { Connection connection = null; Statement statement = null; try { Properties info = new Properties(); info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath()); connection = DriverManager.getConnection("jdbc:calcite:", info); statement = connection.createStatement(); print(statement.executeQuery("select * from asset "));

        print(statement.executeQuery(" select * from userinfo "));

        print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));

        print(statement.executeQuery(" select * from userinfo where age >60 "));

        print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
    } finally {
        connection.close();
    }
}

private static void print(ResultSet resultSet) throws SQLException {
    final ResultSetMetaData metaData = resultSet.getMetaData();
    final int columnCount = metaData.getColumnCount();
    while (resultSet.next()) {
        for (int i = 1; ; i++) {
            System.out.print(resultSet.getString(i));
            if (i < columnCount) {
                System.out.print(", ");
            } else {
                System.out.println();
                break;
            }
        }
    }
}

} ```

查詢結果:

這裡在測試的時候踩到2個坑,大家如果自己實驗的時候可以避免下。

  1. Calcite 預設會把你的 SQL 語句中的表名和類名全部轉換為大寫,因為預設的 csv(其他檔案也一樣)檔案的名稱就是表名,除非你自定義規則,所以你的檔名要寫成大寫。

  2. Calcite 有一些預設的關鍵字不能用作表名,不然會查詢失敗,比如我剛開始定的user.csv就一直查不出來,改成USERINFO就可以了,這點和Mysql 的內建關鍵字差不多,也可以通過個性化配置去改。

演示Mysql

  1. 首先,還是先準備 Calcite 需要的東西:庫、表名稱、欄位名稱、欄位型別。

如果資料來源使用Mysql的話,這些都不用我們去 JAVA 服務中去定義,直接在 Mysql 客戶端建立好,這裡直接建立兩張表用於測試,就和我們的csv檔案一樣。

``sql CREATE TABLEUSERINFO1(NAMEvarchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,AGE` int DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

CREATE TABLE ASSET ( NAME varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL, MONEY varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3; ```

  1. 上述 csv 案例中的 SchemaFactory 以及 Schema 這些都不需要建立,因為 Calcite 預設提供了 Mysql 的 Adapter介面卡。

  2. 其實,上述兩步都不需要做,我們真正要做的是,告訴 Calcite 你的 JDBC 的連線資訊就行了,也是在 model.json 檔案中定義。

json { "version": "1.0", "defaultSchema": "Demo", "schemas": [ { "name": "Demo", "type": "custom", // 這裡是calcite預設的SchemaFactory,裡面的流程和我們上述自己定義的相同,下面會簡單看看原始碼。 "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory", "operand": { // 我用的是mysql8以上版本,所以這裡注意包的名稱 "jdbcDriver": "com.mysql.cj.jdbc.Driver", "jdbcUrl": "jdbc:mysql://localhost:3306/irving", "jdbcUser": "root", "jdbcPassword": "123456" } } ] }

  1. 在專案中引入 Mysql 的驅動包

java <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.30</version> </dependency>

  1. 寫好測試類,這樣直接就相當於完成了所有的功能了。

```java public class TestMysql { public static void main(String[] args) throws SQLException { Connection connection = null; Statement statement = null; try { Properties info = new Properties(); info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath()); connection = DriverManager.getConnection("jdbc:calcite:", info); statement = connection.createStatement(); statement.executeUpdate(" insert into userinfo1 values ('xxx',12) "); print(statement.executeQuery("select * from asset "));

        print(statement.executeQuery(" select * from userinfo1 "));

        print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));

        print(statement.executeQuery(" select * from userinfo1 where age >60 "));

        print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
    } finally {
        connection.close();
    }

}

private static void print(ResultSet resultSet) throws SQLException {
    final ResultSetMetaData metaData = resultSet.getMetaData();
    final int columnCount = metaData.getColumnCount();
    while (resultSet.next()) {
        for (int i = 1; ; i++) {
            System.out.print(resultSet.getString(i));
            if (i < columnCount) {
                System.out.print(", ");
            } else {
                System.out.println();
                break;
            }
        }
    }
}

} ```

查詢結果:

Mysql實現原理

上述我們在 model.json 檔案中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory類,可以看下這個類的程式碼。

這個類是把 FactorySchema 寫在了一起,其實就是呼叫schemafactory類的create方法建立一個 schema 出來,和我們上面自定義的流程是一樣的。

其中JdbcSchema類也是 Schema 的子類,所以也會實現getTable方法(這個我們上述也實現了,我們當時是獲取表結構和表的欄位型別以及名稱,是從csv檔案頭中讀檔案的),JdbcSchema的實現是通過連線 Mysql 服務端查詢元資料資訊,再將這些資訊封裝成 Calcite需要的物件格式。

這裡同樣要注意 csv方式的2個注意點,大小寫和關鍵字問題。

```java public static JdbcSchema create( SchemaPlus parentSchema, String name, Map operand) { DataSource dataSource; try { final String dataSourceName = (String) operand.get("dataSource"); if (dataSourceName != null) { dataSource = AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName); } else { //會走在這裡來,這裡就是我們在model.json中指定的jdbc的連線資訊,最終會建立一個datasource final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl"); final String jdbcDriver = (String) operand.get("jdbcDriver"); final String jdbcUser = (String) operand.get("jdbcUser"); final String jdbcPassword = (String) operand.get("jdbcPassword"); dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword); } } catch (Exception e) { throw new RuntimeException("Error while reading dataSource", e); } String jdbcCatalog = (String) operand.get("jdbcCatalog"); String jdbcSchema = (String) operand.get("jdbcSchema"); String sqlDialectFactory = (String) operand.get("sqlDialectFactory");

if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
  return JdbcSchema.create(
      parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
} else {
  SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
      SqlDialectFactory.class, sqlDialectFactory);
  return JdbcSchema.create(
      parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
}

}

@Override public @Nullable Table getTable(String name) { return getTableMap(false).get(name); }

private synchronized ImmutableMap getTableMap( boolean force) { if (force || tableMap == null) { tableMap = computeTables(); } return tableMap; }

private ImmutableMap computeTables() { Connection connection = null; ResultSet resultSet = null; try { connection = dataSource.getConnection(); final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection); final String catalog = catalogSchema.left; final String schema = catalogSchema.right; final Iterable tableDefs; Foo threadMetadata = THREAD_METADATA.get(); if (threadMetadata != null) { tableDefs = threadMetadata.apply(catalog, schema); } else { final List tableDefList = new ArrayList<>(); // 獲取元資料 final DatabaseMetaData metaData = connection.getMetaData(); resultSet = metaData.getTables(catalog, schema, null, null); while (resultSet.next()) { //獲取庫名,表明等資訊 final String catalogName = resultSet.getString(1); final String schemaName = resultSet.getString(2); final String tableName = resultSet.getString(3); final String tableTypeName = resultSet.getString(4); tableDefList.add( new MetaImpl.MetaTable(catalogName, schemaName, tableName, tableTypeName)); } tableDefs = tableDefList; }

  final ImmutableMap.Builder<String, JdbcTable> builder =
      ImmutableMap.builder();
  for (MetaImpl.MetaTable tableDef : tableDefs) {
    final String tableTypeName2 =
        tableDef.tableType == null
        ? null
        : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
    final TableType tableType =
        Util.enumVal(TableType.OTHER, tableTypeName2);
    if (tableType == TableType.OTHER  && tableTypeName2 != null) {
      System.out.println("Unknown table type: " + tableTypeName2);
    }
    //  最終封裝成JdbcTable物件
    final JdbcTable table =
        new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
            tableDef.tableName, tableType);
    builder.put(tableDef.tableName, table);
  }
  return builder.build();
} catch (SQLException e) {
  throw new RuntimeException(
      "Exception while reading tables", e);
} finally {
  close(connection, null, resultSet);
}

} ```

SQL執行流程

OK,到這裡基本上兩個簡單的案例已經演示好了,最後補充一下整個Calcite架構和整個 SQL 的執行流程。

整個流程如下:SQL解析(Parser)=> SQL校驗(Validator)=> SQL查詢優化(optimizer)=> SQL生成 => SQL執行

SQL Parser

所有的 SQL 語句在執行前都需要經歷 SQL 解析器解析,解析器的工作內容就是將 SQL 中的 Token 解析成抽象語法樹,每個樹的節點都是一個 SqlNode,這個過程其實就是 Sql Text => SqlNode 的過程。

我們前面的 Demo 沒有自定義 Parser,是因為 Calcite 採用了自己預設的 Parser(SqlParserImpl)。

SqlNode

SqlNode是整個解析中的核心,比如圖中你可以發現,對於每個比如selectfromwhere關鍵字之後的內容其實都是一個SqlNode

parserConfig方法主要是設定 SqlParserFactory 的引數,比如我們上面所說的我本地測試的時候踩的大小寫的坑,就可以在這裡設定。

直接呼叫setCaseSensitive=false即不會將 SQL 語句中的表名列名轉為大寫,下面是預設的,其他的引數可以按需配置。

SQL Validator

SQL 語句先經過 Parser,然後經過語法驗證器,注意 Parser 並不會驗證語法的正確性。

其實 Parser 只會驗證 SQL 關鍵詞的位置是否正確,我們上述2個 Parser 的例子中都沒有建立 schematable 這些,但是如果這樣寫,那就會報錯,這個錯誤就是 parser 檢測後丟擲來的(ParseLocationErrorTest)。

真正的校驗在 validator 中,會去驗證查詢的表名是否存在,查詢的欄位是否存在,型別是否匹配,這個過程比較複雜,預設的 validatorSqlValidatorImpl

查詢優化

比如關係代數,比如什麼投影、笛卡爾積這些,Calcite提供了很多內部的優化器,也可以實現自己的優化器。

介面卡

Calcite 是不包含儲存層的,所以提供一種介面卡的機制來訪問外部的資料儲存或者儲存引擎。

最後,進階

官網裡面寫了未來會支援Kafka介面卡到公共Api中,到時候使用起來就和上述整合Mysql一樣方便,但是現在還沒有支援,我這裡給大家提供個自己實現的方式,這樣就可以通過 SQL 的方式直接查詢 Kafka 中的 Topic 資料等資訊。

這裡我們內部整合實現了KSQL的能力,查詢結果是OK的。

還是像上述步驟一樣,我們需要準備庫、表名稱、欄位名稱、欄位型別、資料來源(多出來的地方)。

  1. 自定義Sql解析,之前我們都沒有自定義解析,這裡需要自定義解析,是因為我需要動態解析sqlwhere條件裡面的partation

  2. 配置解析器,就是之前案例中提到的配置大小寫之類的

  3. 建立解析器,使用的預設SqlParseImpl
  4. 開始解析,生成AST,我們可以基於生成的SqlNode做一些業務相關的校驗和引數解析

  5. 介面卡獲取資料來源

```java public class KafkaConsumerAdapter { public static List executor(KafkaSqlInfo kafkaSql) { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(props); List topics = new ArrayList<>(); for (Integer partition : kafkaSql.getPartition()) { TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition); topics.add(tp); } consumer.assign(topics); for (TopicPartition tp : topics) { Map offsets = consumer.endOffsets(Collections.singleton(tp)); long position = 500; if (offsets.get(tp).longValue() > position) { consumer.seek(tp, offsets.get(tp).longValue() - 500); } else { consumer.seek(tp, 0); } } List results = new ArrayList<>(); boolean flag = true; while (flag) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { //轉成我定義的物件集合 KafkaResult result = new KafkaResult(); result.setPartition(record.partition()); result.setOffset(record.offset()); result.setMsg(record.value()); result.setKey(record.key()); results.add(result); } if (!records.isEmpty()) { flag = false; } } consumer.close(); return results; }

} ```

  1. 執行查詢,就可以得到我們想要的效果了。

``java public class TestKafka { public static void main(String[] args) throws Exception { KafkaService kafkaService = new KafkaService(); //把解析到的引數放在我自己定義的kafkaSqlInfo物件中 KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * fromcmdb-calltopowherepartition` in (0,1,2) limit 1000 "); //介面卡獲取資料來源,主要是從上述的sqlInfo物件中去poll資料 List results = KafkaConsumerAdapter.executor(sqlInfo); //執行查詢 query(sqlInfo.getTableName(), results, sqlInfo.getSql());

       sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%'  limit 1000 ");
       results = KafkaConsumerAdapter.executor(sqlInfo);
       query(sqlInfo.getTableName(), results, sqlInfo.getSql());


       sqlInfo = kafkaService.parseSql("select count(*) AS addad  from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
       results = KafkaConsumerAdapter.executor(sqlInfo);
       query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   }

   private static void query(String tableName, List<KafkaResult> results,
                             String sql) throws Exception {
       //建立model.json,設定我的SchemaFactory,設定庫名
       String model = createTempJson();
       //設定我的表結構,表名稱和表字段名以及型別
       KafkaTableSchema.generateSchema(tableName, results);
       Properties info = new Properties();
       info.setProperty("lex", Lex.JAVA.toString());
       Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
       Statement st = connection.createStatement();
       //執行
       ResultSet result = st.executeQuery(sql);
       ResultSetMetaData rsmd = result.getMetaData();
       List<Map<String, Object>> ret = new ArrayList<>();
       while (result.next()) {
           Map<String, Object> map = new LinkedHashMap<>();
           for (int i = 1; i <= rsmd.getColumnCount(); i++) {
               map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
           }
           ret.add(map);
       }
       result.close();
       st.close();
       connection.close();
   }

   private static void print(ResultSet resultSet) throws SQLException {
       final ResultSetMetaData metaData = resultSet.getMetaData();
       final int columnCount = metaData.getColumnCount();
       while (resultSet.next()) {
           for (int i = 1; ; i++) {
               System.out.print(resultSet.getString(i));
               if (i < columnCount) {
                   System.out.print(", ");
               } else {
                   System.out.println();
                   break;
               }
           }
       }
   }

   private static String createTempJson() throws IOException {
       JSONObject object = new JSONObject();
       object.put("version", "1.0");
       object.put("defaultSchema", "QAKAFKA");
       JSONArray array = new JSONArray();
       JSONObject tmp = new JSONObject();
       tmp.put("name", "QAKAFKA");
       tmp.put("type", "custom");
       tmp.put("factory", "kafka.KafkaSchemaFactory");
       array.add(tmp);
       object.put("schemas", array);
       return object.toJSONString();
   }

} ```

  • 生成臨時的model.json,之前是基於檔案,現在基於text字串,mode=inline模式
  • 設定我的表結構、表名稱、欄位名、欄位型別等,並放置在記憶體中,同時將介面卡查詢出來的資料也放進去table裡面
  • 獲取連線,執行查詢,完美!