Big Data Logistics Project: Real-Time Incremental ETL into Kudu (Part 7)



Keep creating, keep growing! This is day 11 of my participation in the Juejin "June Update Challenge".

Logistics_Day07: Real-Time Incremental ETL into Kudu


01-[Review]-Recap of the Previous Lesson

Main topic: the Kudu storage engine. Kudu stores data much like HBase, and it was created to replace the HDFS + HBase combination: it supports both random reads/writes and bulk loading for analytics.

  • 1) For random reads and writes over massive data, it provides HBase-like database functionality
  • 2) For bulk loading and scanning of massive data, it provides columnar storage similar to Parquet

From its inception, Kudu was designed to integrate with analytics engines, namely Cloudera's open-source Impala (an in-memory analytics engine) and Apache Spark. ==Kudu is a storage engine for OLAP-style, near-real-time analytics.==



Get familiar with the Kudu APIs, both the Java client API and the Spark integration.
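As a quick refresher on the Spark integration, here is a minimal hedged sketch of reading a Kudu table into a DataFrame; the master address node2.itcast.cn:7051 matches the course environment configured later, while the table name tbl_demo is only a placeholder:

```scala
import org.apache.spark.sql.SparkSession

object KuduSparkReadDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KuduSparkReadDemo")
      .master("local[2]")
      .getOrCreate()

    // Read an existing Kudu table into a DataFrame through the kudu-spark2 connector
    val kuduDF = spark.read
      .format("org.apache.kudu.spark.kudu")           // kudu-spark data source
      .option("kudu.master", "node2.itcast.cn:7051")  // Kudu master RPC address
      .option("kudu.table", "tbl_demo")               // placeholder table name
      .load()

    kuduDF.printSchema()
    kuduDF.show(10, truncate = false)

    spark.stop()
  }
}
```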

02-[Overview]-Day 7: Lesson Outline

Main topics: setting up the project development environment and writing the common interface of the streaming program: a streaming job that consumes the collected business data from Kafka in real time, performs ETL transformations on it, and finally stores it in the storage engines (Kudu, ES, ClickHouse).


  • 1) Data source: consume the business data stored in the different Kafka topics
  • Logistics system: collected with OGG, stored as JSON strings
  • CRM system: collected with Canal, stored as JSON strings
  • 2) Transformation: parse the JSON strings and map them into JavaBean entity objects


For the logistics project, the real-time ETL logic is abstracted and encapsulated in Scala, and simulated real-time data is used for testing, as sketched below.
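To make the transformation step concrete, here is a small hedged sketch of parsing a JSON string into a bean with fastjson; the CrmMessage class and its fields are purely illustrative and are not the actual Canal/OGG message layout used later in the project:

```scala
import com.alibaba.fastjson.JSON
import scala.beans.BeanProperty

// Hypothetical message bean, for illustration only; fastjson needs
// getters/setters, hence @BeanProperty.
class CrmMessage {
  @BeanProperty var table: String = _
  @BeanProperty var opType: String = _
  @BeanProperty var data: String = _
}

object JsonParseDemo {
  def main(args: Array[String]): Unit = {
    val json = """{"table":"crm_customer","opType":"INSERT","data":"{}"}"""
    // Parse the JSON string and map it onto the bean's fields
    val msg: CrmMessage = JSON.parseObject(json, classOf[CrmMessage])
    println(s"table=${msg.getTable}, opType=${msg.getOpType}")
  }
}
```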

03-[Understand]-Project Preparation: Initializing the Development Environment

Since the project is developed on Windows and mainly consists of Spark programs that use Hadoop's HDFS file system API, the Windows environment needs winutils.exe and hadoop.dll configured.

Windows binaries for Hadoop versions:https://github.com/cdarlint/winutils

  • 1) Configure HADOOP_HOME


For example, the instructor extracted it to D:/BigdataUser/hadoop-3.0.0, the path later referenced as LOCAL_HADOOP_HOME.


Configure the Windows environment variable HADOOP_HOME:


  • 2) Copy hadoop.dll


Note: after configuring these, it is recommended to restart the machine; if you skip this setup, Spark programs may fail with errors when run on Windows.
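If the environment variable is not picked up, the same thing can be done in code; a minimal sketch, assuming the instructor's extraction directory D:/BigdataUser/hadoop-3.0.0 (this mirrors what the project itself does later through Configuration.LOCAL_HADOOP_HOME):

```scala
object WindowsHadoopEnv {
  def main(args: Array[String]): Unit = {
    // Point Hadoop at the directory whose bin folder contains winutils.exe
    System.setProperty("hadoop.home.dir", "D:/BigdataUser/hadoop-3.0.0")
    println(System.getProperty("hadoop.home.dir"))
  }
}
```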

04-[Understand]-Project Initialization: Creating the Maven Project and Modules

First create the Maven project and its modules, then add the dependencies, create the packages, and import the utility classes.



  • 1) Create the parent Maven project and delete its src directory


Configure the Maven repository: the installation directory, the settings file, and the local repository directory.


  • 2) Create the logistics-common common module


  • 3) Create the logistics-etl real-time ETL module


  • 4) Create the logistics-offline offline metrics module


05-[Understand]-Project Initialization: Importing POM Dependencies

Next, add the pom.xml dependencies to the parent project and to each Maven module.

  • 1) Parent project [itcast-logistics-parent] dependencies

```xml
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
    <repository>
        <id>mvnrepository</id>
        <url>https://mvnrepository.com/</url>
    </repository>
    <repository>
        <id>elastic.co</id>
        <url>https://artifacts.elastic.co/maven</url>
    </repository>
</repositories>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- SDK -->
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <!-- Junit -->
    <junit.version>4.12</junit.version>
    <!-- HTTP Version -->
    <http.version>4.5.11</http.version>
    <!-- Hadoop -->
    <hadoop.version>3.0.0-cdh6.2.1</hadoop.version>
    <!-- Spark -->
    <spark.version>2.4.0-cdh6.2.1</spark.version>
   <!-- <spark.version>2.4.0</spark.version>-->
    <!-- Spark Graph Visual -->
    <gs.version>1.3</gs.version>
    <breeze.version>1.0</breeze.version>
    <jfreechart.version>1.5.0</jfreechart.version>
    <!-- Parquet -->
    <parquet.version>1.9.0-cdh6.2.1</parquet.version>
    <!-- Kudu -->
    <kudu.version>1.9.0-cdh6.2.1</kudu.version>
    <!-- Hive -->
    <hive.version>2.1.1-cdh6.2.1</hive.version>
    <!-- Kafka -->
    <!--<kafka.version>2.1.0-cdh6.2.1</kafka.version>-->
    <kafka.version>2.1.0</kafka.version>
    <!-- ClickHouse -->
    <clickhouse.version>0.2.2</clickhouse.version>
    <!-- ElasticSearch -->
    <es.version>7.6.1</es.version>
    <!-- JSON Version -->
    <fastjson.version>1.2.62</fastjson.version>
    <!-- Apache Commons Version -->
    <commons-io.version>2.6</commons-io.version>
    <commons-lang3.version>3.10</commons-lang3.version>
    <commons-beanutils.version>1.9.4</commons-beanutils.version>
    <!-- JDBC Drivers Version-->
    <ojdbc.version>12.2.0.1</ojdbc.version>
    <mysql.version>5.1.44</mysql.version>
    <!-- Other -->
    <jtuple.version>1.2</jtuple.version>
    <!-- Maven Plugins Version -->
    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
    <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
    <maven-shade-plugin.version>3.2.1</maven-shade-plugin.version>
</properties>

<dependencyManagement>

    <dependencies>
        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- JDBC -->
        <dependency>
            <groupId>com.oracle.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>${ojdbc.version}</version>
            <systemPath>D:/BigdataUser/jdbc-drivers/ojdbc8-12.2.0.1.jar</systemPath>
            <scope>system</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- Http -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${http.version}</version>
        </dependency>
        <!-- Apache Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.version}</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
        </dependency>
        <!-- Graph Visual -->
        <dependency>
            <groupId>org.graphstream</groupId>
            <artifactId>gs-core</artifactId>
            <version>${gs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.graphstream</groupId>
            <artifactId>gs-ui</artifactId>
            <version>${gs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>breeze_${scala.version}</artifactId>
            <version>${breeze.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>breeze-viz_${scala.version}</artifactId>
            <version>${breeze.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jfree</groupId>
            <artifactId>jfreechart</artifactId>
            <version>${jfreechart.version}</version>
        </dependency>
        <!-- JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Kudu -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>${kudu.version}</version>
        </dependency>
        <!-- Hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <!-- Clickhouse -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${clickhouse.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- ElasticSearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${es.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${es.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>x-pack-sql-jdbc</artifactId>
            <version>${es.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>${es.version}</version>
        </dependency>
        <!-- Alibaba Json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Apache Commons -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons-io.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>${commons-beanutils.version}</version>
        </dependency>
        <!-- Other -->
        <dependency>
            <groupId>org.javatuples</groupId>
            <artifactId>javatuples</artifactId>
            <version>${jtuple.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.3</version>
        </dependency>
        -->
        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.0.1</version>
        </dependency>
    </dependencies>
</dependencyManagement>

```

Because the Oracle JDBC driver is not available in the public Maven repositories, it can be referenced from the local file system (system scope):



  • 2) Common module [logistics-common] dependencies

```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <!-- Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <!-- Test -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- JDBC -->
    <dependency>
        <groupId>com.oracle.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <scope>system</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- Http -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
    </dependency>
    <!-- Apache Commons -->
    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
    </dependency>
    <!-- Java Tuples -->
    <dependency>
        <groupId>org.javatuples</groupId>
        <artifactId>javatuples</artifactId>
    </dependency>
    <!-- Alibaba Json -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <!-- Apache Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.version}</artifactId>
    </dependency>
    <!-- Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-common</artifactId>
    </dependency>
    <!-- Graph Visual -->
    <dependency>
        <groupId>org.graphstream</groupId>
        <artifactId>gs-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.graphstream</groupId>
        <artifactId>gs-ui</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scalanlp</groupId>
        <artifactId>breeze_${scala.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scalanlp</groupId>
        <artifactId>breeze-viz_${scala.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.jfree</groupId>
        <artifactId>jfreechart</artifactId>
    </dependency>
    <!-- Kudu -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
    </dependency>
    <!-- Clickhouse -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
    </dependency>
    <!-- ElasticSearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
    <!--
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>x-pack-sql-jdbc</artifactId>
        </dependency>
    -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

```

  • 3) Real-time ETL module [logistics-etl] dependencies

```xml
<repositories>
    <repository>
        <id>mvnrepository</id>
        <url>https://mvnrepository.com/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>elastic.co</id>
        <url>https://artifacts.elastic.co/maven</url>
    </repository>
</repositories>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>cn.itcast.logistics</groupId>
        <artifactId>logistics-common</artifactId>
        <version>1.0.0</version>
    </dependency>
    <!-- Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <!-- Structured Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-common</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <!-- Other -->
    <dependency>
        <groupId>org.javatuples</groupId>
        <artifactId>javatuples</artifactId>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
    </dependency>
    <dependency>
        <groupId>org.jfree</groupId>
        <artifactId>jfreechart</artifactId>
    </dependency>
    <!-- kudu -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.0.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

```

  • 4) Offline metrics module [logistics-offline] dependencies

```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>cn.itcast.logistics</groupId>
        <artifactId>logistics-common</artifactId>
        <version>1.0.0</version>
    </dependency>
    <!-- Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <!-- Structured Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-common</artifactId>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
    </dependency>
    <dependency>
        <groupId>org.jfree</groupId>
        <artifactId>jfreechart</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <!-- kudu -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
    </dependency>
    <!-- Other -->
    <dependency>
        <groupId>org.javatuples</groupId>
        <artifactId>javatuples</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.0.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

```

After the pom dependencies have been added to the project and its modules, refresh the project so the related jar packages are imported. It is best to create a test class in each module and run it to check that everything works.


Following Maven's standard project layout, create the corresponding directories in each module and write the test program CommonAppTest, sketched below.
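A minimal smoke-test sketch (the class name CommonAppTest comes from the course; its body here is an assumption, any code that compiles and runs inside the module is enough to verify the setup):

```scala
package cn.itcast.logistics.common

// Minimal smoke test: if this runs, the module compiles and the
// Scala toolchain plus the declared dependencies are wired up correctly.
object CommonAppTest {
  def main(args: Array[String]): Unit = {
    println("logistics-common module is OK")
  }
}
```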

06-[Master]-Project Initialization: Importing the Data Generator Module

Task: import the project's mock data generator module into the Maven project. The steps are described below:

  • 1) Extract [logistics-generate.zip] into the Maven project directory [D:\Logistics_New\itcast-logistics-parent]


  • 2) Import the module into the Maven project (via the IDE's module import)


Select the module extracted in the previous step, then click Next until the import finishes.


  • 3) In the parent project's pom.xml, manually add this module as a child module of the parent.


At this point the mock data generator module has been imported into the Maven project.

  • 4) Initialization: make sure the table-data directory is marked as a resources directory



07-[Master]-Project Initialization: Building the Common Module

Task: initialize the project's common module, including creating the packages and table entity classes, importing the utility classes, and so on.

The logistics project involves two systems: the Logistics system (48 tables) and the CRM system (3 tables); the data of each table is encapsulated in a JavaBean object.

==For all 51 database tables: the id field is the primary key, remark is a free-form note, and cdt and udt are the record's creation time and last update time respectively.==
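For illustration, a hedged sketch of what one of these entity classes looks like; the class name and the business field are assumptions, only the id / remark / cdt / udt columns are taken from the description above (the real beans are generated and imported in the next step):

```scala
import scala.beans.BeanProperty

// Hypothetical table bean; every real bean shares the id/remark/cdt/udt columns.
class DemoTableBean extends Serializable {
  @BeanProperty var id: Long = _        // primary key
  @BeanProperty var name: String = _    // illustrative business field
  @BeanProperty var remark: String = _  // free-form note
  @BeanProperty var cdt: String = _     // record creation time
  @BeanProperty var udt: String = _     // record last-update time
}
```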



Under the scala directory of the common module [logistics-common], create the program packages (for example cn.itcast.logistics.common).



Import the JavaBean objects: the 51 database tables each have a corresponding JavaBean entity class; simply drop them into the package.

  • 1) Import the files from the course materials directory 資料\公共模組\beans into the common package


Import the common utility classes: database connection helpers and so on.

  • Import the files from the course materials directory 資料\公共模組\utils into the common package


Then refresh the entire Maven project so the related dependencies are imported.

08-[Understand]-Real-Time ETL Development: Loading the Configuration File

Task: first initialize the ETL module (create its packages), create the project properties configuration file, and load that configuration.

  • 1) The project is written in Scala, so create a scala source directory



  • 2) In every project, connection information for databases and other services should be kept in a properties file so it can easily be changed across test, development, and production environments

Create the configuration file config.properties in the resources directory of the common module [logistics-common]:

```properties
# CDH-6.2.1
bigdata.host=node2.itcast.cn

# HDFS
dfs.uri=hdfs://node2.itcast.cn:8020
# Local FS
local.fs.uri=file://

# Kafka
kafka.broker.host=node2.itcast.cn
kafka.broker.port=9092
kafka.init.topic=kafka-topics --zookeeper node2.itcast.cn:2181/kafka --create --replication-factor 1 --partitions 1 --topic logistics
kafka.logistics.topic=logistics
kafka.crm.topic=crm

# ZooKeeper
zookeeper.host=node2.itcast.cn
zookeeper.port=2181

# Kudu
kudu.rpc.host=node2.itcast.cn
kudu.rpc.port=7051
kudu.http.host=node2.itcast.cn
kudu.http.port=8051

# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.url=jdbc:clickhouse://node2.itcast.cn:8123/logistics
clickhouse.user=root
clickhouse.password=123456

# ElasticSearch
elasticsearch.host=node2.itcast.cn
elasticsearch.rpc.port=9300
elasticsearch.http.port=9200

# Azkaban
app.first.runnable=true

# Oracle JDBC
db.oracle.url="jdbc:oracle:thin:@//192.168.88.10:1521/ORCL"
db.oracle.user=itcast
db.oracle.password=itcast

# MySQL JDBC
db.mysql.driver=com.mysql.jdbc.Driver
db.mysql.url=jdbc:mysql://192.168.88.10:3306/crm?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false
db.mysql.user=root
db.mysql.password=123456

# Data path of ETL program output
# Run in the yarn mode in Linux
spark.app.dfs.checkpoint.dir=/apps/logistics/dat-hdfs/spark-checkpoint
spark.app.dfs.data.dir=/apps/logistics/dat-hdfs/warehouse
spark.app.dfs.jars.dir=/apps/logistics/jars

# Run in the local mode in Linux
spark.app.local.checkpoint.dir=/apps/logistics/dat-local/spark-checkpoint
spark.app.local.data.dir=/apps/logistics/dat-local/warehouse
spark.app.local.jars.dir=/apps/logistics/jars

# Running in the local mode in Windows
spark.app.win.checkpoint.dir=D://apps/logistics/dat-local/spark-checkpoint
spark.app.win.data.dir=D://apps/logistics/dat-local/warehouse
spark.app.win.jars.dir=D://apps/logistics/jars
```

Next, write a utility class that reads this properties file and resolves the value of each key; ResourceBundle can be used for this.


```scala
package cn.itcast.logistics.common

import java.util.{Locale, ResourceBundle}

/**
 * Utility object for reading the configuration file
 */
object Configuration {

/**
 * ResourceBundle used to load config.properties
 */
private lazy val resourceBundle: ResourceBundle = ResourceBundle.getBundle(
    "config", new Locale("zh", "CN")
)
private lazy val SEP = ":"

// CDH-6.2.1
lazy val BIGDATA_HOST: String = resourceBundle.getString("bigdata.host")

// HDFS
lazy val DFS_URI: String = resourceBundle.getString("dfs.uri")

// Local FS
lazy val LOCAL_FS_URI: String = resourceBundle.getString("local.fs.uri")

// Kafka
lazy val KAFKA_BROKER_HOST: String = resourceBundle.getString("kafka.broker.host")
lazy val KAFKA_BROKER_PORT: Integer = Integer.valueOf(resourceBundle.getString("kafka.broker.port"))
lazy val KAFKA_INIT_TOPIC: String = resourceBundle.getString("kafka.init.topic")
lazy val KAFKA_LOGISTICS_TOPIC: String = resourceBundle.getString("kafka.logistics.topic")
lazy val KAFKA_CRM_TOPIC: String = resourceBundle.getString("kafka.crm.topic")
lazy val KAFKA_ADDRESS: String = KAFKA_BROKER_HOST + SEP + KAFKA_BROKER_PORT

// Spark
lazy val LOG_OFF = "OFF"
lazy val LOG_DEBUG = "DEBUG"
lazy val LOG_INFO = "INFO"
lazy val LOCAL_HADOOP_HOME = "D:/BigdataUser/hadoop-3.0.0"
lazy val SPARK_KAFKA_FORMAT = "kafka"
lazy val SPARK_KUDU_FORMAT = "kudu"
lazy val SPARK_ES_FORMAT = "es"
lazy val SPARK_CLICK_HOUSE_FORMAT = "clickhouse"

// ZooKeeper
lazy val ZOOKEEPER_HOST: String = resourceBundle.getString("zookeeper.host")
lazy val ZOOKEEPER_PORT: Integer = Integer.valueOf(resourceBundle.getString("zookeeper.port"))

// Kudu
lazy val KUDU_RPC_HOST: String = resourceBundle.getString("kudu.rpc.host")
lazy val KUDU_RPC_PORT: Integer = Integer.valueOf(resourceBundle.getString("kudu.rpc.port"))
lazy val KUDU_HTTP_HOST: String = resourceBundle.getString("kudu.http.host")
lazy val KUDU_HTTP_PORT: Integer = Integer.valueOf(resourceBundle.getString("kudu.http.port"))
lazy val KUDU_RPC_ADDRESS: String = KUDU_RPC_HOST + SEP + KUDU_RPC_PORT

// ClickHouse
lazy val CLICK_HOUSE_DRIVER: String = resourceBundle.getString("clickhouse.driver")
lazy val CLICK_HOUSE_URL: String = resourceBundle.getString("clickhouse.url")
lazy val CLICK_HOUSE_USER: String = resourceBundle.getString("clickhouse.user")
lazy val CLICK_HOUSE_PASSWORD: String = resourceBundle.getString("clickhouse.password")

// ElasticSearch
lazy val ELASTICSEARCH_HOST: String = resourceBundle.getString("elasticsearch.host")
lazy val ELASTICSEARCH_RPC_PORT: Integer = Integer.valueOf(resourceBundle.getString("elasticsearch.rpc.port"))
lazy val ELASTICSEARCH_HTTP_PORT: Integer = Integer.valueOf(resourceBundle.getString("elasticsearch.http.port"))
lazy val ELASTICSEARCH_ADDRESS: String = ELASTICSEARCH_HOST + SEP + ELASTICSEARCH_HTTP_PORT

// Azkaban
lazy val IS_FIRST_RUNNABLE: java.lang.Boolean = java.lang.Boolean.valueOf(resourceBundle.getString("app.first.runnable"))

// ## Data path of ETL program output ##
// # Run in the yarn mode in Linux
lazy val SPARK_APP_DFS_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.dfs.checkpoint.dir") // /apps/logistics/dat-hdfs/spark-checkpoint
lazy val SPARK_APP_DFS_DATA_DIR: String = resourceBundle.getString("spark.app.dfs.data.dir") // /apps/logistics/dat-hdfs/warehouse
lazy val SPARK_APP_DFS_JARS_DIR: String = resourceBundle.getString("spark.app.dfs.jars.dir") // /apps/logistics/jars

// # Run in the local mode in Linux
lazy val SPARK_APP_LOCAL_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.local.checkpoint.dir") // /apps/logistics/dat-local/spark-checkpoint
lazy val SPARK_APP_LOCAL_DATA_DIR: String = resourceBundle.getString("spark.app.local.data.dir") // /apps/logistics/dat-local/warehouse
lazy val SPARK_APP_LOCAL_JARS_DIR: String = resourceBundle.getString("spark.app.local.jars.dir") // /apps/logistics/jars

// # Running in the local Mode in Windows
lazy val SPARK_APP_WIN_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.win.checkpoint.dir") // D://apps/logistics/dat-local/spark-checkpoint
lazy val SPARK_APP_WIN_DATA_DIR: String = resourceBundle.getString("spark.app.win.data.dir") // D://apps/logistics/dat-local/warehouse
lazy val SPARK_APP_WIN_JARS_DIR: String = resourceBundle.getString("spark.app.win.jars.dir") // D://apps/logistics/jars

// # Oracle JDBC & # MySQL JDBC
lazy val DB_ORACLE_URL: String = resourceBundle.getString("db.oracle.url")
lazy val DB_ORACLE_USER: String = resourceBundle.getString("db.oracle.user")
lazy val DB_ORACLE_PASSWORD: String = resourceBundle.getString("db.oracle.password")

lazy val DB_MYSQL_DRIVER: String = resourceBundle.getString("db.mysql.driver")
lazy val DB_MYSQL_URL: String = resourceBundle.getString("db.mysql.url")
lazy val DB_MYSQL_USER: String = resourceBundle.getString("db.mysql.user")
lazy val DB_MYSQL_PASSWORD: String = resourceBundle.getString("db.mysql.password")

def main(args: Array[String]): Unit = {
    println("DB_ORACLE_URL = " + DB_ORACLE_URL)
    println("KAFKA_ADDRESS = " + KAFKA_ADDRESS)
}

}
```

09-[Master]-Real-Time ETL Development: Streaming Program [Template]

Task: ==how to write the streaming program==. Here Structured Streaming is used to consume the data in real time and perform the ETL transformation.


Writing the streaming program is split into three parts:

  • Part 1: write the program [template]
  • Part 2: write the code that consumes the data and prints it to the console
  • Part 3: test it, starting the MySQL database with Canal and the Oracle database with OGG.

Test program: ==consume data from Kafka in real time (business data from the logistics and CRM systems) and print it to the console, with no other logic==

```ini
step1. Build the SparkSession object
  1. Initialize the Spark application configuration
  2. Determine the Spark application run mode and configure it accordingly
  3. Build the SparkSession instance

step2. Consume the data and print it to the console
  4. Initialize the parameters for consuming the logistics topic
  5. Consume the logistics topic data and print it to the console
  6. Initialize the parameters for consuming the CRM topic
  7. Consume the CRM topic data and print it to the console

step3. Start and await termination
  8. Start the streaming application and wait for termination
```

Create the object LogisticsEtlApp and write its main method; the main code steps are as follows:

```scala
package cn.itcast.logistics.etl.realtime

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Structured Streaming program that consumes messages from Kafka in real time
 * (logistics data and CRM data) and prints them to the console.
 * 1. Initialize the Spark application configuration
 * 2. Determine the Spark application run mode and configure it accordingly
 * 3. Build the SparkSession instance
 * 4. Initialize the parameters for consuming the logistics topic
 * 5. Consume the logistics topic data and print it to the console
 * 6. Initialize the parameters for consuming the CRM topic
 * 7. Consume the CRM topic data and print it to the console
 * 8. Start the streaming application and wait for termination
 */
object LogisticsEtlApp {

def main(args: Array[String]): Unit = {
    // step1. Build the SparkSession instance and set the relevant configuration
    /*
        1. Initialize the Spark application configuration
        2. Determine the Spark application run mode and configure it accordingly
        3. Build the SparkSession instance
     */
    val spark: SparkSession = SparkSession.builder()
        .getOrCreate()
    import spark.implicits._

    // step2. Consume data from Kafka in real time; set the Kafka server address and topic name
    // step3. Print the ETL-transformed data to the console and start the streaming application
    /*
        4. Initialize the parameters for consuming the logistics topic
        5. Consume the logistics topic data and print it to the console
        6. Initialize the parameters for consuming the CRM topic
        7. Consume the CRM topic data and print it to the console
     */
    val logisticsDF: DataFrame = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
        .option("subscribe", "logistics")
        .option("maxOffsetsPerTrigger", "100000")
        .load()


    // step4. After the streaming application starts, wait for termination and release resources
    /*
        8. Start the streaming application and wait for termination
     */

}

}
```

10-[Master]-Real-Time ETL Development: Streaming Program [Implementation]

Finish writing the code that consumes data from Kafka and prints it to the console; when building the SparkSession instance, the relevant parameter values need to be set.

```scala
package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Structured Streaming program that consumes messages from Kafka in real time
 * (logistics data and CRM data) and prints them to the console.
 * 1. Initialize the Spark application configuration
 * 2. Determine the Spark application run mode and configure it accordingly
 * 3. Build the SparkSession instance
 * 4. Initialize the parameters for consuming the logistics topic
 * 5. Consume the logistics topic data and print it to the console
 * 6. Initialize the parameters for consuming the CRM topic
 * 7. Consume the CRM topic data and print it to the console
 * 8. Start the streaming application and wait for termination
 */
object LogisticsEtlApp {

def main(args: Array[String]): Unit = {
    // step1. Build the SparkSession instance and set the relevant configuration
    // 1. Initialize the Spark application configuration
    val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .set("spark.sql.session.timeZone", "Asia/Shanghai")
        .set("spark.sql.files.maxPartitionBytes", "134217728")
        .set("spark.sql.files.openCostInBytes", "134217728")
        .set("spark.sql.shuffle.partitions", "3")
        .set("spark.sql.autoBroadcastJoinThreshold", "67108864")
    // 2. Determine the Spark application run mode and configure it accordingly
    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
        // Local environment: set LOCAL_HADOOP_HOME
        System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
        // Set the run mode (master) and the checkpoint path
        sparkConf
            .set("spark.master", "local[3]")
            .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
    } else {
        // Production environment
        sparkConf
            .set("spark.master", "yarn")
            .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
    }
    // 3. Build the SparkSession instance
    val spark: SparkSession = SparkSession.builder()
        .config(sparkConf)
        .getOrCreate()
    import spark.implicits._

    // step2. Consume data from Kafka in real time; set the Kafka server address and topic name
    // step3. Print the ETL-transformed data to the console and start the streaming application
    // 4. Initialize the parameters for consuming the logistics topic
    val logisticsDF: DataFrame = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
        .option("subscribe", "logistics")
        .option("maxOffsetsPerTrigger", "100000")
        .load()
    // 5. Consume the logistics topic data and print it to the console
    logisticsDF.writeStream
        .queryName("query-logistics-console")
        .outputMode(OutputMode.Append())
        .format("console")
        .option("numRows", "10")
        .option("truncate", "false")
        .start()

    // 6. Initialize the parameters for consuming the CRM topic
    val crmDF: DataFrame = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
        .option("subscribe", "crm")
        .option("maxOffsetsPerTrigger", "100000")
        .load()
    // 7. Consume the CRM topic data and print it to the console
    crmDF.writeStream
        .queryName("query-crm-console")
        .outputMode(OutputMode.Append())
        .format("console")
        .option("numRows", "10")
        .option("truncate", "false")
        .start()

    // step4. After the streaming application starts, wait for termination and release resources
    // 8. Start the streaming application and wait for termination
    spark.streams.active.foreach(query => println("Started query: " + query.name))
    spark.streams.awaitAnyTermination()
}

}

```

Spark SQL parameter tuning:

  • 1) Set the session time zone: set("spark.sql.session.timeZone", "Asia/Shanghai")

  • 2) Set the maximum number of bytes a single partition can hold when reading files:

set("spark.sql.files.maxPartitionBytes", "134217728")

  • 3) Set the threshold (estimated open cost) used when packing small files into partitions: set("spark.sql.files.openCostInBytes", "134217728")

  • 4) Set the number of shuffle partitions: set("spark.sql.shuffle.partitions", "3")

  • 5) Set the maximum table size in bytes that can be broadcast to all worker nodes when performing a join:

set("spark.sql.autoBroadcastJoinThreshold", "67108864")

11-[Master]-Real-Time ETL Development: Streaming Program [Testing]

Task: run the streaming program just written, consuming data from Kafka in real time and printing it to the console.


  • 2) Start the MySQL database and Canal to collect the CRM system's business data

```ini
# Use VMware to start the node1.itcast.cn virtual machine and log in as root (password 123456)

# 1) Start the MySQL database
# List containers
[root@node1 ~]# docker ps -a
8b5cd2152ed9        mysql:5.7                   0.0.0.0:3306->3306/tcp     mysql

# Start the container
[root@node1 ~]# docker start mysql
mysql

# Container status
[root@node1 ~]# docker ps
8b5cd2152ed9        mysql:5.7   Up 6 minutes        0.0.0.0:3306->3306/tcp   mysql

# 2) Start the CanalServer service
# List containers
[root@node1 ~]# docker ps -a
28888fad98c9        canal/canal-server:v1.1.2   0.0.0.0:11111->11111/tcp   canal-server

# Start the container
[root@node1 ~]# docker start canal-server
canal-server

# Container status
[root@node1 ~]# docker ps
28888fad98c9        canal/canal-server:v1.1.2       Up 2 minutes  0.0.0.0:11111->11111/tcp   canal-server

# Enter the container
[root@node1 ~]# docker exec -it canal-server /bin/bash
[root@28888fad98c9 admin]#

# Go to the CanalServer startup-script directory
[root@28888fad98c9 admin]# cd canal-server/bin/

# Restart the CanalServer service
[root@28888fad98c9 bin]# ./restart.sh

# Exit the container
[root@28888fad98c9 bin]# exit

```

  • 3) Start the streaming application, then update and delete rows in the CRM system tables in MySQL

Before running the streaming program for a test, check whether the local checkpoint directory exists; if it does, delete it.
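A small sketch of doing this cleanup in code instead of by hand, using the SPARK_APP_WIN_CHECKPOINT_DIR value defined in the Configuration object above and the commons-io dependency already declared in the project (deleting the directory manually works just as well):

```scala
import java.io.File
import org.apache.commons.io.FileUtils
import cn.itcast.logistics.common.Configuration

// Delete the local Windows checkpoint directory before re-running a test,
// so the streaming query starts from a clean state.
object CleanCheckpoint {
  def main(args: Array[String]): Unit = {
    val dir = new File(Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
    if (dir.exists()) FileUtils.deleteDirectory(dir)
  }
}
```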


You can also start the Oracle database and the OGG service to verify that its data is consumed; this is omitted here.

12-[Master]-Real-Time ETL Development: Testing with Live Business Data

Task: run the mock data generator to insert data into the CRM system or the Logistics system in real time; Canal and OGG collect the changes and the streaming program consumes them in real time. Taking the CRM system as the example, write data into CRM in real time.


Run the mock data generator program to produce data in real time.


  • 2) Run the streaming program and watch the console output as it consumes the Kafka data in real time


The same approach can be used for the Logistics system: generate data in real time and consume it.

Run the mock data generator MockLogisticsDataApp; setting [isClean=true] means the existing table data is cleared first.
