實時即未來,大資料專案車聯網之原始資料實時ETL任務消費資料策略(3)

語言: CN / TW / HK

theme: smartblue

持續創作,加速成長!這是我參與「掘金日新計劃 · 10 月更文挑戰」的第3天,點選檢視活動詳情

實時ETL業務開發

1 原始資料實時ETL分析

根據kafka叢集中的資料進行實時ETL過濾,對資料進行劃分,並將劃分的資料分別落地到不同的資料庫中。

image-20221007105927589

2 實時ETL開發流程分析

原始資料實時ETL,讀取kafka中的原始json資料,解析json資料,flink實時邏輯開發,資料落地到hive與hbase

image-20221007110005934

l 實現步驟:

原始資料實時ETL流式任務建立

原始資料實時ETL任務設定

定義任務Kakfa資料來源資訊(topic,group,key、value序列化資訊等)

任務新增Kafka資料來源,消費資料

原始資料json解析

邏輯處理,區分原始資料中的解析失敗的資料和解析成功的資料

資料落地HDFSSink,建立Hive表與HDFS資料進行關聯

定時任務指令碼

image-20221007110019679

3 解析工具類引入

解析工具類建立的程式包路徑: cn. maynor.streaming.utils

1 日期處理工具

l 在程式包下建立日期工具類 DateUtil .java

l 定義時間處理的列舉類

l 日期處理工具,包含:字串轉日期,獲得當前日期、日期格式化等處理

直接獲得當前日期,格式:“yyyy-MM-dd HH:mm:ss”

直接獲得當前日期,格式:”yyyyMMdd”

字串日期格式轉換,傳入引數格式:“yyyy-MM-dd”,轉成Date型別

字串日期格式轉換,傳入引數格式:“yyyy-MM-dd HH:mm:ss”,轉成Date型別

字串日期格式轉換,傳入引數格式:”yyyy-MM-dd HH:mm:ss“,轉成”yyyyMMdd”格式

l 測試日期工具類

2 配置檔案載入工具

l 配置檔案載入工具,載入key在conf.properties檔案中對應的值

l 在程式包下建立日期工具類 ConfigLoader

n ClassLoader(類載入器),載入conf.properties

通過Properties的load方法載入InputStream

編寫方法實現獲得string的key值

編寫方法實現獲得int的key值

3 字串處理工具

後續在使用hbase的rowkey生成的時候,為了避免熱點問題,將字串進行反轉,因此建立字串反轉方法

l 在程式包下建立日期工具類 StringUtil .java

l 定義字串反轉的方法

4 JSON解析工具

Json解析工具,即編寫原始資料json解析工具類,使用第一章json解析中的第二種方式org.json解析:原始資料\sourcedata.txt

l 編寫json解析物件:ItcastDataObj物件

n json的key作為物件的屬性

擴充套件欄位

型別轉換、構造方法、toString方法等

講義關聯資料\ItcastDataObj屬性.md

講義關聯資料\ItcastDataObj方法.md

l Json解析工具類: JsonParseUtil

講義關聯資料\JsonParseUtil.java

區分出來正確資料或者異常資料

Vin不為空且 terminalTime(事件時間)不為空,即正常資料,反之異常資料

為擴充套件欄位進行賦值操作:errorData、terminalTimeStamp

l 工具類測試方法: JsonParseUtilTest

img

4 原始資料 實時 ETL任務設定

l Task任務建立所在的程式包: cn. maynor.streaming.task

l 建立task: KafkaSourceDataTask

img

l 原始資料實時ETL任務編寫,流任務設定檢查點、任務重啟策略、kafka分割槽發現、資料積壓

checkpoint

任務重啟策略

kafka分割槽發現

資料積壓

1 checkpoint配置

l 選擇合適的Checkpoint儲存方式

l CheckPoint儲存方式存在三種

官方文件:http://ci.apache.org/projects/flink/flink-docs-release- 10/ops/state/state_backends.html

n MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend

伺服器中配置預設狀態儲存路徑:flink-conf.yaml

```bash

The backend that will be used to store operator state checkpoints

state.backend: filesystem

Directory for storing checkpoints 伺服器環境儲存多個掛載的本地磁碟為最優

prod envirment file:///data/flink/checkpoints

state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints ```

不同 StateBackend 之間的效能以及安全性是有很大差異的。通常情況下,在程式碼中設定檢查點儲存,MemoryStateBackend 適合應用於測試環境,線上環境則最好選擇存在掛在儲存在磁碟上的FsStateBackend、RocksDBStateBackend。

這有兩個原因:首先,RocksDBStateBackend 是外部儲存,其他兩種 Checkpoint 儲存方式都是 JVM 堆儲存。 受限於 JVM 堆記憶體的大小,Checkpoint 狀態大小以及安全性可能會受到一定的制約;

其次,RocksDBStateBackend 支援增量檢查點。增量檢查點機制(Incremental Checkpoints)僅僅記錄對先前完成的檢查點的更改,而不是生成完整的狀態。 與完整檢查點相比,增量檢查點可以顯著縮短 checkpointing 時間,但代價是需要更長的恢復時間。

“state.checkpoints.dir”引數來指定所有的checkpoints資料和元資料儲存的位置

java // 本地訪問hdfs設定系統屬性 System.setProperty("HADOOP_USER_NAME", "root"); // 建立flink的流式環境、設定任務的檢查點(資料儲存hdfs)、設定分割槽發現、設定任務的重啟策略、資料積壓內部解決策略 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // 設定流式資料的參照時間 ProcessingTime:事件被處理時機器的系統時間;IngestionTime:事件進入flink事件;EventTime:事件資料中某個欄位的時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2.checkpoint配置 開啟檢查點,設定時間間隔為5分鐘 env.enableCheckpointing(300000); // 3.檢查點Model設定 exactly once 僅消費一次 保證訊息不丟失不重複 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 4 防止checkpoint 太過於頻繁而導致業務處理的速度下降 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // 5 設定checkpoint的超時時間 env.getCheckpointConfig().setCheckpointTimeout(20000); // 6 設定checkpoint最大的嘗試次數,次數必須 >= 1 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 7 設定取消任務時保留checkpoint,checkpoint預設會在整個job取消時被刪除 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 8 設定執行job過程中,儲存檢查點錯誤時,job不失敗 env.getCheckpointConfig().setFailOnCheckpointingErrors(false); String hdfsUri = ConfigLoader.getProperty("hdfsUri"); // 9.設定檢查點儲存的位置,使用RocksDBStateBackend,儲存在hdfs分散式檔案系統中,增量檢查點 try { env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask")); } catch (IOException e) { e.printStackTrace(); }