Flink x Zeppelin ,Hive Streaming 實戰解析

語言: CN / TW / HK

Flink 1.11 正式釋出已經三週了,其中最吸引我的特性就是 Hive Streaming。正巧 Zeppelin-0.9-preview2 也在前不久釋出了,所以就寫了一篇 Zeppelin 上的 Flink Hive Streaming 的實戰解析。本文主要從以下幾部分跟大家分享:

  • Hive Streaming 的意義

  • Checkpoint & Dependency

  • 寫入 Kafka

  • Hive Streaming Sink

  • Hive Streaming Source

  • Hive Temporal Table

Hive Streaming 的意義

很多同學可能會好奇,為什麼 Flink 1.11 中,Hive Streaming 的地位這麼高?它的出現,到底能給我們帶來什麼? 

其實在大資料領域,一直存在兩種架構  Lambda  和  Kappa:

  • Lambda  架構——流批分離,靜態資料通過定時排程同步到 Hive 數倉,實時資料既會同步到 Hive,也會被實時計算引擎消費,這裡就引出了一點問題。

    • 資料口徑問題

    • 離線計算產出延時太大

    • 資料冗餘儲存

  • Kappa  架構——全部使用實時計算來產出資料,歷史資料通過回溯訊息的消費位點計算,同樣也有很多的問題,畢竟沒有一勞永逸的架構。

    • 訊息中介軟體無法保留全部歷史資料,同樣資料都是行式儲存,佔用空間太大

    • 實時計算計算曆史資料力不從心

    • 無法進行 Ad-Hoc 的分析

為了解決這些問題,行業內推出了實時數倉,解決了大部分痛點,但是還是有些地方力不從心。比如涉及到歷史資料的計算怎麼辦?我想做 Ad-Hoc 的分析又怎麼玩?所以行業內現在都是實時數倉與離線數倉並行存在,而這又帶來了更多的問題:模型需要多份、資料產出不一致、歷史資料的計算等等 。

而 Hive Streaming 的出現就可以解決這些問題!再也不用多套模型了;也不需要同一個指標因為涉及到歷史資料,寫一遍實時 SQL 再寫一遍離線 SQL;Ad-Hoc 也能做了,怎麼做?讀 Hive Streaming 產出的表就行!

接下來,讓我們從引數配置開始,接著流式的寫入 Hive,再到流式的讀取 Hive 表,最後再 Join 上 Hive 維表吧。這一整套流程都體驗後,想必大家對 Hive Streaming 一定會有更深入的瞭解,更能夠體會到它的作用。

Checkpoint & Dependency

因為只有在完成 Checkpoint 之後,檔案才會從 In-progress 狀態變成 Finish 狀態,所以,我們需要合理的去配置 Checkpoint,在 Zeppelin 中配置 Checkpoint 很簡單。

%flink.conf


# checkpoint 配置


pipeline.time-characteristic EventTime

execution.checkpointing.interval 120000

execution.checkpointing.min-pause 60000

execution.checkpointing.timeout 60000

execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION


# 依賴jar包配置


flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0

又因為我們需要從 Kafka 中讀取資料,所以將 Kafka 的依賴也加入進去了。

寫入Kafka

我們的資料來自於天池資料集,是以 CSV 的格式存在於本地磁碟,所以需要先將他們寫入 Kafka。

先建一下 CSV Source 和 Kafka Sink 的表:

%flink.ssql

SET table.sql-dialect=default;

DROP TABLE IF EXISTS source_csv;

CREATE TABLE source_csv (

user_id string,

theme_id string,

item_id string,

leaf_cate_id string,

cate_level1_id string,

clk_cnt int,

reach_time string

) WITH (

'connector' = 'filesystem',

'path' = 'file:///Users/dijie/Downloads/Cloud_Theme_Click/theme_click_log.csv',

'format' = 'csv'

)

%flink.ssql

SET table.sql-dialect=default;

DROP TABLE IF EXISTS kafka_table;

CREATE TABLE kafka_table (

user_id string,

theme_id string,

item_id string,

leaf_cate_id string,

cate_level1_id string,

clk_cnt int,

reach_time string,

ts AS localtimestamp,

WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'theme_click_log',

'properties.bootstrap.servers' = '10.70.98.1:9092',

'properties.group.id' = 'testGroup',

'format' = 'json',

'scan.startup.mode' = 'latest-offset'


)

因為 註冊的表即可以讀又可以寫,於是我在建表時將 Watermark 加上了;又因為源資料中的時間戳已經很老了,所以我這裡採用當前時間減去5秒作為我的 Watermark。

大家可以看到,我在語句一開始指定了 SQL 方言為 Default,這是為啥呢?還有別的方言嗎?別急,聽我慢慢說。

其實在之前的版本,Flink 就已經可以和 Hive 打通,包括可以把表建在 Hive 上,但是很多語法和 Hive 不相容,包括建的表在 Hive 中也無法檢視,主要原因就是方言不相容。所以,在 Flink 1.11 中,為了減少學習成本(語法不相容),可以用 DDL 建 Hive 表並在 Hive 中查詢,Flink 支援了方言,預設的就是 Default 了,就和之前一樣,如果想建 Hive 表,並支援查詢,請使用 Hive 方言,具體可以參考下方連結。

Hive 方言:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_catalog.html

再把資料從 CSV 中讀取後寫入 Kafka。

%flink.ssql(type=update)


insert into kafka_table select * from source_csv ;

再瞄一眼 Kafka,看看資料有沒有被灌進去:

看來沒問題,那麼接下來讓我們寫入 Hive。

Hive Streaming Sink

建一個Hive Sink Table,記得將方言切換到 Hive,否則會有問題。

%flink.ssql

SET table.sql-dialect=hive;

DROP TABLE IF EXISTS hive_table;

CREATE TABLE hive_table (

user_id string,

theme_id string,

item_id string,

leaf_cate_id string,

cate_level1_id string,

clk_cnt int,

reach_time string

) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES (


'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',

'sink.partition-commit.trigger'='partition-time',

'sink.partition-commit.delay'='1 min',

'sink.partition-commit.policy.kind'='metastore,success-file'


);

引數給大家稍微解釋一下:

  • partition.time-extractor.timestamp-pattern :分割槽時間抽取器,與 DDL 中的分割槽欄位保持一致;

  • sink.partition-commit.trigger :分割槽觸發器型別,可選 process-time 或partition-time。process-time:不需要上面的引數,也不需要水印,噹噹前時間大於分割槽建立時間 +sink.partition-commit.delay 中定義的時間,提交分割槽;partition-time:需要 Source 表中定義 watermark,當 watermark > 提取到的分割槽時間 +sink.partition-commit.delay 中定義的時間,提交分割槽;

  • sink.partition-commit.delay :相當於延時時間;

  • sink.partition-commit.policy.kind :怎麼提交,一般提交成功之後,需要通知 metastore,這樣 Hive 才能讀到你最新分割槽的資料;如果需要合併小檔案,也可以自定義 Class,通過實現 PartitionCommitPolicy 介面。

接下來讓我們把資料插入剛剛建立的 Hive Table:

%flink.ssql


insert into hive_table select user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table

讓程式再跑一會兒~我們先去倒一杯 95 年的 Java:coffee:️ 。

然後再看看我們的 HDFS,看看路徑下的東西。

大家也可以用 Hive 自行查詢看看,我呢就先賣個關子,一會兒用 Hive Streaming 來讀資料。

Hive Streaming Source

因為 Hive 表上面已經建立過了,所以這邊讀資料的時候直接拿來用就行了,不同的地方是需要使用 Table Hints 去覆蓋引數。 

Hive Streaming Source 最大的不足是,無法讀取已經讀取過的分割槽下新增的檔案。簡單來說就是,讀過的分割槽,就不會再讀了。看似很坑,不過仔細想想,這樣才符合流的特性。

照舊給大家說一下引數的意思:

  • stream-source.enable :顯而易見,表示是否開啟流模式。

  • stream-source.monitor-interval :監控新檔案/分割槽產生的間隔。

  • stream-source.consume-order :可以選 create-time 或者 partition-time;create-time 指的不是分割槽建立時間,而是在 HDFS 中檔案/資料夾的建立時間;partition-time 指的是分割槽的時間;對於非分割槽表,只能用 create-time。官網這邊的介紹寫的有點模糊,會讓人誤以為可以查到已經讀過的分割槽下新增的檔案,其實經過我的測試和翻看原始碼發現並不能。

  • stream-source.consume-start-offset :表示從哪個分割槽開始讀。

光說不幹假把式,讓我們撈一把資料看看~

SET 那一行得帶著,不然無法使用 Table Hints。

Hive Temporal Table

看完了 Streaming Source 和 Streaming Sink,讓我們最後再試一下 Hive 作為維表吧。

其實用 Hive 維表很簡單,只要是在 Hive 中存在的表,都可以當做維表使用,引數完全可以用 Table Hints 來覆蓋。

  • lookup.join.cache.ttl :表示快取時間;這裡值得注意的是,因為 Hive 維表會把維表所有資料快取在 TM 的記憶體中,如果維表量很大,那麼很容易就 OOM;如果 ttl 時間太短,那麼會頻繁的載入資料,效能會有很大影響。

因為是 LEFT JOIN,所以維表中不存在的資料會以 NULL 補全。

再看一眼 DAG 圖:

大家看一下畫框的地方,能看到這邊是使用的維表關聯 LookupJoin。

如果大家 SQL 語句寫錯了,丟了 for system_time as of a.p,那麼 DAG 圖就會變成這樣:

這種就不是維表 JOIN 其實更像是流和批在 JOIN。

寫在最後

Hive Streaming 的完善意味著打通了流批一體的最後一道壁壘,既可以做到歷史資料的 OLAP 分析,又可以實時吐出結果,這無疑是 ETL 開發者的福音,想必接下來的日子,會有更多的企業完成他們實時數倉的建設。

參考文件:

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/

[2]https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md

Note 下載:

https://github.com/lonelyGhostisdog/flinksql/blob/master/src/main/resources/Flink%20on%20Zeppelin/Hive%20Streaming%20Test.zpln

最後,給大家介紹一下 Flink on Zeppelin 的釘釘群,大家有問題可以在裡面討論,Apache Zeppelin PMC 簡鋒大佬也在裡面,有問題可以直接在釘群中提問交流~

作者介紹:

狄傑,蘑菇街資深資料專家,負責蘑菇街實時計算平臺 。目前 Focus 在 Flink on Zeppelin,Apache Zeppelin Contributor。

福利來了   

Apache Flink 極客挑戰賽

萬眾矚目的第二屆 Apache Flink 極客挑戰賽來啦!本次大賽全面升級,重量級助陣嘉賓專業指導,強大的資源配置供你發揮創意,還有 30w 豐厚獎金等你帶走~聚焦  Flink 與 AI 技術的應用實踐,挑戰疫情防控的世界級難題,你準備好了麼?

(點選圖片可瞭解更多大賽資訊)

點選「 閱讀 原文 」即可報名