Fllink實時計算運用(三)Flink Table API運用

語言: CN / TW / HK

1. 什麼是Table API & SQL

Table API& SQL 是一種關係型API,使用者可以像操作MySQL資料庫表一樣的操作資料,而不需要寫Java程式碼完成flink function,更不需要手工的優化Java程式碼調優。SQL對一個非程式設計師操作來講,學習成本很低,如果一個系統提供SQL支援,將很容易被使用者接受。

總結來說,關係型API的好處:

  1. 關係型API是宣告式的

  2. 查詢能夠被有效的優化

  3. 查詢可以高效的執行

  4. “Everybody” knows SQL

Flink本身是批流統一的處理框架,所以Table API和SQL,就是批流統一的上層處理API。

Table API& SQL 是流處理和批處理統一的API層,如下圖:

file

  • flink在runtime層是統一的,因為flink將批任務看做流的一種特例來執行

  • 在API層,flink為批和流提供了兩套API(DataSet和DataStream)

  • Table API是一套內嵌在Java和Scala語言中的查詢API,它允許我們以非常直觀的方式,組合來自一些關係運算符的查詢(比如select、filter和join)。

  • 對於Flink SQL,就是直接可以在程式碼中寫SQL,來實現一些查詢(Query)操作。Flink的SQL支援,基於實現了SQL標準的Apache Calcite(Apache開源SQL解析工具)。

無論輸入是批輸入還是流式輸入,在這兩套API中,指定的查詢都具有相同的語義,得到相同的結果。

2. 批處理案例實現

  1. 實現說明

    以批處理方式,載入自定義資料,並註冊為table表,然後統計每個人名出現的次數,並打印出來。

    WordCount jack 2 WordCount mike 1

  2. 實現步驟

    1. 獲取批處理執行環境

    2. 獲取Table執行環境

    3. 載入自定義資料來源資訊

    4. 將外部資料構建成表

    5. 使用table方式查詢資料

    6. 執行任務,列印結果

  3. 程式碼實現

    TableApi實現方式:

    /**
     * Table Api 實現方式
     */
    public static void tableApi() throws Exception{
        //1. 初始化執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
    
        //2. 編寫資料來源資訊
        DataSet<WordCount> input = env.fromElements(
                new WordCount("mike", 1),
                new WordCount("jack", 1),
                new WordCount("jack", 1));
    
        //3. 將dataSet轉換成Table物件
        Table table = tEnv.fromDataSet(input);
    
        //4. 對word進行分組,然後查詢指定的欄位
        Table filterTable = table
                .groupBy("word")
                .select("word, frequency.sum as frequency");
    
        //5. 將DataSet轉換成Table物件
        DataSet<WordCount> result = tEnv.toDataSet(filterTable, WordCount.class);
    
        //6. 列印輸出結果
        result.print();
    
    }
    

    SQL實現方式:

    /**
     * Table Api 實現方式
     */
    public static void flinkSQL() throws Exception{
        //1. 初始化執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
        //2. 編寫資料來源資訊
        DataSet<WordCount> input = env.fromElements(
                new WordCount("mike", 1),
                new WordCount("jack", 1),
                new WordCount("jack", 1));
    
        //3. 將dataSet轉換成Table物件
        Table table = tEnv.fromDataSet(input);
        //4. 建立臨時檢視
        tEnv.createTemporaryView("WordCount", input, "word, frequency");
        //5. 執行sql查詢
        Table tableSQL = tEnv.sqlQuery(
                "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
        //6. 將DataSet轉換成Table物件
        DataSet<WordCount> result = tEnv.toDataSet(tableSQL, WordCount.class);
        //7. 列印輸出結果
        result.print();
    }
    

3. 流處理案例實現

  1. 實現說明

    採用Flink流式環境, 載入多個集合資料, 轉換為Table, 並將Table轉換為DataStream,採用SQL方式進行合併處理。

  2. 實現步驟

    1. 獲取流處理環境

    2. 設定並行度

    3. 獲取Table執行環境

    4. 載入集合資料

    5. 轉換DataStream為Table

    6. 將DataStream註冊成Table

    7. 使用union all將兩個表進行關聯

    8. 執行任務,列印輸出

  3. 程式碼實現

    StreamSqlApplication類:

    //1. 初始化流式開發環境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
    //2. 建立第一個訂單流
    DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));
    
    //3. 建立第二個訂單流
    DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));
    
    //4. 將DataStream轉換成Table(可以將DataStream轉換成Table)
    Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
    
    //5. 將DataStream註冊成Table(也可以將DataStream註冊成Table)
    tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
    
    //6. 使用union all將兩個表進行關聯
    Table result = tEnv.sqlQuery("SELECT * FROM " + tableA +
            " UNION ALL " +
            " SELECT * FROM OrderB ");
    
    //7. 將資料流進行輸出
    tEnv.toAppendStream(result, Order.class).print().setParallelism(1);
    
    //8. 執行任務
    env.execute();
    

4. Flink 1.9 Table架構

file

在新的架構中,有兩個查詢處理器:

  1. Flink Query Processor,也稱作Old Planner

  2. Blink Query Processor,也稱作Blink Planner

查詢處理器是 Planner 的具體實現,通過parser、optimizer、codegen(程式碼生成技術)等流程將 Table API & SQL作業轉換成 Flink Runtime 可識別的 Transformation DAG,最終由 Flink Runtime 進行作業的排程和執行。

Flink 的查詢處理器針對流計算和批處理作業有不同的分支處理,流計算作業底層的 API 是 DataStream API, 批處理作業底層的 API 是 DataSet API。

Blink 的查詢處理器則實現流批作業介面的統一,底層的 API 都是Transformation,這就意味著我們和Dataset完全沒有關係了。

5. Flink Planner 與 Blink Planner的差異

從Flink1.9開始,提供了兩種不同的 planner 實現:Blink planner 和 1.9之前舊的 Old Planner。Planner 負責將運算元轉換為 Flink 可執行的、優化之後的 Flink job。這兩個 Planner 擁有不同的優化規則和 runtime 類。

  1. 在模型角度,Flink Planner 沒有考慮流計算和批處理的統一,在底層會分別轉換到到 DataStream API 和 DataSet API 上。而 Blink Planner 將批資料集看作 bounded DataStream (有界流式資料) ,流計算作業和批處理作業最終都會轉換到 Transformation API 上。

  2. 在架構角度,Blink Planner針對批處理和流計算,分別實現了BatchPlanner 和 StreamPlanner ,兩者大部分的程式碼和優化邏輯都是共用的。 Old Planner 針對批處理和流計算的程式碼實現的是完全獨立的兩套體系,基本沒有實現程式碼和優化邏輯複用。

除了模型和架構上的優點外,Blink Planner 在阿里巴巴集團內部的海量業務場景下沉澱了許多實用功能,集中在三個方面:

  1. Blink Planner 對程式碼生成機制做了改進、對部分運算元進行了優化,提供了豐富實用的新功能,如維表 join、Top N、MiniBatch、流式去重、聚合場景的資料傾斜優化等新功能。

  2. Blink Planner 的優化策略是基於公共子圖的優化演算法,包含了基於成本的優化(CBO)和基於規則的優化(CRO)兩種策略,優化更為全面。同時,Blink Planner 支援從 catalog 中獲取資料來源的統計資訊,這對CBO優化非常重要。

  3. Blink Planner 提供了更多的內建函式,更標準的 SQL 支援

整體看來,Blink 查詢處理器在架構上更為先進,功能上也更為完善。出於穩定性的考慮,Flink 1.9 預設依然使用 Flink Planner,使用者如果需要使用 Blink Planner,可以在作業中顯式指定。

6. Flink Planner 用法

  1. 匯入依賴

    <!-- 表程式計劃程式和執行時。這是1.9版本之前Flink的唯一計劃者。仍然是推薦的。-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
  2. 程式碼片段

    流式查詢:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    // Flink 流式查詢
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
    // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings)
    
    

    批資料查詢:

    //匯入包 
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    // Flink 批資料查詢
    ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
    

7. Blink Planner 用法

  1. 匯入依賴

    <!-- 使用blink執行計劃的時候需要匯入這個包-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
  2. 程式碼片段

    流式查詢:

    //匯入包
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    // Flink 流式查詢
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
    
    

    批資料查詢:

    // Flink 批資料查詢
    EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
    

    說明: 如果作業需要執行在叢集環境,打包時將 Blink Planner 相關依賴的 scope 設定為 provided,表示這些依賴由叢集環境提供。這是因為 Flink 在編譯打包時,已經將 Blink Planner 相關的依賴打包,不需要再次引入,避免衝突。


本文由mirson創作分享,如需進一步交流,請加QQ群:19310171或訪問www.softart.cn