實時即未來,大資料專案車聯網之原始資料實時ETL任務消費資料策略(3)
theme: smartblue
持續創作,加速成長!這是我參與「掘金日新計劃 · 10 月更文挑戰」的第3天,點選檢視活動詳情
實時ETL業務開發
1 原始資料實時ETL分析
根據kafka叢集中的資料進行實時ETL過濾,對資料進行劃分,並將劃分的資料分別落地到不同的資料庫中。
2 實時ETL開發流程分析
原始資料實時ETL,讀取kafka中的原始json資料,解析json資料,flink實時邏輯開發,資料落地到hive與hbase
l 實現步驟:
原始資料實時ETL流式任務建立
原始資料實時ETL任務設定
定義任務Kakfa資料來源資訊(topic,group,key、value序列化資訊等)
任務新增Kafka資料來源,消費資料
原始資料json解析
邏輯處理,區分原始資料中的解析失敗的資料和解析成功的資料
資料落地HDFSSink,建立Hive表與HDFS資料進行關聯
定時任務指令碼
3 解析工具類引入
解析工具類建立的程式包路徑: cn. maynor.streaming.utils
1 日期處理工具
l 在程式包下建立日期工具類 DateUtil .java
l 定義時間處理的列舉類
略
l 日期處理工具,包含:字串轉日期,獲得當前日期、日期格式化等處理
直接獲得當前日期,格式:“yyyy-MM-dd HH:mm:ss”
直接獲得當前日期,格式:”yyyyMMdd”
字串日期格式轉換,傳入引數格式:“yyyy-MM-dd”,轉成Date型別
字串日期格式轉換,傳入引數格式:“yyyy-MM-dd HH:mm:ss”,轉成Date型別
字串日期格式轉換,傳入引數格式:”yyyy-MM-dd HH:mm:ss“,轉成”yyyyMMdd”格式
略
l 測試日期工具類
2 配置檔案載入工具
l 配置檔案載入工具,載入key在conf.properties檔案中對應的值
l 在程式包下建立日期工具類 ConfigLoader
n ClassLoader(類載入器),載入conf.properties
通過Properties的load方法載入InputStream
編寫方法實現獲得string的key值
編寫方法實現獲得int的key值
3 字串處理工具
後續在使用hbase的rowkey生成的時候,為了避免熱點問題,將字串進行反轉,因此建立字串反轉方法
l 在程式包下建立日期工具類 StringUtil .java
l 定義字串反轉的方法
略
4 JSON解析工具
Json解析工具,即編寫原始資料json解析工具類,使用第一章json解析中的第二種方式org.json解析:原始資料\sourcedata.txt
l 編寫json解析物件:ItcastDataObj物件
n json的key作為物件的屬性
擴充套件欄位
型別轉換、構造方法、toString方法等
l Json解析工具類: JsonParseUtil
區分出來正確資料或者異常資料
Vin不為空且 terminalTime(事件時間)不為空,即正常資料,反之異常資料
為擴充套件欄位進行賦值操作:errorData、terminalTimeStamp
略
l 工具類測試方法: JsonParseUtilTest
4 原始資料 實時 ETL任務設定
l Task任務建立所在的程式包: cn. maynor.streaming.task
l 建立task: KafkaSourceDataTask
l 原始資料實時ETL任務編寫,流任務設定檢查點、任務重啟策略、kafka分割槽發現、資料積壓
checkpoint
任務重啟策略
kafka分割槽發現
資料積壓
1 checkpoint配置
l 選擇合適的Checkpoint儲存方式
l CheckPoint儲存方式存在三種
官方文件:http://ci.apache.org/projects/flink/flink-docs-release- 10/ops/state/state_backends.html
n MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend
伺服器中配置預設狀態儲存路徑:flink-conf.yaml
```bash
The backend that will be used to store operator state checkpoints
state.backend: filesystem
Directory for storing checkpoints 伺服器環境儲存多個掛載的本地磁碟為最優
prod envirment file:///data/flink/checkpoints
state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints ```
不同 StateBackend 之間的效能以及安全性是有很大差異的。通常情況下,在程式碼中設定檢查點儲存,MemoryStateBackend 適合應用於測試環境,線上環境則最好選擇存在掛在儲存在磁碟上的FsStateBackend、RocksDBStateBackend。
這有兩個原因:首先,RocksDBStateBackend 是外部儲存,其他兩種 Checkpoint 儲存方式都是 JVM 堆儲存。 受限於 JVM 堆記憶體的大小,Checkpoint 狀態大小以及安全性可能會受到一定的制約;
其次,RocksDBStateBackend 支援增量檢查點。增量檢查點機制(Incremental Checkpoints)僅僅記錄對先前完成的檢查點的更改,而不是生成完整的狀態。 與完整檢查點相比,增量檢查點可以顯著縮短 checkpointing 時間,但代價是需要更長的恢復時間。
“state.checkpoints.dir”引數來指定所有的checkpoints資料和元資料儲存的位置
java
// 本地訪問hdfs設定系統屬性
System.setProperty("HADOOP_USER_NAME", "root");
// 建立flink的流式環境、設定任務的檢查點(資料儲存hdfs)、設定分割槽發現、設定任務的重啟策略、資料積壓內部解決策略
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 設定流式資料的參照時間 ProcessingTime:事件被處理時機器的系統時間;IngestionTime:事件進入flink事件;EventTime:事件資料中某個欄位的時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2.checkpoint配置 開啟檢查點,設定時間間隔為5分鐘
env.enableCheckpointing(300000);
// 3.檢查點Model設定 exactly once 僅消費一次 保證訊息不丟失不重複
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 4 防止checkpoint 太過於頻繁而導致業務處理的速度下降
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// 5 設定checkpoint的超時時間
env.getCheckpointConfig().setCheckpointTimeout(20000);
// 6 設定checkpoint最大的嘗試次數,次數必須 >= 1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 7 設定取消任務時保留checkpoint,checkpoint預設會在整個job取消時被刪除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 8 設定執行job過程中,儲存檢查點錯誤時,job不失敗
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
String hdfsUri = ConfigLoader.getProperty("hdfsUri");
// 9.設定檢查點儲存的位置,使用RocksDBStateBackend,儲存在hdfs分散式檔案系統中,增量檢查點
try {
env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask"));
} catch (IOException e) {
e.printStackTrace();
}