8.Flink實時專案之CEP計算訪客跳出

語言: CN / TW / HK

1.訪客跳出明細介紹

首先要識別哪些是跳出行為,要把這些跳出的訪客最後一個訪問的頁面識別出來。那麼就要抓住幾個特徵:

該頁面是使用者近期訪問的第一個頁面,這個可以通過該頁面是否有上一個頁面(last_page_id)來判斷,如果這個表示為空,就說明這是這個訪客這次訪問的第一個頁面。

首次訪問之後很長一段時間(自己設定),使用者沒繼續再有其他頁面的訪問

這第一個特徵的識別很簡單,保留 last_page_id 為空的就可以了。但是第二個訪問的判斷,其實有點麻煩,首先這不是用一條資料就能得出結論的,需要組合判斷,要用一條存在的資料和不存在的資料進行組合判斷。而且要通過一個不存在的資料求得一條存在的資料。更麻煩的他並不是永遠不存在,而是在一定時間範圍內不存在。那麼如何識別有一定失效的組合行為呢?

最簡單的辦法就是 Flink 自帶的 CEP 技術。這個 CEP 非常適合通過多條資料組合來識別某個事件。

使用者跳出事件,本質上就是一個條件事件加一個超時事件的組合。

  • 流程圖

2.程式碼實現

建立任務類UserJumpDetailApp.java,從kafka讀取頁面日誌

``` java import com.zhangbao.gmall.realtime.utils.MyKafkaUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/* * @author zhangbao * @date 2021/10/17 10:38 * @desc / public class UserJumpDetailApp { public static void main(String[] args) { //webui模式,需要新增pom依賴 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); // StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment(); //設定並行度 env.setParallelism(4); //設定檢查點 // env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setCheckpointTimeout(60000); // env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail")); // //指定哪個使用者讀取hdfs檔案 // System.setProperty("HADOOP_USER_NAME","zhangbao");

    //從kafka讀取資料來源
    String sourceTopic = "dwd_page_log";
    String group = "user_jump_detail_app_group";
    String sinkTopic = "dwm_user_jump_detail";
    FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
    DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);

    kafkaDs.print("user jump detail >>>");

    try {
        env.execute("user jump detail task");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

} ```

3. flink CEP程式設計

官方文件:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/libs/cep.html

處理流程

1.從kafka讀取日誌資料

2.設定時間語義為事件時間並指定事件時間欄位ts

3.按照mid分組

4.配置CEP表示式

  • 1.第一次訪問的頁面:last_page_id == null

  • 2.第一次訪問的頁面在10秒內,沒有進行其他操作,沒有訪問其他頁面

5.根據表示式篩選流

6.提取命中的資料

  • 設定超時時間標識 timeoutTag
  • flatSelect 方法中,實現 PatternFlatTimeoutFunction 中的 timeout 方法。
  • 所有 out.collect 的資料都被打上了超時標記
  • 本身的 flatSelect 方法因為不需要未超時的資料所以不接受資料。
  • 通過 SideOutput 側輸出流輸出超時資料

7.將跳出資料寫回到kafka

``` java package com.zhangbao.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhangbao.gmall.realtime.utils.MyKafkaUtil; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternFlatSelectFunction; import org.apache.flink.cep.PatternFlatTimeoutFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;

import java.util.List; import java.util.Map;

/* * @author zhangbao * @date 2021/10/17 10:38 * @desc / public class UserJumpDetailApp { public static void main(String[] args) { //webui模式,需要新增pom依賴 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); // StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment(); //設定並行度 env.setParallelism(4); //設定檢查點 // env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setCheckpointTimeout(60000); // env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail")); // //指定哪個使用者讀取hdfs檔案 // System.setProperty("HADOOP_USER_NAME","zhangbao");

    //從kafka讀取資料來源
    String sourceTopic = "dwd_page_log";
    String group = "user_jump_detail_app_group";
    String sinkTopic = "dwm_user_jump_detail";
    FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
    DataStreamSource<String> jsonStrDs = env.addSource(kafkaSource);

    /*//測試資料
    DataStream<String> jsonStrDs = env
     .fromElements(
            "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",

            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                    "\"home\"},\"ts\":15000} ",

            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                    "\"detail\"},\"ts\":30000} "
    );
    dataStream.print("in json:");*/

    //對讀取到的資料進行結構轉換
    SingleOutputStreamOperator<JSONObject> jsonObjDs = jsonStrDs.map(jsonStr -> JSON.parseObject(jsonStr));

// jsonStrDs.print("user jump detail >>>"); //從flink1.12開始,時間語義預設是事件時間,不需要額外指定,如果是之前的版本,則要按以下方式指定事件時間語義 //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //指定事件時間欄位
    SingleOutputStreamOperator<JSONObject> jsonObjWithTSDs = jsonObjDs.assignTimestampsAndWatermarks(
            WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(
                    new SerializableTimestampAssigner<JSONObject>() {
                        @Override
                        public long extractTimestamp(JSONObject jsonObject, long l) {
                            return jsonObject.getLong("ts");
                        }
                    }
    ));

    //按照mid分組
    KeyedStream<JSONObject, String> ketByDs = jsonObjWithTSDs.keyBy(
            jsonObject -> jsonObject.getJSONObject("common").getString("mid")
    );

    /**
     * flink CEP表示式
     * 跳出規則,滿足兩個條件:
     *  1.第一次訪問的頁面:last_page_id == null
     *  2.第一次訪問的頁面在10秒內,沒有進行其他操作,沒有訪問其他頁面
     */
    Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first")
            .where( // 1.第一次訪問的頁面:last_page_id == null
                new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                        System.out.println("first page >>> "+lastPageId);
                        if (lastPageId == null || lastPageId.length() == 0) {
                            return true;
                        }
                        return false;
                    }
                }
            ).next("next")
            .where( //2.第一次訪問的頁面在10秒內,沒有進行其他操作,沒有訪問其他頁面
                    new SimpleCondition<JSONObject>() {
                        @Override
                        public boolean filter(JSONObject jsonObject) throws Exception {
                            String pageId = jsonObject.getJSONObject("page").getString("page_id");
                            System.out.println("next page >>> "+pageId);
                            if(pageId != null && pageId.length()>0){
                                return true;
                            }
                            return false;
                        }
                    }
            //時間限制模式,10S
            ).within(Time.milliseconds(10000));

    //將cep表示式運用到流中,篩選資料
    PatternStream<JSONObject> patternStream = CEP.pattern(ketByDs, pattern);

    //從篩選的資料中再提取資料超時資料,放到側輸出流中
    OutputTag<String> timeOutTag = new OutputTag<String>("timeOut"){};
    SingleOutputStreamOperator<Object> outputStreamDS = patternStream.flatSelect(
            timeOutTag,
            //獲取超時資料
            new PatternFlatTimeoutFunction<JSONObject, String>() {
                @Override
                public void timeout(Map<String, List<JSONObject>> map, long l, Collector<String> collector) throws Exception {
                    List<JSONObject> first = map.get("first");
                    for (JSONObject jsonObject : first) {
                        System.out.println("time out date >>> "+jsonObject.toJSONString());
                        //所有 out.collect 的資料都被打上了超時標記
                        collector.collect(jsonObject.toJSONString());
                    }
                }
            },
            //獲取未超時資料
            new PatternFlatSelectFunction<JSONObject, Object>() {
                @Override
                public void flatSelect(Map<String, List<JSONObject>> map, Collector<Object> collector) throws Exception {
                    //不超時的資料不提取,所以這裡不做操作
                }
            }
    );

    //獲取側輸出流的超時資料
    DataStream<String> timeOutDs = outputStreamDS.getSideOutput(timeOutTag);
    timeOutDs.print("jump >>> ");

    //將跳出資料寫回到kafka
    timeOutDs.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));

    try {
        env.execute("user jump detail task");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

```

測試資料

將從kafka讀取資料的方式切換成固定資料內容,如下:

``` java //測試資料 DataStream jsonStrDs = env .fromElements( "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ", "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",

            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                    "\"home\"},\"ts\":15000} ",

            "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                    "\"detail\"},\"ts\":30000} "
    );
    dataStream.print("in json:");

```

然後從dwm_user_jump_detail主題消費資料

./kafka-console-consumer.sh --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic dwm_user_jump_detail