Flink Real-Time Computing in Practice (Part 5): Flink Table API & SQL Case Studies

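1. Table API & SQL in Practice

  1. Case Description

    • Functionality

      Read text from a socket source and count how many times each word occurs.

    • Implementation flow

      • Initialize the Table execution environment

      • Apply the transformations:

        1) Split each line on spaces

        2) Pair each word with a count of 1

        3) Group by word

        4) Sum the counts

        5) Print the results

      • Execute the job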
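
    The flow above can be checked without a cluster. The following plain-Java sketch applies steps 1) to 4) to a finite batch of lines; the class WordCountSteps and its count method are hypothetical and use no Flink APIs. Unlike the streaming job, it only produces the final totals, not a result that keeps changing as input arrives.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration of the word-count steps (no Flink APIs).
class WordCountSteps {

    // Returns word -> count for a batch of lines, in first-seen order.
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // 1) split on spaces, like the flatMap in the Flink programs
            for (String word : line.split(" ")) {
                // 2) pair the word with 1, 3) group by word, 4) sum the 1s
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```

    For example, count(Arrays.asList("hello flink", "hello world")) returns {hello=2, flink=1, world=1}; printing that map corresponds to step 5).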

  2. Implementation with the Flink Table API

    StreamTableApiApplication, implementation:

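    import java.util.Arrays;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.util.Collector;
    
    public class StreamTableApiApplication {
    
        // Assumed shape of WordCount (not shown in the original): a public POJO with a
        // no-arg constructor and public fields named after the result columns
        public static class WordCount {
            public String word;
            public Long counts;
    
            public WordCount() {
            }
    
            @Override
            public String toString() {
                return word + "," + counts;
            }
        }
    
        public static void main(String[] args) throws Exception {
            // Obtain the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Use the legacy (old) planner in streaming mode
            EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    
            // Obtain the Table execution environment
            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
            // Read lines from the socket source (host and port of the nc server)
            DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
    
            // Split each line on spaces and flatten the words into one stream
            SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    Arrays.stream(line.split(" ")).forEach(out::collect);
                }
            });
    
            // Convert the DataStream into a Table; the field would default to f0, so name it word
            Table table = tabEnv.fromDataStream(words, "word");
    
            // Group by word and add up a 1 for each occurrence
            Table resultTable = table.groupBy("word").select("word, sum(1L) as counts");
    
            // New data keeps updating earlier counts, so toAppendStream(resultTable, WordCount.class)
            // would be rejected; an updating result must be consumed with toRetractStream
            DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
            wordCountDataStream.printToErr("toRetractStream>>>");
    
            env.execute();
        }
    }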
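
    Why toRetractStream is needed can be made concrete with a small simulation. The sketch below keeps a running count per word and, whenever a word that already has a result arrives again, first emits the old row flagged false (retract) and then the new row flagged true (add), which is the meaning of the Boolean in Tuple2<Boolean, WordCount>. The class RetractSimulation is hypothetical and uses no Flink APIs; the interleaving of messages across parallel tasks in a real job may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a retract stream for a running word count (no Flink APIs).
class RetractSimulation {

    // Returns one "flag,word,count" entry per emitted change.
    static List<String> changelog(List<String> words) {
        Map<String, Long> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String word : words) {
            Long previous = state.get(word);
            if (previous != null) {
                // withdraw the result emitted earlier for this word
                out.add("false," + word + "," + previous);
            }
            long updated = (previous == null) ? 1L : previous + 1;
            state.put(word, updated);
            // add the updated result
            out.add("true," + word + "," + updated);
        }
        return out;
    }
}
```

    For the words hello, flink, hello it returns [true,hello,1, true,flink,1, false,hello,1, true,hello,2]. An append-only stream could only add rows, so it would have no way to withdraw hello,1 once hello appears a second time.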
    

    Testing:

    Start a socket server with nc and type some space-separated words:

    [root@flink1 flink-1.11.2]# nc -lk 9922
    
  3. Implementation with Flink SQL

    Implementation:

    The StreamTableSqlApplication class:

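    import java.util.Arrays;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.util.Collector;
    
    public class StreamTableSqlApplication {
    
        public static void main(String[] args) throws Exception {
            // Obtain the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    
            // Obtain the Table execution environment
            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
            // Read lines from the socket source
            DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
    
            // Split each line on spaces and flatten the words into one stream
            SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    Arrays.stream(line.split(" ")).forEach(out::collect);
                }
            });
    
            // Register the DataStream as table t_wordcount, naming the default field f0 word
            // (registerDataStream is deprecated; createTemporaryView is its replacement)
            tabEnv.registerDataStream("t_wordcount", words, "word");
    
            // Group by word and count the occurrences
            Table resultTable = tabEnv.sqlQuery("select word, count(1) as counts from t_wordcount group by word");
    
            // WordCount: a user-defined POJO (not shown in the original) with a public no-arg
            // constructor and public fields word (String) and counts (Long)
            DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
            wordCountDataStream.printToErr("toRetractStream>>>");
            env.execute();
        }
    }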
    

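2. Flink SQL Tumbling Windows in Practice

  1. Windows in Flink SQL

    Flink SQL supports two main kinds of windowed aggregation: Window aggregation and Over aggregation. This article covers Window aggregation. A window can be defined on either of two time attributes, Event Time or Processing Time, and each supports three window types: tumbling windows (TUMBLE), sliding windows (HOP), and session windows (SESSION).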

  2. Case Description

    Count each user's clicks within the past 1 minute: define a window that collects the most recent minute of data, then aggregate the data in that window.

    Test data:

    Username  URL                    Access time
    張三      http://taobao.com/xxx  2021-05-10 10:00:00
    張三      http://taobao.com/xxx  2021-05-10 10:00:10
    張三      http://taobao.com/xxx  2021-05-10 10:00:49
    張三      http://taobao.com/xxx  2021-05-10 10:01:05
    張三      http://taobao.com/xxx  2021-05-10 10:01:58
    張三      http://taobao.com/xxx  2021-05-10 10:02:10

  3. Using Tumbling Windows

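    In the Table API, tumbling windows are defined with the Tumble class, which is configured through three methods (the SQL below uses the equivalent TUMBLE group-window function):

    • over: the window length

    • on: the time attribute to group by (time intervals) or sort by (row counts)

    • as: an alias for the window, which must then appear in the subsequent groupBy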
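
    To see how the window length (over) and the time attribute (on) divide the data, here is a plain-Java sketch: each timestamp belongs to exactly one window, whose start is the timestamp rounded down to a multiple of the window length. The class TumbleSketch is hypothetical and uses no Flink APIs; it assumes second-resolution timestamps and windows aligned to the epoch, which is how Flink aligns tumbling windows when no offset is given.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of tumbling-window assignment and counting.
class TumbleSketch {

    // Start of the tumbling window of length size that contains ts.
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long seconds = ts.toEpochSecond(ZoneOffset.UTC);
        long start = seconds - Math.floorMod(seconds, size.getSeconds());
        return LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC);
    }

    // Number of events per window, keyed and sorted by window start.
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> events, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime ts : events) {
            counts.merge(windowStart(ts, size), 1L, Long::sum);
        }
        return counts;
    }
}
```

    Applied to the six test clicks with Duration.ofMinutes(1), countPerWindow yields {2021-05-10T10:00=3, 2021-05-10T10:01=2, 2021-05-10T10:02=1}, the same counts the SQL query reports.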

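    Implementation steps:

    • Initialize the stream execution environment

    • Use the Blink planner in streaming mode

    • Create the user click event data

    • Write the source data to a temporary file and get its absolute path

    • Create a table that loads the user click events

    • Run a SQL query on the table and retrieve the result as a new table

    • Convert the Table into a DataStream

    • Execute the job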
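
    The step that writes the source data to a temporary file relies on a FileUtil.createTempFile helper that the article does not show. Below is a minimal sketch of such a helper, assuming it only needs to write the string as UTF-8 and return the file's absolute path. The class is hypothetical and package-private, so it has to live in the same package as the example applications; since the file sits on the client machine, it suits local runs (for example from the IDE).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical version of the FileUtil helper used by the window examples.
final class FileUtil {

    private FileUtil() {
    }

    // Writes contents to a new temporary .csv file (deleted when the JVM exits)
    // and returns the file's absolute path.
    static String createTempFile(String contents) throws IOException {
        Path file = Files.createTempFile("user_clicks", ".csv");
        file.toFile().deleteOnExit();
        Files.write(file, contents.getBytes(StandardCharsets.UTF_8));
        return file.toAbsolutePath().toString();
    }
}
```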

    TumbleUserClickApplication, implementation:

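    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class TumbleUserClickApplication {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Use the Blink planner in streaming mode
            EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
            // Write the source data to a temporary file and get its absolute path
            // (FileUtil is a helper class that the original does not show)
            String contents =
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
            String path = FileUtil.createTempFile(contents);
    
            // ts is the event-time attribute; the watermark trails it by 2 seconds
            String ddl = "CREATE TABLE user_clicks (\n" +
                    "  username varchar,\n" +
                    "  click_url varchar,\n" +
                    "  ts TIMESTAMP(3),\n" +
                    "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                    ") WITH (\n" +
                    "  'connector.type' = 'filesystem',\n" +
                    "  'connector.path' = '" + path + "',\n" +
                    "  'format.type' = 'csv'\n" +
                    ")";
    
            // sqlUpdate is deprecated since Flink 1.11; tabEnv.executeSql(ddl) is the newer equivalent
            tabEnv.sqlUpdate(ddl);
    
            // Count each user's clicks in every 1-minute tumbling window; the result is a new Table
            String query = "SELECT\n" +
                    "  TUMBLE_START(ts, INTERVAL '1' MINUTE),\n" +
                    "  TUMBLE_END(ts, INTERVAL '1' MINUTE),\n" +
                    "  username,\n" +
                    "  COUNT(click_url)\n" +
                    "FROM user_clicks\n" +
                    "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username";
    
            Table result = tabEnv.sqlQuery(query);
    
            // Window results are final once emitted, so an append stream is sufficient
            tabEnv.toAppendStream(result, Row.class).print();
    
            env.execute();
        }
    }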
    

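    The query groups clicks into 1-minute tumbling windows, and the watermark trails the largest timestamp seen so far by 2 seconds.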
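
    The 2-second watermark delay decides when a window can be emitted. The plain-Java sketch below models the rule declared in the DDL: the watermark is the largest ts seen so far minus 2 seconds, and a window [start, end) fires once the watermark reaches end minus 1 millisecond. The class WatermarkSketch is hypothetical and uses no Flink APIs. With the test data, the window [10:00, 10:01) can fire only after the click at 10:01:05 (watermark 10:01:03). The clicks alone never push the watermark past 10:02:08, so the last window [10:02, 10:03) appears in the output because the bounded file source sends a final watermark when it finishes reading.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the watermark rule declared in the DDL (no Flink APIs).
class WatermarkSketch {

    // WATERMARK FOR ts AS ts - INTERVAL '2' SECOND:
    // the largest event time seen so far minus the delay.
    static LocalDateTime watermark(List<LocalDateTime> seen, Duration delay) {
        return Collections.max(seen).minus(delay);
    }

    // A window [start, start + size) may fire once the watermark reaches the
    // window's last millisecond, i.e. watermark >= end - 1 ms.
    static boolean canFire(LocalDateTime windowStart, Duration size, LocalDateTime watermark) {
        LocalDateTime lastMillisecond = windowStart.plus(size).minusNanos(1_000_000);
        return !watermark.isBefore(lastMillisecond);
    }
}
```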

    Output:

    4> 2021-05-10T10:00,2021-05-10T10:01,張三,3
    4> 2021-05-10T10:01,2021-05-10T10:02,張三,2
    4> 2021-05-10T10:02,2021-05-10T10:03,張三,1
    

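3. Flink SQL Sliding (HOP) Windows in Practice

  1. Implementation Steps

    • Initialize the stream execution environment

    • Use the Blink planner in streaming mode

    • Create the user click event data

    • Write the source data to a temporary file and get its absolute path

    • Create a table that loads the user click events

    • Run a SQL query on the table and retrieve the result as a new table

    • Convert the Table into a DataStream

    • Execute the job

  2. Implementation Code

    HopUserClickApplication: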

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class HopUserClickApplication {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Use the Blink planner in streaming mode
            EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
            // Write the source data to a temporary file and get its absolute path
            // (FileUtil is a helper class that the original does not show)
            String contents =
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
            String path = FileUtil.createTempFile(contents);
    
            // ts is the event-time attribute; the watermark trails it by 2 seconds
            String ddl = "CREATE TABLE user_clicks (\n" +
                    "  username varchar,\n" +
                    "  click_url varchar,\n" +
                    "  ts TIMESTAMP(3),\n" +
                    "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                    ") WITH (\n" +
                    "  'connector.type' = 'filesystem',\n" +
                    "  'connector.path' = '" + path + "',\n" +
                    "  'format.type' = 'csv'\n" +
                    ")";
    
            // sqlUpdate is deprecated since Flink 1.11; tabEnv.executeSql(ddl) is the newer equivalent
            tabEnv.sqlUpdate(ddl);
    
            // Every 30 seconds, count each user's clicks over the past 1 minute
            String query = "SELECT\n" +
                    "  HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
                    "  HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
                    "  username,\n" +
                    "  COUNT(click_url)\n" +
                    "FROM user_clicks\n" +
                    "GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username";
    
            Table result = tabEnv.sqlQuery(query);
    
            tabEnv.toAppendStream(result, Row.class).print();
    
            env.execute();
        }
    }
    

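    Every 30 seconds, the query counts each user's clicks over the preceding 1 minute. Because the window size (1 minute) is twice the slide (30 seconds), every click falls into two overlapping windows.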
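
    How one click lands in two windows can be reproduced in plain Java. The sketch below lists every window start s (a multiple of the slide) with s <= ts < s + size, walking back one slide at a time, and then counts clicks per window. The class HopSketch is hypothetical and uses no Flink APIs; it assumes second-resolution timestamps and epoch-aligned windows. On the six test clicks it gives the same six windows and counts as the output that follows.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of HOP (sliding) window assignment and counting.
class HopSketch {

    // Starts of all windows of length size, sliding by slide, that contain ts (earliest first).
    static List<LocalDateTime> windowStarts(LocalDateTime ts, Duration slide, Duration size) {
        long t = ts.toEpochSecond(ZoneOffset.UTC);
        long slideSeconds = slide.getSeconds();
        long lastStart = t - Math.floorMod(t, slideSeconds);
        List<LocalDateTime> starts = new ArrayList<>();
        // walk back one slide at a time while [start, start + size) still covers ts
        for (long start = lastStart; start > t - size.getSeconds(); start -= slideSeconds) {
            starts.add(0, LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC));
        }
        return starts;
    }

    // Number of events per window, keyed and sorted by window start.
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> events, Duration slide, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime ts : events) {
            for (LocalDateTime start : windowStarts(ts, slide, size)) {
                counts.merge(start, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```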

    Output:

    4> 2021-05-10T09:59:30,2021-05-10T10:00:30,張三,2
    4> 2021-05-10T10:00,2021-05-10T10:01,張三,3
    4> 2021-05-10T10:00:30,2021-05-10T10:01:30,張三,2
    4> 2021-05-10T10:01,2021-05-10T10:02,張三,2
    4> 2021-05-10T10:01:30,2021-05-10T10:02:30,張三,2
    4> 2021-05-10T10:02,2021-05-10T10:03,張三,1
    

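4. Flink SQL Session Windows in Practice

  1. Implementation Steps

    • Initialize the stream execution environment

    • Use the Blink planner in streaming mode

    • Create the user click event data

    • Write the source data to a temporary file and get its absolute path

    • Create a table that loads the user click events

    • Run a SQL query on the table and retrieve the result as a new table

    • Convert the Table into a DataStream

    • Execute the job

  2. Implementation Code

    Code: SessionUserClickApplication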

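    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class SessionUserClickApplication {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Use the Blink planner in streaming mode
            EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
            // Write the source data to a temporary file and get its absolute path
            // (FileUtil is a helper class that the original does not show)
            String contents =
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                    "張三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
            String path = FileUtil.createTempFile(contents);
    
            // ts is the event-time attribute; the watermark trails it by 2 seconds
            String ddl = "CREATE TABLE user_clicks (\n" +
                    "  username varchar,\n" +
                    "  click_url varchar,\n" +
                    "  ts TIMESTAMP(3),\n" +
                    "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                    ") WITH (\n" +
                    "  'connector.type' = 'filesystem',\n" +
                    "  'connector.path' = '" + path + "',\n" +
                    "  'format.type' = 'csv'\n" +
                    ")";
    
            // sqlUpdate is deprecated since Flink 1.11; tabEnv.executeSql(ddl) is the newer equivalent
            tabEnv.sqlUpdate(ddl);
    
            // Session windows per user: a session ends after 30 seconds without a click
            String query = "SELECT\n" +
                    "  SESSION_START(ts, INTERVAL '30' SECOND),\n" +
                    "  SESSION_END(ts, INTERVAL '30' SECOND),\n" +
                    "  username,\n" +
                    "  COUNT(click_url)\n" +
                    "FROM user_clicks\n" +
                    "GROUP BY SESSION (ts, INTERVAL '30' SECOND), username";
    
            Table result = tabEnv.sqlQuery(query);
    
            tabEnv.toAppendStream(result, Row.class).print();
    
            env.execute();
        }
    }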
    

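    A session window closes once a user has made no click for 30 seconds, so each row reports one burst of activity: its start, its end (the last click plus the 30-second gap), and its click count.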
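
    Session windows have no fixed length: each click extends the current session to its own time plus the 30-second gap, and a click that arrives after the session's current end starts a new session. The plain-Java sketch below applies that rule to one user's clicks; the class SessionSketch is hypothetical and uses no Flink APIs, and it treats a click landing exactly on the session end as part of the session. For the six test clicks with Duration.ofSeconds(30), it returns the same three sessions as the output that follows.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model of session windows for a single user (no Flink APIs).
class SessionSketch {

    // Returns one "start,end,count" entry per session; end is the last click plus the gap.
    static List<String> sessions(List<LocalDateTime> clicks, Duration gap) {
        List<LocalDateTime> sorted = new ArrayList<>(clicks);
        Collections.sort(sorted);
        List<String> out = new ArrayList<>();
        LocalDateTime start = null;
        LocalDateTime end = null;
        long count = 0;
        for (LocalDateTime ts : sorted) {
            if (start != null && ts.isAfter(end)) {
                // no click within the gap: close the current session
                out.add(start + "," + end + "," + count);
                start = null;
            }
            if (start == null) {
                start = ts;
                count = 0;
            }
            // each click extends the session to click time + gap
            end = ts.plus(gap);
            count++;
        }
        if (start != null) {
            out.add(start + "," + end + "," + count);
        }
        return out;
    }
}
```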

    Output:

    4> 2021-05-10T10:00,2021-05-10T10:00:40,張三,2
    4> 2021-05-10T10:00:49,2021-05-10T10:01:35,張三,2
    4> 2021-05-10T10:01:58,2021-05-10T10:02:40,張三,2
    

This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn.