Hudi 實踐 | 如何基於 CDH6 環境編譯 Hudi-0.9.0

語言: CN / TW / HK

1.文件編寫目的

Apache Hudi是一個Data Lakes的開源方案,是Hadoop Updates and Incrementals的簡寫,它是由Uber開發並開源的Data Lakes解決方案。Hudi 是一個豐富的平臺,用於構建具有增量資料管道的流式資料湖,具有如下基本特性/能力:

  • Hudi能夠攝入(Ingest)和管理(Manage)基於HDFS之上的大型分析資料集,主要目的是高效的減少入庫延時。

  • Hudi基於Spark來對HDFS上的資料進行更新、插入、刪除等。

  • Hudi在HDFS資料集上提供如下流原語:插入更新(如何改變資料集);增量拉取(如何獲取變更的資料)。

  • Hudi可以對HDFS上的parquet格式資料進行插入/更新操作。

  • Hudi通過自定義InputFormat與Hadoop生態系統(Spark、Hive、Parquet)整合。

  • Hudi通過Savepoint來實現資料恢復。

  • Hudi支援Spark 2.x版本,建議使用2.4.4+版本的Spark。

本篇文章Fayson主要介紹如何基於CDH6.3.2版本編譯Hudi

2.編譯環境準備

1.本次的編譯環境主要是基於Intellij Idea工具進行編譯,開啟Idea開發工具,從git上將hudi的原始碼checkout下來。

點選“Get from VCS”,選擇GitHub方式,填寫Hudi的git地址:https://github.com/apache/hudi.git

點選Clone將Hudi的master程式碼拉至本地

選中hudi工程,右鍵切換分支版本至0.9.0

點選“Branches”,選擇0.9.0版本並checkout

到此完成了Hudi原始碼的Checkout,接下來調整依賴包版本及簡單的調整程式碼進行編譯。

注意:Hudi是Java開發,在自己的開發環境中還需要調整後自己的Java環境變數。

3 . 原始碼編譯及修改

本次編譯主要是為了能夠更好的適配CDH6.3.2叢集,因此在編譯的過程中需要將Maven依賴調整為CDH6.3.2版本。

1.修改pom.xml配置檔案,將裡面的依賴修改為如下

    • 確認<repositories></repositories>部分是否有Cloudera的Maven源

    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>

    • 改<properties></properties>部分的hadoop.version、hive.version以及spark2.version的版本

<hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
<hive.version>2.1.1-cdh6.3.2</hive.version>
<spark2.version>2.4.0-cdh6.3.2</spark2.version>

    • 修改hive-jdbc和hive-service兩個依賴的配置,新增排除

    <exclusion>
      <groupId>org.glassfish</groupId>
      <artifactId>javax.el</artifactId>
    </exclusion>

2.修改hudi-spark模組的org.apache.hudi.DefaultSource類中的部分程式碼段

使用CDH6.3.2版本的Spark依賴包找不到org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

3.hudi-utilities模組程式碼修改

    • org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector類中的getFileAttributesFromRecord(JSONObject record)方法

該方法未丟擲JSONExcepiton,導致編譯失敗

    • org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector類中processAndDeleteInvalidMessages方法新增JSONException異常丟擲

    • org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector類中testFileAttributesFromRecordShouldReturnsExpectOutput方法新增JSONException異常丟擲

    • org.apache.hudi.utilities.sources.helpers.TestS3EventsMetaSelector類中testNextEventsFromQueueShouldReturnsEventsFromQueue方法新增JSONException異常丟擲

4.hudi-integ-test模組程式碼修改,註釋pom.xml檔案中jackson-annotations依賴的scope)

5.hudi-spark-datasource/hudi-spark-common模組的

org.apache.hudi.DataSourceReadOptions,將如下截圖部分程式碼註釋(204-228行的if判斷)

6.完成上修改後,通過idea執行編譯操作

等待命令執行成功

至此完成了Hudi0.9.0版本的編譯。

4 . Hudi與Spark整合測試

1.在前面完成了Hudi原始碼的編譯,在packaging目錄下的hudi-spark-bundle模組可以找到編譯好的hudi-spark-bundle_2.11-0.9.0的jar包

2.將編譯好的jar包上傳至CDH叢集任意有Spark Gateway節點的伺服器上

3.使用spark-shell命令整合hudi並測試基本功能

spark-shell \
  --jars hudi-spark-bundle_2.11-0.9.0.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

4.在命令列執行如下程式碼,建立一個hudi的表並插入資料

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow1"
val basePath = "hdfs:///tmp/hudi_trips_cow1"
val dataGen = new DataGenerator
//寫入資料
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.embed.timeline.server","false").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

5.執行如下程式碼讀取資料

val tripsSnapshotDF = spark.read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

檢視該表的總資料量

6.執行如下程式碼刪除資料

// 取出兩條要刪除的資料
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

// 刪除
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.embed.timeline.server","false").
  mode(Append).
  save(basePath)

// 再次查詢
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// 應該返回兩條資料
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").show()

7.檢視HDFS上的hudi資料目錄

hadoop fs -ls -R /tmp/hudi_trips_cow1

到完成了簡單的Spark與Hudi的整合測試

1.Hudi0.9.0版本與Spark的整合,至少需要Spark2.4.4及以上版本,在更高版本中引入的ORC的支援,因此使用CDH6.3.2版本依賴進行編譯是去掉了ORC相關的程式碼段

2.在編譯的過程中,hudi依賴的hive依賴中存在低版本的jetty依賴包,導致在執行寫入時報如下異常:對於該異常的處理方式,需要在執行寫入hudi資料的程式碼段中增加option("hoodie.embed.timeline.server","false").

java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
  at io.javalin.core.util.JettyServerUtil.defaultSessionHandler(JettyServerUtil.kt:50)
  at io.javalin.Javalin.<init>(Javalin.java:94)

3.在後續的文章中會使用Hudi與支援的Hive、Spark、MR等進行詳細的測試。