大資料物流專案:實時增量ETL儲存Kudu(八)

語言: CN / TW / HK

theme: smartblue

持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的第12天,點選檢視活動詳情

Logistics_Day08:實時增量ETL儲存Kudu

01-[複習]-上次課程內容回顧

主要講解2個方面內容:搭建物流專案環境(Maven Project)和結構化流程式(測試)

``` 1、搭建物流專案環境 - Windows系統開發環境初始化 設定HADOOP_HOME:指向在windows下編譯HADOOP,bin目錄winutils.exe和hadoop.dll 設定hadoop.dll 檔案:放置C:\Windows/System32 - 建立MavenProject工程 1個父工程,4個子模組(common、etl、generate、offline) 建立工程、建立模組、加入POM依賴 匯入genereate資料生成器模組 進行初始化操作 建立基礎包、匯入工具類 屬性檔案:config.properties,連線服務資訊引數

2、測試結構化流程式 編寫結構化流應用程式,實時從Kafka消費資料(2個Topic,對應2個業務系統資料),將其列印控制檯 - 啟動資料庫和採集框架,對錶的資料進行更新和刪除,流式是否消費到資料 - 執行資料模擬生成器,實時產生業務資料,插入到資料庫表中,流式是否消費到資料

```

02-[瞭解]-第8天:課程內容提綱

主要物流專案業務資料實時ETL轉換操作,流程如下圖中:process方法功能

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-YzpbYYdC-1652014600908)(image-20210526072519874.)]

在流式應用程式中,通常都是從Kafka消費資料,基本上形成固定程式碼結構 spark.readStream.format("kafka").option("bootstrop.servers").option("topic", "").load() 最重要核心程式碼邏輯: 對消費到Kafka業務資料(JSON字串)進行ETL轉換 1. JSON 字串 -> JavaBean物件中 2. JavaBean 物件 -> 提取欄位,封裝到具體表對應POJO物件,方便儲存業務資料

​ 由於物流專案中,需要編寫多個流式計算程式,實時消費Kafka資料,進行ETL轉換,儲存到不同引擎,封裝流式計算程式公共介面,定義程式執行業務流程步驟。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-eIQ7RaET-1652014600910)(image-20210526084023859.)]

03-[掌握]-實時ETL開發之封裝流計算公共介面

為什麼封裝介面:物流專案來說,需要編寫3個流式應用程式,消費業務資料,ETL轉換後儲存到不同引擎(Kudu、Es和CK),步驟基本類似:

==在etl模組【logistics-etl】的 realtime 包下建立 BasicStreamApp 特質Trait,定義方法==

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-pEW3MnrA-1652014600911)(1616040734866.)]

  • 實現方法:建立load,讀取Kafka叢集指定主題Topi的資料
  • 實現方法:建立process方法,對消費資料進行ETL轉換操作
  • 實現方法:建立save方法,將ETL轉換後資料儲存至外部儲存引擎

```java package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.Configuration import org.apache.spark.sql.{DataFrame, SparkSession}

/* * 所有ETL流式處理的基類,實時增量ETL至:Kudu、Elasticsearch和ClickHouse都要實現此基類,定義三個方法 * - 1. 載入資料:load * - 2. 處理資料:process * - 3. 儲存資料:save / trait BasicStreamApp {

/**
 * 讀取資料的方法
 *
 * @param spark SparkSession
 * @param topic 指定消費的主題
 * @param selectExpr 預設值:CAST(value AS STRING)
 */
def load(spark: SparkSession, topic: String, selectExpr: String = "CAST(value AS STRING)"): DataFrame = {
    spark.readStream
        .format(Configuration.SPARK_KAFKA_FORMAT)
        .option("kafka.bootstrap.servers", Configuration.KAFKA_ADDRESS)
        .option("subscribe", topic)
        .option("maxOffsetsPerTrigger", "100000")
        .load()
        .selectExpr(selectExpr)
}

/**
 * 資料的處理
 *
 * @param streamDF 流式資料集StreamingDataFrame
 * @param category 業務資料型別,比如物流系統業務資料,CRM系統業務資料等
 * @return 流式資料集StreamingDataFrame
 */
def process(streamDF: DataFrame, category: String): DataFrame

/**
 * 資料的儲存
 *
 * @param streamDF 儲存資料集DataFrame
 * @param tableName 儲存表的名稱
 * @param isAutoCreateTable 是否自動建立表,預設建立表
 */
def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit

} ```

​ 當公共介面完成以後,某個實時ETL應用,建立物件object時,繼承公共介面,實現其中:processsave方法即可。

04-[掌握]-實時ETL開發之SparkUtils工具類

任務:==編寫工具類SparkUtils,建立SparkSession例項物件,並且可以對應用進行設定。==

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-Cx4tJuDH-1652014600911)(1616049306846.)]

建立物件SparkUtils,按照上述結構,實現具體方法,程式碼如下所示:

```java package cn.itcast.logistics.common

import org.apache.commons.lang3.SystemUtils import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession

/* * Spark 操作的工具類 / object SparkUtils {

// 定義變數,型別匿名函式型別,返回為SparkConf物件
lazy val sparkConf = () => {
    new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .set("spark.sql.session.timeZone", "Asia/Shanghai")
        .set("spark.sql.files.maxPartitionBytes", "134217728")
        .set("spark.sql.files.openCostInBytes", "134217728")
        .set("spark.sql.shuffle.partitions", "3")
        .set("spark.sql.autoBroadcastJoinThreshold", "67108864")
}

// 依據應用執行作業系統,設定執行模式:local本地、yarn叢集
lazy val autoSettingEnv = (sparkConf: SparkConf) => {
    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
        //本地環境LOCAL_HADOOP_HOME
        System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
        //設定執行環境和checkpoint路徑
        sparkConf
            .set("spark.master", "local[3]")
            .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
    } else {
        //生產環境
        sparkConf
            .set("spark.master", "yarn")
            .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
    }
    // 返回設定以後SparkConf物件
    sparkConf
}

/**
 * 建立sparkSession物件
 * @param sparkConf SparkConf例項,設定應用慘啊戶數
 * @param clazz 每個應用Class例項物件
 */
def createSparkSession(sparkConf: SparkConf, clazz: Class[_]): SparkSession = {
    SparkSession.builder()
        .appName(clazz.getSimpleName.stripSuffix("$"))
        .config(sparkConf)
        .getOrCreate()
}

} ```

編寫main方法,建立SparkSession物件,檢視4040介面頁面(執行緒休眠即可)

```scala

// 測試
def main(args: Array[String]): Unit = {
    val spark: SparkSession = createSparkSession(
        autoSettingEnv(sparkConf()), this.getClass
    )
    println(spark)

    Thread.sleep(10000000)
    spark.stop()
}

```

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-irIwFlIe-1652014600912)(1616049966978.)]

05-[理解]-實時ETL開發之KuduStreamApp程式

任務:編寫流式程式,實時消費Kafka資料,進行ETL轉換,最終儲存到Kudu表中,繼承公共介面:BasicStreamApp,實現其中方法。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-Ijwk5iEE-1652014600912)(https://gitee.com/the_efforts_paid_offf/picture-blog/raw/master20220508182256.)]

具體開發步驟如下所示:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-kaLMr39B-1652014600912)(1616050178069.)]

​ 實時Kudu ETL應用程式入口,資料處理邏輯步驟:

ini step1. 建立SparkSession例項物件,傳遞SparkConf step2. 從Kafka資料來源實時消費資料 step3. 對獲取Json資料進行ETL轉換 step4. 儲存轉換後資料到外部儲存 step5. 應用啟動以後,等待終止結束

```java package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.SparkUtils import org.apache.spark.sql.{DataFrame, SparkSession}

/* * Kudu資料管道應用:實現Kudu資料庫的實時ETL操作 / object KuduStreamApp extends BasicStreamApp {

/**
 * 資料的處理
 *
 * @param streamDF 流式資料集StreamingDataFrame
 * @param category 業務資料型別,比如物流系統業務資料,CRM系統業務資料等
 * @return 流式資料集StreamingDataFrame
 */
override def process(streamDF: DataFrame, category: String): DataFrame = ???

/**
 * 資料的儲存
 *
 * @param streamDF          儲存資料集DataFrame
 * @param tableName         儲存表的名稱
 * @param isAutoCreateTable 是否自動建立表,預設建立表
 */
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = ???

/*
實時Kudu ETL應用程式入口,資料處理邏輯步驟:
    step1. 建立SparkSession例項物件,傳遞SparkConf
    step2. 從Kafka資料來源實時消費資料
    step3. 對獲取Json資料進行ETL轉換
    step4. 儲存轉換後資料到外部儲存
    step5. 應用啟動以後,等待終止結束
*/
def main(args: Array[String]): Unit = {
    // step1. 建立SparkSession例項物件,傳遞SparkConf
    val spark: SparkSession = SparkUtils.createSparkSession(
        SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
    )
    import spark.implicits._

    // step2. 從Kafka資料來源實時消費資料
    // 物流系統Topic資料
    val logisticsDF: DataFrame = load(spark, "logistics")
    val crmDF: DataFrame = load(spark, "crm")

    // step3. 對獲取Json資料進行ETL轉換
    val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
    val etlCrmDF: DataFrame = process(logisticsDF, "crm")

    // step4. 儲存轉換後資料到外部儲存
    save(etlLogisticsDF, "logistics-console")
    save(etlCrmDF, "crm-console")

    // step5. 應用啟動以後,等待終止結束
    spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
    spark.streams.awaitAnyTermination()
}

}

```

上述程式碼,已經實現MAIN方法,接下來只要實現其中:process【ETL轉換】和save【儲存資料】方法即可。

先不考慮ETL業務邏輯和具體save儲存,資料直接進行ETL轉換,將資料列印到控制檯即可。

  • 實現方法:process,沒有任何邏輯

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-jFM2zN69-1652014600913)(1616051030658.)]

  • 實現方法:save,將資料列印控制檯

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-pmeOkIof-1652014600913)(1616051118704.)]

```java package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.SparkUtils import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, SparkSession}

/* * Kudu資料管道應用:實現Kudu資料庫的實時ETL操作 / object KuduStreamApp extends BasicStreamApp {

/**
 * 資料的處理,此時不進行任何業務邏輯轉換
 *
 * @param streamDF 流式資料集StreamingDataFrame
 * @param category 業務資料型別,比如物流系統業務資料,CRM系統業務資料等
 * @return 流式資料集StreamingDataFrame
 */
override def process(streamDF: DataFrame, category: String): DataFrame = {
    val etlStreamDF: DataFrame = category match {
        // TODO: 物流系統業務資料
        case "logistics" =>
            streamDF

        // TODO: CRM系統業務資料
        case "crm" =>
            streamDF

        // TODO: 其他業務系統資料
        case _ => streamDF
    }
    // 返回ETL轉換後的資料
    etlStreamDF
}

/**
 * 資料的儲存,此時僅僅將資料列印控制檯
 *
 * @param streamDF          儲存資料集DataFrame
 * @param tableName         儲存表的名稱
 * @param isAutoCreateTable 是否自動建立表,預設建立表
 */
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
    streamDF.writeStream
        .queryName(s"query-${tableName}")
        .outputMode(OutputMode.Append())
        .format("console")
        .option("numRows", "100")
        .option("truncate", "false")
        .start()
}

/*
實時Kudu ETL應用程式入口,資料處理邏輯步驟:
    step1. 建立SparkSession例項物件,傳遞SparkConf
    step2. 從Kafka資料來源實時消費資料
    step3. 對獲取Json資料進行ETL轉換
    step4. 儲存轉換後資料到外部儲存
    step5. 應用啟動以後,等待終止結束
*/
def main(args: Array[String]): Unit = {
    // step1. 建立SparkSession例項物件,傳遞SparkConf
    val spark: SparkSession = SparkUtils.createSparkSession(
        SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
    )
    import spark.implicits._

    // step2. 從Kafka資料來源實時消費資料
    // 物流系統Topic資料
    val logisticsDF: DataFrame = load(spark, "logistics")
    val crmDF: DataFrame = load(spark, "crm")

    // step3. 對獲取Json資料進行ETL轉換
    val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
    val etlCrmDF: DataFrame = process(logisticsDF, "crm")

    // step4. 儲存轉換後資料到外部儲存
    save(etlLogisticsDF, "logistics-console")
    save(etlCrmDF, "crm-console")

    // step5. 應用啟動以後,等待終止結束
    spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
    spark.streams.awaitAnyTermination()
}

}

```

程式設計完成以後,執行流式計算程式:KuduStreamApp,啟動MySQL資料庫和Canal及Oracle資料庫和OGG

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-SlCVJaiw-1652014600914)(1616051529743.)]

06-[理解]-實時ETL開發之Kafka資料JSON格式

​ 實時從Kafka消費資料,無論是OGG採集還是Canal採集資料,都是以JSON字串格式傳送KafkaTopic,此時需要檢視OGG採集資料格式欄位和Canal採集資料格式欄位。

  • 1)、OGG 採集資料格式

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-Of6k9pCU-1652014600914)(1616051796175.)]

具體分析:插入資料Insert、更新資料update和刪除資料delete

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-hSS9jVRN-1652014600914)(1616052050412.)]

  • 2)、Canal採集資料格式

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-6qUKxp1f-1652014600915)(1616052064308.)]

具體檢視Canal採集資料,分析欄位

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-mqaOU4ec-1652014600915)(1616052381480.)]

07-[掌握]-實時ETL開發之定義資料Bean物件

​ 無論是OGG採集資料還是Canal採集資料,JSON資料各式欄位,基本一致,所以定義JavaBean,分別解析封裝資料到JavaBean物件

  • 1)、OGG採集JSON資料:7個欄位
  • 2)、Canal採集JSON資料:12個欄位
  • 1)、定義 Bean 物件基類

​ 根據資料來源不同可以分為OGG資料和Canal資料,兩者之間有相同的屬性:table,因此將該屬性作為公共屬性進行提取,抽象成基類。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-zmcLPflD-1652014600916)(1616053553019.)]

```scala package cn.itcast.logistics.common.beans.parser;

import java.io.Serializable;

/* * 根據資料來源定義抽象類,資料來源:ogg 和 canal, 兩者有共同的table屬性 / public abstract class MessageBean implements Serializable {

private static final long serialVersionUID = 373363837132138843L;

private String table;

public String getTable() {
    return table;
}

public void setTable(String table) {
    this.table = table;
}

@Override
public String toString() {
    return table;
}

} ```

  • 2)、定義 OGG 資料 Bean 物件

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-YcbgHW28-1652014600917)(1616053885882.)]

```scala package cn.itcast.logistics.common.beans.parser;

import java.util.Map;

/* * 定義消費OGG資料的JavaBean物件 * { * "table": "ITCAST.tbl_route", //表名:庫名.表名 * "op_type": "U", //操作型別:U表示修改 * "op_ts": "2020-10-08 09:10:54.000774", * "current_ts": "2020-10-08T09:11:01.925000", * "pos": "00000000200006645758", * "before": { //操作前的欄位集合 * "id": 104, * "start_station": "東莞中心", * "start_station_area_id": 441900, * "start_warehouse_id": 1, * "end_station": "蚌埠中轉部", * "end_station_area_id": 340300, * "end_warehouse_id": 107, * "mileage_m": 1369046, * "time_consumer_minute": 56172, * "state": 1, * "cdt": "2020-02-02 18:51:39", * "udt": "2020-02-02 18:51:39", * "remark": null * }, * "after": { //操作後的欄位集合 * "id": 104, * "start_station": "東莞中心", * "start_station_area_id": 441900, * "start_warehouse_id": 1, * "end_station": "TBD", * "end_station_area_id": 340300, * "end_warehouse_id": 107, * "mileage_m": 1369046, * "time_consumer_minute": 56172, * "state": 1, * "cdt": "2020-02-02 18:51:39", * "udt": "2020-02-02 18:51:39", * "remark": null * } * } / public class OggMessageBean extends MessageBean {

private static final long serialVersionUID = -4763944161833712521L;

//定義操作型別
private String op_type;

@Override
public void setTable(String table) {
    //如果表名不為空
    if (table != null && !table.equals("")) {
        table = table.replaceAll("[A-Z]+\\.", "");
    }
    super.setTable(table);
}

public String getOp_type() {
    return op_type;
}

public void setOp_type(String op_type) {
    this.op_type = op_type;
}

public String getOp_ts() {
    return op_ts;
}

public void setOp_ts(String op_ts) {
    this.op_ts = op_ts;
}

public String getCurrent_ts() {
    return current_ts;
}

public void setCurrent_ts(String current_ts) {
    this.current_ts = current_ts;
}

public String getPos() {
    return pos;
}

public void setPos(String pos) {
    this.pos = pos;
}

public Map<String, Object> getBefore() {
    return before;
}

public void setBefore(Map<String, Object> before) {
    this.before = before;
}

public Map<String, Object> getAfter() {
    return after;
}

public void setAfter(Map<String, Object> after) {
    this.after = after;
}

//操作時間
private String op_ts;

@Override
public String toString() {
    return "OggMessageBean{" +
        "table='" + super.getTable() + '\'' +
        ", op_type='" + op_type + '\'' +
        ", op_ts='" + op_ts + '\'' +
        ", current_ts='" + current_ts + '\'' +
        ", pos='" + pos + '\'' +
        ", before=" + before +
        ", after=" + after +
        '}';
}

/**
 * 返回需要處理的列的集合
 * @return
 */
public Map<String, Object> getValue() {
    //如果執行的是刪除操作,則返回before節點的列的集合,如果執行的是插入和更新操作,則返回after節點的列的集合
    if (after == null) {
        return before;
    } else {
        return after;
    }
}

//同步時間
private String current_ts;
//偏移量
private String pos;
//操作之前的資料
private Map<String, Object> before;
//操作之後的資料
private Map<String, Object> after;

}

```

​ 當OGG採集資料時,需要獲取關心操作的資料,定義方法:getValue,刪除資料時獲取before資料,插入或更新獲取after資料,程式碼如下:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-IapFgUjs-1652014600918)(1616054052067.)]

  • 3)、定義 Canal 資料 Bean 物件

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-ZckDhdXi-1652014600918)(1616054087741.)]

```scala package cn.itcast.logistics.common.beans.parser;

import java.util.List; import java.util.Map;

/* * 定義消費canal資料對應的JavaBean物件 * { * "data": [{ * "id": "1", * "name": "北京", * "tel": "222", * "mobile": "1111", * "detail_addr": "北京", * "area_id": "1", * "gis_addr": "1", * "cdt": "2020-10-08 17:20:12", * "udt": "2020-11-05 17:20:16", * "remark": null * }], * "database": "crm", * "es": 1602148867000, * "id": 15, * "isDdl": false, * "mysqlType": { * "id": "bigint(20)", * "name": "varchar(50)", * "tel": "varchar(20)", * "mobile": "varchar(20)", * "detail_addr": "varchar(100)", * "area_id": "bigint(20)", * "gis_addr": "varchar(20)", * "cdt": "datetime", * "udt": "datetime", * "remark": "varchar(100)" * }, * "old": [{ * "tel": "111" * }], * "sql": "", * "sqlType": { * "id": -5, * "name": 12, * "tel": 12, * "mobile": 12, * "detail_addr": 12, * "area_id": -5, * "gis_addr": 12, * "cdt": 93, * "udt": 93, * "remark": 12 * }, * "table": "crm_address", * "ts": 1602148867311, * "type": "UPDATE" //修改資料 * } / public class CanalMessageBean extends MessageBean {

private static final long serialVersionUID = -3147101694588578078L;

//操作的資料集合
private List<Map<String, Object>> data;

public List<Map<String, Object>> getData() {
    return data;
}

public void setData(List<Map<String, Object>> data) {
    this.data = data;
}

public String getDatabase() {
    return database;
}

public void setDatabase(String database) {
    this.database = database;
}

public Long getEs() {
    return es;
}

public void setEs(Long es) {
    this.es = es;
}

public Long getId() {
    return id;
}

public void setId(Long id) {
    this.id = id;
}

public boolean isDdl() {
    return isDdl;
}

public void setDdl(boolean ddl) {
    isDdl = ddl;
}

public Map<String, Object> getMysqlType() {
    return mysqlType;
}

public void setMysqlType(Map<String, Object> mysqlType) {
    this.mysqlType = mysqlType;
}

public String getOld() {
    return old;
}

public void setOld(String old) {
    this.old = old;
}

public String getSql() {
    return sql;
}

public void setSql(String sql) {
    this.sql = sql;
}

public Map<String, Object> getSqlType() {
    return sqlType;
}

public void setSqlType(Map<String, Object> sqlType) {
    this.sqlType = sqlType;
}


public Long getTs() {
    return ts;
}

public void setTs(Long ts) {
    this.ts = ts;
}

public String getType() {
    return type;
}

public void setType(String type) {
    this.type = type;
}

//資料庫名稱
private String database;
private Long es;
private Long id;
private boolean isDdl;
private Map<String, Object> mysqlType;
private String old;
private String sql;
private Map<String, Object> sqlType;
private Long ts;
private String type;

/**
 * 重寫父類的settable方法,將表名修改成統一的字首
 * @param table
 */
@Override
public void setTable(String table) {
    if(table!=null && !table.equals("")){
        if(table.startsWith("crm_")) {
            table = table.replace("crm_", "tbl_");
        }
    }
    super.setTable(table);
}

}

```

CRM系統中每個表的名稱都有字首【crm_】,解析JSON字串時,將其替換【tbl_

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-FP63yxtA-1652014600918)(1616054221345.)]

​ 至此,定義MessageBean實體類,接下來,需要編寫程式碼解析JSON字串為MessageBean物件,封裝例項物件中,方便獲取各個欄位的值,進行相應的處理轉換操作。

注意:定義MessageBean實體類使用Java語言,沒有使用Scala語言,後續使用fastJson庫解析。

08-[掌握]-實時ETL開發之資料轉換Bean及測試

任務:==編寫程式碼,解析JSON字串為MessageBean物件,屬於實時ETL轉換第一步。==

  • 1)、如何解析JSON字串為JavaBean物件呢???

使用阿里巴巴JSON庫:fastJson,既能解析JSON為Bean物件,又能轉換Bean物件為JSON字串

為什麼使用fastJson解析?? fastJson解析Json字串時,使用起來比較簡單,此外庫基於Java語言開發,對JavaBean物件支援非常的好,對Scala語言支援不好,所以MessageBean使用Java語言定義的,沒有使用Scala語言。

  • 轉換JSON為Bean物件:JSON.parseObject(jsonStr, classOf[StuBean])

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-DiTGRUf4-1652014600919)(1616054946045.)]

  • 將Bean物件轉換為JSON字串:JSON.toJSONString(stuBean, true)

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-wAfJcjCm-1652014600920)(1616054978516.)]

```scala package cn.itcast.logistics.test;

import java.util.Objects;

public class StuBean {

private Integer id ;
private String name ;

public StuBean() {
}

public StuBean(Integer id, String name) {
    this.id = id;
    this.name = name;
}

public Integer getId() {
    return id;
}

public void setId(Integer id) {
    this.id = id;
}

public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    StuBean stuBean = (StuBean) o;
    return Objects.equals(id, stuBean.id) &&
        Objects.equals(name, stuBean.name);
}

@Override
public int hashCode() {
    return Objects.hash(id, name);
}

@Override
public String toString() {
    return "StuBean{" +
        "id=" + id +
        ", name='" + name + '\'' +
        '}';
}

}

// ===============================================================

package cn.itcast.logistics.test

import com.alibaba.fastjson.JSON

object FastJsonTest {

def main(args: Array[String]): Unit = {

    // 定義json字串
    val jsonStr: String =
        """
          |{
          |  "id": 10001,
          |  "name": "zhangsan"
          |}
          |""".stripMargin
    // JSON -> JavaBean
    val stuBean: StuBean = JSON.parseObject(jsonStr, classOf[StuBean])
    println(stuBean)


    // JavaBean轉換為JSON字串
    val stuJson: String = JSON.toJSONString(stuBean, true)
    println(stuJson)
}

}

```

  • 2)、流式程式使用Scala語言編寫,為什麼MessageBean使用Java語言??

由於使用FastJson解析Json字串,所以使用Java語言編寫。由於Scala語言中CaseClass對FastJson支援不是很友好,有的時候,解析會出問題,無法完成解析操作。

FastJson庫使用Java編寫的,符合Java語言規範,但是不符合Scala與規範約束。

任務:編寫程式碼,將從Kafka消費JSON字串資料,解析為MessageBean物件即可

  • 1)、首先對物流系統資料,使用OGG採集數資料進行解析處理,核心程式碼如下:

scala case "logistics" => val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF // 由於從Kafka消費資料,只獲取value訊息,將其轉換DataSet .as[String] // 過濾資料 .filter(msg => null != msg && msg.trim.length > 0) // 解析每條資料 .map{ msg => JSON.parseObject(msg, classOf[OggMessageBean]) }(Encoders.bean(classOf[OggMessageBean])) // TODO: 指定編碼器 // 返回資料 oggBeanStreamDS.toDF()

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-R4yvOxD0-1652014600920)(1616055524533.)]

​ 當將DataFrame轉換Dataset進行操作時,尤其呼叫轉換函式(比如map、filter、flatMap)等等,需要指定編碼器Encoder(Dataset 強型別資料結構,指定型別編碼)。

預設情況下,當Dataset資料型別為:元組型別、CaseClass或基本資料型別,都會提供預設編碼器Encoder,除此之外資料型別,比如自定義Java語言Bean物件,必須指定編碼器。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-PdlhZQfN-1652014600921)(1616055716033.)]

執行測試程式,在Oracle資料庫中操作資料,檢視控制檯列印結果

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-VvKeLUp6-1652014600921)(1616055935505.)]

  • 2)、將Canal採集JSON資料,轉換MessageBean物件,核心程式碼

```java // TODO: CRM系統業務資料 case "crm" => implicit val canalEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean]) val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF // 過濾資料 .filter(row => !row.isNullAt(0)) // 解析資料,對分割槽資料操作 .mapPartitions { iter => iter.map { row => val jsonValue: String = row.getAsString // 解析JSON字串 JSON.parseObject(jsonValue, classOf[CanalMessageBean]) } }

            // 返回轉換後的資料
            canalBeanStreamDS.toDF()

```

其中採用隱式引數方式,傳遞定義編碼器Encoder

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-EC1VljWg-1652014600921)(1616056559325.)]

再次執行流式計算程式:KuduStreamApp,在MySQL資料庫中修改表的資料,檢視控制檯列印結果

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-Rg1sVo7i-1652014600923)(1616056711227.)]

09-[理解]-實時ETL開發之轉換POJO【思路】

任務:分析從Kafka消費資料(JSON轉換MessageBean物件),哪些欄位是關係值。

  • OGG採集Oracle資料庫資料:7個欄位
  • Canal採集MySQL資料庫資料:12個欄位

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-tc5InAk0-1652014600923)(image-20210526072519874.)]

OGG採集Oracle資料庫表的資料,傳送到Kafka中JSON字串,轉換為MessageBean以後:

  • 第一個欄位:table,對哪個表進行操作
  • 第二個欄位:op_type,資料操作型別,三個值【插入I、更新U、刪除D
  • 第三個欄位:資料欄位,可能是after(插入和更新),可能是before(刪除),提供方法getValue

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-VOZkVqp6-1652014600923)(1616118817076-1621984868452.)]

Canal採集MySQL資料庫資料時,業務中關心欄位:與OGG採集資料關心欄位基本一致

  • 第一個欄位:table,表的名稱,對哪個表進行操作
  • 第二個欄位:type,資料操作型別,三個值【INSERT、UPDATE、DELETE
  • 第三個欄位:data,真正操作資料

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-4F8lXt9R-1652014600924)(1616119230880-1621984868452.)]

​ 經過前面分析可知,無論OGG採集海Canal採集資料,將JSON字串封裝為MessageBean以後,需要提取其中核心欄位資料,進行轉換操作。

將提取欄位【type】型別和【data】資料,封裝到具體表table的實體類【POJO】中,後面方便進行操作。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-7WYVty8J-1652014600924)(1616119695869-1621984868453.)]

10-[掌握]-實時ETL開發之OggBean轉換POJO程式設計

任務:以OGG採集資料為例,針對其中一個張表【tbl_areas】進行轉換操作。

1)、依據table欄位判斷資料:tbl_areas 2)、獲取資料欄位值:getValue方法,將其轉換為POJO物件 3)、過濾掉轉換為null資料

  • 1)、定義表名稱的隱射

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-Ff2HZHtE-1652014600924)(1616120220576-1621984868453.)]

```java package cn.itcast.logistics.common

/* * 定義表名:根據表的名字定義屬性 / object TableMapping {

// Logistics 物流系統表名稱
val AREAS: String = "tbl_areas"
val CHARGE_STANDARD: String = "tbl_charge_standard"
val CODES: String = "tbl_codes"
val COLLECT_PACKAGE: String = "tbl_collect_package"
val COMPANY: String = "tbl_company"
val COMPANY_DOT_MAP: String = "tbl_company_dot_map"
val COMPANY_TRANSPORT_ROUTE_MA: String = "tbl_company_transport_route_ma"
val COMPANY_WAREHOUSE_MAP: String = "tbl_company_warehouse_map"
val CONSUMER_SENDER_INFO: String = "tbl_consumer_sender_info"
val COURIER: String = "tbl_courier"
val DELIVER_PACKAGE: String = "tbl_deliver_package"
val DELIVER_REGION: String = "tbl_deliver_region"
val DELIVERY_RECORD: String = "tbl_delivery_record"
val DEPARTMENT: String = "tbl_department"
val DOT: String = "tbl_dot"
val DOT_TRANSPORT_TOOL: String = "tbl_dot_transport_tool"
val DRIVER: String = "tbl_driver"
val EMP: String = "tbl_emp"
val EMP_INFO_MAP: String = "tbl_emp_info_map"
val EXPRESS_BILL: String = "tbl_express_bill"
val EXPRESS_PACKAGE: String = "tbl_express_package"
val FIXED_AREA: String = "tbl_fixed_area"
val GOODS_RACK: String = "tbl_goods_rack"
val JOB: String = "tbl_job"
val OUT_WAREHOUSE: String = "tbl_out_warehouse"
val OUT_WAREHOUSE_DETAIL: String = "tbl_out_warehouse_detail"
val PKG: String = "tbl_pkg"
val POSTAL_STANDARD: String = "tbl_postal_standard"
val PUSH_WAREHOUSE: String = "tbl_push_warehouse"
val PUSH_WAREHOUSE_DETAIL: String = "tbl_push_warehouse_detail"
val ROUTE: String = "tbl_route"
val SERVICE_EVALUATION: String = "tbl_service_evaluation"
val STORE_GRID: String = "tbl_store_grid"
val TRANSPORT_RECORD: String = "tbl_transport_record"
val TRANSPORT_TOOL: String = "tbl_transport_tool"
val VEHICLE_MONITOR: String = "tbl_vehicle_monitor"
val WAREHOUSE: String = "tbl_warehouse"
val WAREHOUSE_EMP: String = "tbl_warehouse_emp"
val WAREHOUSE_RACK_MAP: String = "tbl_warehouse_rack_map"
val WAREHOUSE_RECEIPT: String = "tbl_warehouse_receipt"
val WAREHOUSE_RECEIPT_DETAIL: String = "tbl_warehouse_receipt_detail"
val WAREHOUSE_SEND_VEHICLE: String = "tbl_warehouse_send_vehicle"
val WAREHOUSE_TRANSPORT_TOOL: String = "tbl_warehouse_transport_tool"
val WAREHOUSE_VEHICLE_MAP: String = "tbl_warehouse_vehicle_map"
val WAY_BILL: String = "tbl_waybill"
val WAYBILL_LINE: String = "tbl_waybill_line"
val WAYBILL_STATE_RECORD: String = "tbl_waybill_state_record"
val WORK_TIME: String = "tbl_work_time"

// CRM 系統業務資料表名稱
val ADDRESS: String = "tbl_address"
val CONSUMER_ADDRESS_MAP: String = "tbl_consumer_address_map"
val CUSTOMER: String = "tbl_customer"

} ```

  • 2)、編寫核心程式碼,從OggMessageBean中提取欄位值,封裝到具體POJO物件中

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-clUR4VHN-1652014600926)(1616120552233-1621984868453.)]

11-[掌握]-實時ETL開發之轉換POJO【資料解析器】

任務:編寫資料解析器中方法【toAreaBean】實現,從MessageBean中提取欄位值,封裝到POJO實體類物件。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-i2Q21jEU-1652014600927)(1616121581555-1621984868453.)]

  • 當提取MessageBean中資料欄位值,如何將其封裝到POJO物件中呢??

首先將資料段值:Map資料型別 轉換 JSON字串,再將JSON字串轉換為 POJO物件

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-5HvECsad-1652014600927)(1616121249979-1621984868453.)]

最終實現程式碼如下所示:

```scala package cn.itcast.logistics.etl.parse

import java.util

import cn.itcast.logistics.common.beans.logistics.AreasBean import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, MessageBean, OggMessageBean} import com.alibaba.fastjson.JSON

object DataParser {

/**
 * 判斷messageBean是否是OggMessageBean
 */
private def getOggMessageBean(bean: MessageBean): OggMessageBean = {
    bean match {
        case ogg: OggMessageBean => ogg
    }
}

/**
 * 判斷messageBean是否是CanalMessageBean
 */
private def getCanalMessageBean(bean: MessageBean): CanalMessageBean = {
    bean match {
        case canal: CanalMessageBean => canal
    }
}

/**
 * 提取ogg(I、U、D)和canal(insert、update、delete)資料的optype屬性,轉換成統一的操作字串
 *
 * @param opType 資料操作型別:insert、update、delete,任意一種
 */
private def getOpType(opType: String): String = {
    opType match {
        case "I" => "insert"
        case "U" => "update"
        case "D" => "delete"
        case "INSERT" => "insert"
        case "UPDATE" => "update"
        case "DELETE" => "delete"
        case _ => "insert"
    }
}

// ================== 物流Logistics系統業務資料解析 ==================
/*
    從MessageBean提取資料欄位值和資料操作型別,將其封裝到具體POJO物件中
    TODO: 將物流Logistics系統:tbl_areas表的欄位資訊轉換成AreaBean物件
 */
def toAreaBean(bean: MessageBean): AreasBean = {
    // a. 轉換MessageBean物件為OggMessageBean物件
    val oggMessageBean: OggMessageBean = getOggMessageBean(bean)
    // b. 獲取資料操作型別
    val opType: String = getOpType(oggMessageBean.getOp_type)
    // c. 獲取操作資料值
    val dataValue: util.Map[String, AnyRef] = oggMessageBean.getValue
    // d. 將資料封裝到POJO物件
    // 第一步、轉換map為json字串
    val dataJson: String = JSON.toJSONString(dataValue, true)
    println(dataJson)
    // 第二步、json字串為pojo
    val areasBean = JSON.parseObject(dataJson, classOf[AreasBean])
    // 第三步、判斷解析不為null時,設定資料操作型別
    if(null != areasBean){
        areasBean.setOpType(opType)
    }
    // e. 返回封裝物件
    areasBean
}

} ```

12-[理解]-實時ETL開發之轉換POJO【隱式轉換】

​ 當對Dataset資料結構進行操作時(呼叫函式,轉換函式,比如map、flatMap)等,資料返回型別如果不是元組、CaseClass及基本型別,需要指定編碼器Encoder

將JSON字串轉換為MessageBean物件時,指定Encoder編碼器。

  • 方式一、方法後面緊跟圓括號,指定對應編碼器

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-A9yz4kyl-1652014600927)(1616123417063-1621984868453.)]

  • 方式二、隱式引數,指定編碼器,程式自動傳入

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-TtT2n2Ek-1652014600928)(1616123450168-1621984868453.)]

​ 當從MessageBean中提取資料欄位值以後,將其封裝到對應POJO物件時,需要指定編碼器,否則程式報錯,具體如下圖所示:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-KoWkGwAc-1652014600928)(1616123541413-1621984868453.)]

學習SparkSQL框架,如何匯入元組型別、CaseClass型別和基本型別 編碼器。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-MhaVBfT0-1652014600928)(1616123609397-1621984868453.)]

​ 檢視implicits父類:SQLImplicits物件,包含很對定義隱式轉換函式,返回型別都是各種編碼器Encoder

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-kXuGyKtt-1652014600929)(1616123726466-1621984868453.)]

參考SQLImplicits物件中編碼器定義,自己定義編碼器:BeanImplicits,實現隱式匯入和轉換操作。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-kraEvGbD-1652014600929)(1616123830045-1621984868453.)]

```scala package cn.itcast.logistics.common

import cn.itcast.logistics.common.beans.crm. import cn.itcast.logistics.common.beans.logistics. import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean} import org.apache.spark.sql.{Encoder, Encoders}

/* * 擴充套件自定義POJO的隱式轉換實現 / object BeanImplicits {

// 定義MessageBean隱式引數Encoder值
implicit def newOggMessageBeanEncoder: Encoder[OggMessageBean] = Encoders.bean(classOf[OggMessageBean])

implicit def newCanalMessageBeanEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean])

// Logistics Bean
implicit def newAreasBeanEncoder: Encoder[AreasBean] = Encoders.bean(classOf[AreasBean])

implicit def newChargeStandardBeanEncoder: Encoder[ChargeStandardBean] = Encoders.bean(classOf[ChargeStandardBean])

implicit def newCodesBeanEncoder: Encoder[CodesBean] = Encoders.bean(classOf[CodesBean])

implicit def newCollectPackageBeanEncoder: Encoder[CollectPackageBean] = Encoders.bean(classOf[CollectPackageBean])

implicit def newCompanyBeanEncoder: Encoder[CompanyBean] = Encoders.bean(classOf[CompanyBean])

implicit def newCompanyDotMapBeanEncoder: Encoder[CompanyDotMapBean] = Encoders.bean(classOf[CompanyDotMapBean])

implicit def newCompanyTransportRouteMaBeanEncoder: Encoder[CompanyTransportRouteMaBean] = Encoders.bean(classOf[CompanyTransportRouteMaBean])

implicit def newCompanyWarehouseMapBeanEncoder: Encoder[CompanyWarehouseMapBean] = Encoders.bean(classOf[CompanyWarehouseMapBean])

implicit def newConsumerSenderInfoBeanEncoder: Encoder[ConsumerSenderInfoBean] = Encoders.bean(classOf[ConsumerSenderInfoBean])

implicit def newCourierBeanEncoder: Encoder[CourierBean] = Encoders.bean(classOf[CourierBean])

implicit def newDeliverPackageBeanEncoder: Encoder[DeliverPackageBean] = Encoders.bean(classOf[DeliverPackageBean])

implicit def newDeliverRegionBeanEncoder: Encoder[DeliverRegionBean] = Encoders.bean(classOf[DeliverRegionBean])

implicit def newDeliveryRecordBeanEncoder: Encoder[DeliveryRecordBean] = Encoders.bean(classOf[DeliveryRecordBean])

implicit def newDepartmentBeanEncoder: Encoder[DepartmentBean] = Encoders.bean(classOf[DepartmentBean])

implicit def newDotBeanEncoder: Encoder[DotBean] = Encoders.bean(classOf[DotBean])

implicit def newDotTransportToolBeanEncoder: Encoder[DotTransportToolBean] = Encoders.bean(classOf[DotTransportToolBean])

implicit def newDriverBeanEncoder: Encoder[DriverBean] = Encoders.bean(classOf[DriverBean])

implicit def newEmpBeanEncoder: Encoder[EmpBean] = Encoders.bean(classOf[EmpBean])

implicit def newEmpInfoMapBeanEncoder: Encoder[EmpInfoMapBean] = Encoders.bean(classOf[EmpInfoMapBean])

implicit def newExpressBillBeanEncoder: Encoder[ExpressBillBean] = Encoders.bean(classOf[ExpressBillBean])

implicit def newExpressPackageBeanEncoder: Encoder[ExpressPackageBean] = Encoders.bean(classOf[ExpressPackageBean])

implicit def newFixedAreaBeanEncoder: Encoder[FixedAreaBean] = Encoders.bean(classOf[FixedAreaBean])

implicit def newGoodsRackBeanEncoder: Encoder[GoodsRackBean] = Encoders.bean(classOf[GoodsRackBean])

implicit def newJobBeanEncoder: Encoder[JobBean] = Encoders.bean(classOf[JobBean])

implicit def newOutWarehouseBeanEncoder: Encoder[OutWarehouseBean] = Encoders.bean(classOf[OutWarehouseBean])

implicit def newOutWarehouseDetailBeanEncoder: Encoder[OutWarehouseDetailBean] = Encoders.bean(classOf[OutWarehouseDetailBean])

implicit def newPkgBeanEncoder: Encoder[PkgBean] = Encoders.bean(classOf[PkgBean])

implicit def newPostalStandardBeanEncoder: Encoder[PostalStandardBean] = Encoders.bean(classOf[PostalStandardBean])

implicit def newPushWarehouseBeanEncoder: Encoder[PushWarehouseBean] = Encoders.bean(classOf[PushWarehouseBean])

implicit def newPushWarehouseDetailBeanEncoder: Encoder[PushWarehouseDetailBean] = Encoders.bean(classOf[PushWarehouseDetailBean])

implicit def newRouteBeanEncoder: Encoder[RouteBean] = Encoders.bean(classOf[RouteBean])

implicit def newServiceEvaluationBeanEncoder: Encoder[ServiceEvaluationBean] = Encoders.bean(classOf[ServiceEvaluationBean])

implicit def newStoreGridBeanEncoder: Encoder[StoreGridBean] = Encoders.bean(classOf[StoreGridBean])

implicit def newTransportToolBeanEncoder: Encoder[TransportToolBean] = Encoders.bean(classOf[TransportToolBean])

implicit def newVehicleMonitorBeanEncoder: Encoder[VehicleMonitorBean] = Encoders.bean(classOf[VehicleMonitorBean])

implicit def newWarehouseBeanEncoder: Encoder[WarehouseBean] = Encoders.bean(classOf[WarehouseBean])

implicit def newWarehouseEmpBeanEncoder: Encoder[WarehouseEmpBean] = Encoders.bean(classOf[WarehouseEmpBean])

implicit def newWarehouseRackMapBeanEncoder: Encoder[WarehouseRackMapBean] = Encoders.bean(classOf[WarehouseRackMapBean])

implicit def newWarehouseReceiptBeanEncoder: Encoder[WarehouseReceiptBean] = Encoders.bean(classOf[WarehouseReceiptBean])

implicit def newWarehouseReceiptDetailBeanEncoder: Encoder[WarehouseReceiptDetailBean] = Encoders.bean(classOf[WarehouseReceiptDetailBean])

implicit def newWarehouseSendVehicleBeanEncoder: Encoder[WarehouseSendVehicleBean] = Encoders.bean(classOf[WarehouseSendVehicleBean])

implicit def newWarehouseTransportToolBeanEncoder: Encoder[WarehouseTransportToolBean] = Encoders.bean(classOf[WarehouseTransportToolBean])

implicit def newWarehouseVehicleMapBeanEncoder: Encoder[WarehouseVehicleMapBean] = Encoders.bean(classOf[WarehouseVehicleMapBean])

implicit def newWaybillBeanEncoder: Encoder[WaybillBean] = Encoders.bean(classOf[WaybillBean])

implicit def newWaybillLineBeanEncoder: Encoder[WaybillLineBean] = Encoders.bean(classOf[WaybillLineBean])

implicit def newWaybillStateRecordBeanEncoder: Encoder[WaybillStateRecordBean] = Encoders.bean(classOf[WaybillStateRecordBean])

implicit def newWorkTimeBeanEncoder: Encoder[WorkTimeBean] = Encoders.bean(classOf[WorkTimeBean])

implicit def newTransportRecordBeanEncoder: Encoder[TransportRecordBean] = Encoders.bean(classOf[TransportRecordBean])

// CRM Bean
implicit def newCustomerBeanEncoder: Encoder[CustomerBean] = Encoders.bean(classOf[CustomerBean])

implicit def newAddressBeanEncoder: Encoder[AddressBean] = Encoders.bean(classOf[AddressBean])

implicit def newConsumerAddressMapBeanEncoder: Encoder[ConsumerAddressMapBean] = Encoders.bean(classOf[ConsumerAddressMapBean])

}

```

在流式程式程式碼中,匯入自定義隱式轉換物件即可。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-oNnoiPwH-1652014600929)(1616123959706-1621984868453.)]

13-[掌握]-實時ETL開發之OggBean轉換POJO測試

任務:啟動容器,進入容器中啟動Oracle資料庫和OGG,修改Oracle資料庫表的資料測試應用

  • 1)、啟動Kafka訊息佇列

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-iEpB4Ge7-1652014600930)(1616124159348-1621984868453.)]

  • 2)、啟動Oracle資料庫和OGG

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-BWH9eKqo-1652014600930)(1616124185566-1621984868453.)]

  • 3)、執行流式應用程式

首先,如果檢查點目錄存在,最好將其刪除;此外,註釋掉消費CRM系統資料程式碼,此時僅僅測試物流系統資料

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-t8nzhTJ0-1652014600930)(1616124259387-1621984868453.)]

啟動流式應用程式,執行操作,檢視控制檯輸出結果,可以看到將Map集合(儲存資料)轉換為JSON字串

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-7dGFJr0B-1652014600931)(1616124564721-1621984868453.)]

  • 4)、使用DBeave對資料庫表的資料進行更新和刪除

更新2條資料,刪除1條資料,看到如下介面:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-4ADiDT4K-1652014600931)(1616124523908-1621984868453.)]