指標統計:基於流計算Oceanus(Flink) 實現實時UVPV統計

語言: CN / TW / HK

導語 |  最近梳理了一下如何用Flink來實現實時的UV、PV指標的統計,並和公司內微視部門的同事交流。然後針對該場景做了簡化,並發現使用Flink SQL來實現這些指標的統計會更加便捷。

一、解決方案描述

(一)概述

本方案結合本地自建Kafka叢集、騰訊雲流計算Oceanus(Flink)、雲資料庫Redis對部落格、購物等網站UV、PV指標進行實時視覺化分析。分析指標包含網站的獨立訪客數量(UV)、產品的點選量(PV)、轉化率(轉化率=成交次數/點選量)等。

相關概念介紹

UV(Unique Visitor):獨立訪客數量。訪問您網站的一臺客戶端為一個訪客,如使用者對同一頁面訪問了5次,那麼該頁面的UV只加1,因為UV統計的是去重後的使用者數而不是訪問次數。

PV(Page View):點選量或頁面瀏覽量。如使用者對同一頁面訪問了5次,那麼該頁面的PV會加5。

(二 )方案架構及優勢

根據以上實時指標統計場景,設計瞭如下架構圖:

涉及產品列表:

  • 本地資料中心(IDC)的自建Kafka叢集

  • 私有網路VPC

  • 專線接入/雲聯網/VPN連線/對等連線

  • 流計算Oceanus (Flink)

  • 雲資料庫Redis

二、前置準備

購買所需的騰訊雲資源,並打通網路。自建的Kafka叢集需根據叢集所在區域需採用VPN連線、專線連線或對等連線的方式來實現網路互通互聯。

(一)建立私有網路VPC

私有網路(VPC)是一塊在騰訊雲上自定義的邏輯隔離網路空間,在構建Oceanus叢集、Redis元件等服務時選擇的網路建議選擇同一個VPC,網路才能互通。否則需要使用對等連線、NA閘道器、VPN等方式打通網路。私有網路建立步驟請參考 幫助文件

(https://cloud.tencent.com/document/product/215/36515)

(二)建立Oceanus叢集

流計算Oceanus是大資料產品生態體系的實時化分析利器,是基於Apache Flink構建的具備一站開發、無縫連線、亞秒延時、低廉成本、安全穩定等特點的企業級實時大資料分析平臺。流計算Oceanus以實現企業資料價值最大化為目標,加速企業實時化數字化的建設程序。

在Oceanus控制檯的【叢集管理->【新建叢集】頁面建立叢集,選擇地域、可用區、VPC、日誌、儲存,設定初始密碼等。VPC及子網使用剛剛建立好的網路。建立完後Flink的叢集如下:

(三)建立Redis叢集

Redis控制檯 的【新建例項】頁面建立叢集,選擇與其他元件同一地域,同區域的同一私有網路VPC,這裡還選擇同一子網。

Redis控制檯:https://c onsol e.cloud.tencent.com/redis#/

(四)配置自建Kafka叢集

  • 修改自建Kafka叢集配置

自建Kafka叢集連線時bootstrap-servers引數常常使用hostname而不是ip來連線。但用自建Kafka叢集連線騰訊雲上的Oceanus叢集為全託管叢集,Oceanus叢集的節點上無法解析自建叢集的hostname與ip的對映關係,所以需要改監聽器地址由hostname為ip地址連線的形式。

將config/server.properties配置檔案中advertised.listeners引數配置為ip地址。示例:

# 0.10.X及以後版本
advertised.listeners=PLAINTEXT://10.1.0.10:9092
# 0.10.X之前版本
advertised.host.name=PLAINTEXT://10.1.0.10:9092

修改後重啟Kafka叢集。

注意: 若在雲上使用到自建的zookeeper地址,也需要將zk配置中的hostname修改ip地址形式。

  • 模擬傳送資料到topic

本案例使用topic為topic為uvpv-demo。

  • Kafka客戶端

進入自建Kafka叢集節點,啟動Kafka客戶端,模擬傳送資料。

./bin/kafka-console-producer.sh --broker-list 10.1.0.10:9092 --topic uvpv-demo
>{"record_type":0, "user_id": 2, "client_ip": "100.0.0.2", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":0, "user_id": 3, "client_ip": "100.0.0.3", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":1, "user_id": 2, "client_ip": "100.0.0.1", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
  • 使用指令碼傳送

指令碼一: Java程式碼參考:

https://cloud.tencent.com/document/product/597/54834

指令碼二:Python指令碼。

參考之前案例中Python指令碼進行適當修改即可: 《影片直播:實時資料視覺化分析》

(五)打通自建IDC叢集到騰訊雲網絡通訊

自建Kafka叢集聯通騰訊雲網絡,可通過以下前3種方式打通自建IDC到騰訊雲的網路通訊。

  • 專線接入

https://cloud.tencent.com/document/product/216 適用於本地資料中心IDC與騰訊雲網絡打通。

  • 雲聯網

https://cloud.tencent.com/document/product/877 適用於本地資料中心IDC與騰訊雲網絡打通,也可用於雲上不同地域間私有網路VPC打通。

  • VPN連線

https://cloud.tencent.com/document/product/554 適用於本地資料中心IDC與騰訊雲網絡打通。

  • 對等連線+NAT閘道器

對等連線:

https://cloud.tencent.com/document/product/553

NAT閘道器:

https://cloud.tencent.com/document/product/552 適合雲上不同地域間私有網路VPC打通,不適合本地IDC到騰訊雲網絡

本方案中使用了VPN連線的方式,實現本地IDC和雲上網路的通訊。參考連結: 建立VPC到IDC的連線 (路由表)

(https://cloud.tencent.com/document/product/554/52854)

根據方案繪製了下面的網路架構圖:

三、方案實現

(一)業務目標

利用流計算Oceanus實現網站UV、PV、轉化率指標的實時統計,這裡只列取以下3種統計指標:

網站的獨立訪客數量UV。Oceanus處理後在Redis中通過set型別儲存獨立訪客數量,同時也達到了對同一訪客的資料去重的目的。

網站商品頁面的點選量PV。Oceanus處理後在Redis中使用list型別儲存頁面點選量。

轉化率(轉化率=成交次數/點選量)。Oceanus處理後在Redis中用String儲存即可。

(二)源資料格式

Kafka topic:uvpv-demo(瀏覽記錄)

Kafka內部採用json格式儲存,資料格式如下:

# 瀏覽記錄
{
"record_type":0, # 0 表示瀏覽記錄
"user_id": 6,
"client_ip": "100.0.0.6",
"product_id": 101,
"create_time": "2021-09-06 16:00:00"
}


# 購買記錄
{
"record_type":1, # 1 表示購買記錄
"user_id": 6,
"client_ip": "100.0.0.8",
"product_id": 101,
"create_time": "2021-09-08 18:00:00"
}

(三)編寫Flink SQL作業

示例中實現了UV、PV和轉化率3個指標的獲取邏輯,並寫入Sink端。

  • 定義Source

CREATE TABLE `input_web_record` (
`record_type` INT,
`user_id` INT,
`client_ip` VARCHAR,
`product_id` INT,
`create_time` TIMESTAMP,
`times` AS create_time,
WATERMARK FOR times AS times - INTERVAL '10' MINUTE
) WITH (
'connector' = 'kafka', -- 可選 'kafka','kafka-0.11'. 注意選擇對應的內建 Connector
'topic' = 'uvpv-demo',
'scan.startup.mode' = 'earliest-offset',
--'properties.bootstrap.servers' = '82.157.27.147:9092',
'properties.bootstrap.servers' = '10.1.0.10:9092',
'properties.group.id' = 'WebRecordGroup', -- 必選引數, 一定要指定 Group ID
'format' = 'json',
'json.ignore-parse-errors' = 'true', -- 忽略 JSON 結構解析異常
'json.fail-on-missing-field' = 'false' -- 如果設定為 true, 則遇到缺失欄位會報錯 設定為 false 則缺失欄位設定為 null
);
  • 定義Sink

-- UV sink
CREATE TABLE `output_uv` (
`userids` STRING,
`user_id` STRING
) WITH (
'connector' = 'redis',
'command' = 'sadd', -- 使用集合儲存uv(支援命令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis連線地址,叢集模式多個節點使用'',''分隔。
-- 'additional-key' = '<key>', -- 用於指定hset和zadd的key。hset、zadd必須設定。
'password' = 'yourpassword'
);


-- PV sink
CREATE TABLE `output_pv` (
`pagevisits` STRING,
`product_id` STRING,
`hour_count` BIGINT
) WITH (
'connector' = 'redis',
'command' = 'lpush', -- 使用列表儲存pv(支援命令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis連線地址,叢集模式多個節點使用'',''分隔。
-- 'additional-key' = '<key>', -- 用於指定hset和zadd的key。hset、zadd必須設定。
'password' = 'yourpassword'
);


-- 轉化率 sink
CREATE TABLE `output_conversion_rate` (
`conversion_rate` STRING,
`rate` STRING
) WITH (
'connector' = 'redis',
'command' = 'set', -- 使用列表儲存pv(支援命令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis連線地址,叢集模式多個節點使用'',''分隔。
-- 'additional-key' = '<key>', -- 用於指定hset和zadd的key。hset、zadd必須設定。
'password' = 'yourpassword'
);
  • 業務邏輯

-- 加工得到 UV 指標,統計所有時間內的 UV
INSERT INTO output_uv
SELECT
'userids' AS `userids`,
CAST(user_id AS string) AS user_id
FROM input_web_record ;


-- 加工並得到 PV 指標,統計每 10 分鐘內的 PV
INSERT INTO output_pv
SELECT
'pagevisits' AS pagevisits,
CAST(product_id AS string) AS product_id,
SUM(product_id) AS hour_count
FROM input_web_record WHERE record_type = 0
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id,
user_id;


-- 加工並得到轉化率指標,統計每 10 分鐘內的轉化率
INSERT INTO output_conversion_rate
SELECT
'conversion_rate' AS conversion_rate,
CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string)
FROM (SELECT * FROM input_web_record where record_type = 1) AS a
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id;

(四)結果驗證

通常情況,會通過Web網站來展示統計到的UV、PV指標,這裡為了簡單直接在Redis控制檯 (https://console.cloud.tencent.com/redis#/) 登入進行查詢:

  • userids: 儲存UV

  • pagevisits: 儲存PV

  • conversion_rate: 儲存轉化率,即購買商品次數/總頁面點選量。

四、總結

通過自建Kafka叢集採集資料,在流計算Oceanus (Flink) 中實時進行欄位累加、視窗聚合等操作,將加工後的資料儲存在雲資料庫Redis,統計到實時重新整理的UV、PV等指標。這個方案在Kafka json格式設計時為了簡便易懂做了簡化處理,將瀏覽記錄和產品購買記錄都放在了同一個topic中,重點通過打通自建IDC和騰訊雲產品間的網路來展現整個方案。針對超大規模的UV去重,微視的同事採用了Redis hyperloglog方式來實現UV統計。相比直接使用set型別方式有極小的記憶體空間佔用的優點,詳情見連結:

https://cloud.tencent.com/developer/article/1889162

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

:point_down: 擊下方 「閱讀原文」 ,瞭解騰訊雲流計算Oceanus更多資訊~