實時即未來,大數據項目車聯網之原始數據實時ETL任務消費數據策略(3)

語言: CN / TW / HK

theme: smartblue

持續創作,加速成長!這是我參與「掘金日新計劃 · 10 月更文挑戰」的第3天,點擊查看活動詳情

實時ETL業務開發

1 原始數據實時ETL分析

根據kafka集羣中的數據進行實時ETL過濾,對數據進行劃分,並將劃分的數據分別落地到不同的數據庫中。

image-20221007105927589

2 實時ETL開發流程分析

原始數據實時ETL,讀取kafka中的原始json數據,解析json數據,flink實時邏輯開發,數據落地到hive與hbase

image-20221007110005934

l 實現步驟:

原始數據實時ETL流式任務創建

原始數據實時ETL任務設置

定義任務Kakfa數據源信息(topic,group,key、value序列化信息等)

任務添加Kafka數據源,消費數據

原始數據json解析

邏輯處理,區分原始數據中的解析失敗的數據和解析成功的數據

數據落地HDFSSink,創建Hive表與HDFS數據進行關聯

定時任務腳本

image-20221007110019679

3 解析工具類引入

解析工具類創建的程序包路徑: cn. maynor.streaming.utils

1 日期處理工具

l 在程序包下創建日期工具類 DateUtil .java

l 定義時間處理的枚舉類

l 日期處理工具,包含:字符串轉日期,獲得當前日期、日期格式化等處理

直接獲得當前日期,格式:“yyyy-MM-dd HH:mm:ss”

直接獲得當前日期,格式:”yyyyMMdd”

字符串日期格式轉換,傳入參數格式:“yyyy-MM-dd”,轉成Date類型

字符串日期格式轉換,傳入參數格式:“yyyy-MM-dd HH:mm:ss”,轉成Date類型

字符串日期格式轉換,傳入參數格式:”yyyy-MM-dd HH:mm:ss“,轉成”yyyyMMdd”格式

l 測試日期工具類

2 配置文件加載工具

l 配置文件加載工具,加載key在conf.properties文件中對應的值

l 在程序包下創建日期工具類 ConfigLoader

n ClassLoader(類加載器),加載conf.properties

通過Properties的load方法加載InputStream

編寫方法實現獲得string的key值

編寫方法實現獲得int的key值

3 字符串處理工具

後續在使用hbase的rowkey生成的時候,為了避免熱點問題,將字符串進行反轉,因此創建字符串反轉方法

l 在程序包下創建日期工具類 StringUtil .java

l 定義字符串反轉的方法

4 JSON解析工具

Json解析工具,即編寫原始數據json解析工具類,使用第一章json解析中的第二種方式org.json解析:原始數據\sourcedata.txt

l 編寫json解析對象:ItcastDataObj對象

n json的key作為對象的屬性

擴展字段

類型轉換、構造方法、toString方法等

講義關聯資料\ItcastDataObj屬性.md

講義關聯資料\ItcastDataObj方法.md

l Json解析工具類: JsonParseUtil

講義關聯資料\JsonParseUtil.java

區分出來正確數據或者異常數據

Vin不為空且 terminalTime(事件時間)不為空,即正常數據,反之異常數據

為擴展字段進行賦值操作:errorData、terminalTimeStamp

l 工具類測試方法: JsonParseUtilTest

img

4 原始數據 實時 ETL任務設置

l Task任務創建所在的程序包: cn. maynor.streaming.task

l 創建task: KafkaSourceDataTask

img

l 原始數據實時ETL任務編寫,流任務設置檢查點、任務重啟策略、kafka分區發現、數據積壓

checkpoint

任務重啟策略

kafka分區發現

數據積壓

1 checkpoint配置

l 選擇合適的Checkpoint存儲方式

l CheckPoint存儲方式存在三種

官方文檔:https://ci.apache.org/projects/flink/flink-docs-release- 10/ops/state/state_backends.html

n MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend

服務器中配置默認狀態存儲路徑:flink-conf.yaml

```bash

The backend that will be used to store operator state checkpoints

state.backend: filesystem

Directory for storing checkpoints 服務器環境存儲多個掛載的本地磁盤為最優

prod envirment file:///data/flink/checkpoints

state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints ```

不同 StateBackend 之間的性能以及安全性是有很大差異的。通常情況下,在代碼中設置檢查點存儲,MemoryStateBackend 適合應用於測試環境,線上環境則最好選擇存在掛在存儲在磁盤上的FsStateBackend、RocksDBStateBackend。

這有兩個原因:首先,RocksDBStateBackend 是外部存儲,其他兩種 Checkpoint 存儲方式都是 JVM 堆存儲。 受限於 JVM 堆內存的大小,Checkpoint 狀態大小以及安全性可能會受到一定的制約;

其次,RocksDBStateBackend 支持增量檢查點。增量檢查點機制(Incremental Checkpoints)僅僅記錄對先前完成的檢查點的更改,而不是生成完整的狀態。 與完整檢查點相比,增量檢查點可以顯著縮短 checkpointing 時間,但代價是需要更長的恢復時間。

“state.checkpoints.dir”參數來指定所有的checkpoints數據和元數據存儲的位置

java // 本地訪問hdfs設置系統屬性 System.setProperty("HADOOP_USER_NAME", "root"); // 創建flink的流式環境、設置任務的檢查點(數據存儲hdfs)、設置分區發現、設置任務的重啟策略、數據積壓內部解決策略 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // 設置流式數據的參照時間 ProcessingTime:事件被處理時機器的系統時間;IngestionTime:事件進入flink事件;EventTime:事件數據中某個字段的時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2.checkpoint配置 開啟檢查點,設置時間間隔為5分鐘 env.enableCheckpointing(300000); // 3.檢查點Model設置 exactly once 僅消費一次 保證消息不丟失不重複 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 4 防止checkpoint 太過於頻繁而導致業務處理的速度下降 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // 5 設置checkpoint的超時時間 env.getCheckpointConfig().setCheckpointTimeout(20000); // 6 設置checkpoint最大的嘗試次數,次數必須 >= 1 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 7 設置取消任務時保留checkpoint,checkpoint默認會在整個job取消時被刪除 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 8 設置執行job過程中,保存檢查點錯誤時,job不失敗 env.getCheckpointConfig().setFailOnCheckpointingErrors(false); String hdfsUri = ConfigLoader.getProperty("hdfsUri"); // 9.設置檢查點存儲的位置,使用RocksDBStateBackend,存儲在hdfs分佈式文件系統中,增量檢查點 try { env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask")); } catch (IOException e) { e.printStackTrace(); }