資料湖Iceberg | 如何正確使用Iceberg

語言: CN / TW / HK

在介紹如何使用Iceberg之前,先簡單地介紹一下Iceberg catalog的概念。catalog是Iceberg對錶進行管理(create、drop、rename等)的一個元件。目前Iceberg主要支援HiveCatalog和HadoopCatalog兩種Catalog。其中HiveCatalog將當前表metadata檔案路徑儲存在Metastore,這個表metadata檔案是所有讀寫Iceberg表的入口,所以每次讀寫Iceberg表都需要先從Metastore中取出對應的表metadata檔案路徑,然後再解析這個Metadata檔案進行接下來的操作。而HadoopCatalog將當前表metadata檔案路徑記錄在一個檔案目錄下,因此不需要連線Metastore。

在下述案例中,筆者會分別使用HiveCatalog和HadoopCatalog建立Iceberg表並進行讀寫操作,再結合案例在下一篇文章中介紹兩者的不同之處。

1

下載Iceberg

在Ic eberg 官方下載頁面(http://iceberg.apache.org/releases/)下載Iceberg Spark runtime Jar。 目前社群已經發布了兩個版本: apache-iceberg-0.7.0-incubating和0.8.0-incubating,建議下載最新的0.8.0版本。 這裡需要注意的是,Iceberg在使用上就是一個Jar包,這個Jar包可以放在Spark的Jars目錄下,然後就可以使用Spark建立Iceberg表、將資料匯入Iceberg表中以及從Iceberg表中查詢。 另外需要注意的一點是,目前Iceberg僅支援Spark 2.4版本。

我們先將下載的iceberg-spark-runtime-0.8.0-incubating.jar放到spark的classpath下,然後啟動spark-shell:

./spark-shell --jars ../../iceberg-spark-runtime-0.8.0-incubating.jar

2

基於HiveCatalog示例

1.使用Spark建立Iceberg表

使用HiveCatalog建立Iceberg表之前需要先在Hive中建立對應的資料庫,否則執行建表語句的話會報錯。下面是完整的建表語句:

import org.apache.iceberg.hive.HiveTableCatalog
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.spark.sql.types._
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import java.sql.Timestamp
val conf = spark.sessionState.newHadoopConf()
conf.set("hive.metastore.uris","xxxxxx")
conf.set("hive.metastore.warehouse.dir","xxxxxx")
val catalog = new HiveTableCatalog(conf)
//need to create hiveiceberg at Hive env firstly
val name = TableIdentifier.of("hive_iceberg", "action_logs_36")
val sparkSchema = StructType(List(StructField("id", IntegerType,true),
StructField("user", StringType,false),StructField("action", StringType,false),
StructField("music_id", LongType,false), StructField("event_time", TimestampType,false)))
val icebergSchema = SparkSchemaUtil.convert(sparkSchema)
val spec = PartitionSpec.builderFor(icebergSchema).hour("event_time").identity("action").build
val table = catalog.createTable(name, icebergSchema, spec)

建好表後可以在hive查詢到建立的表:

hive (default)> use hive_iceberg;
OK
Time taken: 0.148 seconds
hive (hive_iceberg)> show tables;
OK
database_nametab_name
hive_icebergaction_logs
Time taken: 0.035 seconds, Fetched: 1 row(s)
hive (hive_iceberg)> show create table action_logs;
OK
createtab_stmt
CREATE EXTERNAL TABLE `action_logs`(
`id` int COMMENT '',
`user` string COMMENT '',
`action` string COMMENT '',
`music_id` bigint COMMENT '',
`event_time` timestamp COMMENT '')
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.FileInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.mapred.FileOutputFormat'
LOCATION
'hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/hive_iceberg.db/action_logs'
TBLPROPERTIES (
'metadata_location'='/libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00000-e7c1e6ce-8eb9-4faf-a176-bd94dec3c0e4.metadata.json',
'numFiles'='1',
'table_type'='ICEBERG',
'totalSize'='1448',
'transient_lastDdlTime'='1591587097')
Time taken: 0.501 seconds, Fetched: 20 row(s)

2.使用Spark匯入資料到Iceberg表

iceberg表建好之後,就可以使用spark將資料匯入Iceberg表中。下面是完整的示例:

val action_data = Seq(Row(5,"lly","view",13643L,Timestamp.valueOf("2020-06-05 03:03:00")),
Row(7,"mint_1989","view",34769L,Timestamp.valueOf("2020-06-05 04:17:00")),
Row(6,"lly","click",13643L,Timestamp.valueOf("2020-06-05 03:04:00")),
Row(8,"mint_1989","click",34769L,Timestamp.valueOf("2020-06-05 04:17:05")))
val df = spark.createDataFrame(sc.makeRDD(action_data), sparkSchema)
df.write.format("iceberg").mode("append").save("hive_iceberg.action_logs_35")

3.使用Spark查詢Iceberg表

scala>spark.read.format("iceberg").load("hive_iceberg.action_logs_35").show
+---+---------+------+--------+-------------------+
| id| user|action|music_id| event_time|
+---+---------+------+--------+-------------------+
| 1| lly| view| 13643|2020-06-05 03:03:00|
| 2| lly| click| 13643|2020-06-05 03:04:00|
| 3|mint_1989| view| 34769|2020-06-05 04:17:00|
| 3|mint_1989| click| 34769|2020-06-05 04:17:05|
+---+---------+------+--------+-------------------+

3

基於HadoopCatalog示例

基於HadoopCatalog的用法與HiveCatalog基本相同,主要的區別在於建表的時候稍有不同,如下所示:

import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.hadoop.conf.Configuration
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/")
val name = TableIdentifier.of("hadoop_iceberg", "action_logs")
...
val table = catalog.createTable(name,icebergSchema, spec)

匯入和查詢的時候也有一點區別:

df.write.format("iceberg").mode("append").save("hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/hadoop_iceberg/action_logs")
spark.read.format("iceberg").load("hdfs://ntsdb0.jd.163.org:9000/libis/hive-2.3.6/hadoop_iceberg/action_logs").show

4

Iceberg表的目錄組織形式

1.HiveCatalog

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs
Found 2 items
drwxrwxrwx - hadoop supergroup 0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/data
drwxrwxrwx - hadoop supergroup 0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata

中data目錄下儲存資料檔案,metadata目錄下儲存元資料檔案。

2.metadata目錄

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata
Found 4 items
-rw-r--r-- 1 hadoop supergroup 1448 2020-06-08 11:31 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00000-e7c1e6ce-8eb9-4faf-a176-bd94dec3c0e4.metadata.json
-rw-r--r-- 1 hadoop supergroup 2217 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00001-62ade8ab-c1cf-40d3-bc21-fd5027bc3a55.metadata.json
-rw-r--r-- 1 hadoop supergroup 5040 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/bb641961-162a-49a8-b567-885430d4e799-m0.avro
-rw-r--r-- 1 hadoop supergroup 2567 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro

其中00 001-62 ade 8a b-c1cf-40d3-bc21-fd5027bc3a5 5.metadata.json中儲存表的schema、partition spec以及當前snapshot manifests檔案路徑。snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro儲存manifest檔案路徑。bb641961-162a-49a8-b567-885430d4e799-m0.avro記錄本次提交的檔案以及檔案級別元資料。

3.data目錄

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click
Found 1 items
-rw-r--r-- 1 hadoop supergroup 1425 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet

4.HadoopCatalog

Hadoopcatalog與Hivecatalog的的data目錄完全相同,metadata目錄下檔案稍有不同,HadoopCatalog管理的metadata目錄如下所示:

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata
Found 5 items
-rw-r--r-- 1 hadoop supergroup 5064 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/b222d277-2692-4e35-9327-3716dec9f070-m0.avro
-rw-r--r-- 1 hadoop supergroup 2591 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/snap-3124052841098464551-1-b222d277-2692-4e35-9327-3716dec9f070.avro
-rw-r--r-- 1 hadoop supergroup 1476 2020-06-08 17:23 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v1.metadata.json
-rw-r--r-- 1 hadoop supergroup 2261 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v2.metadata.json
-rw-r--r-- 1 hadoop supergroup 1 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text
其中檔案version-hint.text中儲存當前iceberg表的最新snapshot_id,如下所示:
hadoop@xxx:~$ hdfs dfs -cat /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text 2

說明該 表的最新s napshot_id是2,即對應的 snapshot元資料檔案是v2.met adata.json,解析v2.metadata.json可以獲取到該表當前最新snapshot對應的schema、partition spec、父snapshot以及該snapshot對應的manifestList檔案路徑等,因此version-hint.text是HadoopCatalog獲取當前snapshot版本的入口。

閱讀到這裡,讀者可能就會問了,HiveCatalo g的metadata目錄下並沒有version-hint.text檔案,那它獲取當前snapshot版本的入口在哪裡呢?它的入口在Metastore中的schema裡面,讀者可以在HiveCatalog建表schema中的TBPROPERTIES中有個key是"metadata_location ",對應的value就是當前最新的snapshot檔案。因此,有兩點需要說明:

  1. HiveCatalog建立的表,每次提交寫入檔案生成新的snapshot後都需要更新Metastore中的metadata_location欄位。

  2. HiveCatalog和HadoopCatalog不能混用。即使用HiveCatalog建立的表,再使用HadoopCatalog是不能正常載入的,反之亦然。

5

為什麼選擇HadoopCatalog

上面說到Iceberg目前支援兩種Catalog,而且兩種Catalog相互不相容。 那這裡有兩個問題:

  1. 社群是出於什麼考慮實現兩種不相容的Catalog?

  2. 因為兩者不相容,必須選擇其一作為系統唯一的Catalog,那是選擇HiveCatalog還是HadoopCatalog,為什麼?

先回答第一個問題。社群是出於什麼考慮實現兩種不相容的Catalog?

在回答這個問題之前,首先回顧一下上一篇文章中介紹到的基於HadoopCatalog,Iceberg實現資料寫入提交的ACID機制,最終的結論是使用了樂觀鎖機制和HDFS rename的原子性一起保障寫入提交的ACID。如果某些檔案系統比如S3不支援rename的原子性呢?那就需要另外一種機制保障寫入提交的ACID,HiveCatalog就是另一種不依賴檔案系統支援,但是可以提供ACID支援的方案,它在每次提交的時候都更新MySQL中同一行記錄,這樣的更新MySQL本身是可以保證ACID的。這就是社群為什麼會支援兩種不相容Catalog的本質原因。

再來回答第二個問題。HadoopCatalog依賴於HDFS提供的rename原子性語義,而HiveCatalog不依賴於任何檔案系統的rename原子性語義支援,因此基於HiveCatalog的表不僅可以支援HDFS,而且可以支援s3、oss等其他檔案系統。但是HadoopCatalog可以認為只支援HDFS表,比較難以遷移到其他檔案系統。但是HadoopCatalog寫入提交的過程只依賴HDFS,不和Metastore/MySQL互動,而HiveCatalog每次提交都需要和Metastore/MySQL互動,可以認為是強依賴於Metastore,如果Metastore有異常,基於HiveCatalog的Iceberg表的寫入和查詢會有問題。相反,HadoopCatalog並不依賴於Metastore,即使Metastore有異常,也不影響Iceberg表的寫入和查詢。

考慮到我們目前主要還是依賴HDFS,同時不想強依賴於Metastore,所以我們選擇HadoopCatalog作為我們系統唯一的Catalog。即使有一天,想要把HDFS上的表遷移到S3上去,也是可以辦到的,大家想想,無論是HadoopCatalog還是HiveCatalog,資料檔案和元資料檔案本身都是相同的,只是標記當前最新的snapshot的入口不一樣,那隻需要簡單的手動變換一下入口就可以實現Catalog的切換,切換到HiveCatalog上之後,就可以擺脫HDFS的依賴,問題並不大。

6

Iceberg表相關元資料內容

上一篇介紹Iceberg的文章 中講到了Iceberg寫入資料檔案提交snapshot的時候會依次生成manifest檔案、manifestList檔案以及snapshot檔案,那這些元資料檔案中都存放什麼記錄呢?如果使用HiveCatalog建立的表,可以直接使用spark讀出相關元資料記錄。如果是HadoopCatalog建立的表,暫時不能直接使用spark讀出相關資訊,但是也可以使用檔案解析器解析出相關元資料檔案內容。

下文基於HiveCatalog建表模式介紹Iceberg相關元資料:

scala>spark.read.format("iceberg").load("hive_iceberg.action_logs.history").show
+--------------------+-------------------+---------+-------------------+
| made_current_at| snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2020-06-08 12:20:...|6771375506965563160| null| true|
+--------------------+-------------------+---------+-------------------+
scala> spark.read.format("iceberg").load("hive_iceberg.action_logs.snapshots").show
+--------------------+-------------------+---------+---------+--------------------+--------------------+
| committed_at| snapshot_id|parent_id|operation| manifest_list| summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2020-06-08 12:20:...|6771375506965563160| null| append|/libis/hive-2.3.6...|[spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
scala> spark.read.format("iceberg").load("hive_iceberg.action_logs.manifests").show(false)
+---------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------------------------------------------------+
|path |length|partition_spec_id|added_snapshot_id |added_data_files_count|existing_data_files_count|deleted_data_files_count|partition_summaries |
+---------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------------------------------------------------+
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/bb641961-162a-49a8-b567-885430d4e799-m0.avro|5040 |0 |6771375506965563160|4 |0 |0 |[[false, 2020-06-04-19, 2020-06-04-20], [false, click, view]]|
+---------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------------------------------------------------+
scala> spark.read.format("iceberg").load("hive_iceberg.action_logs.files").show(false)
+---------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------------+------------+------------------+-------------------+---------------------------------------------+----------------------------------------+----------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+-------------+
|file_path |file_format|partition |record_count|file_size_in_bytes|block_size_in_bytes|column_sizes |value_counts |null_value_counts |lower_bounds |upper_bounds |key_metadata|split_offsets|
+---------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------------+------------+------------------+-------------------+---------------------------------------------+----------------------------------------+----------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+-------------+
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=view/00007-39-4e7af786-9668-4e3d-b8aa-07b7b30fa60a-00000.parquet |PARQUET |[442027, view] |1 |1418 |67108864 |[1 -> 51, 2 -> 50, 3 -> 51, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> lly, 3 -> view, 4 -> K5, 5 -> !�F�] |[1 -> , 2 -> lly, 3 -> view, 4 -> K5, 5 -> !�F�] |null |[4] |
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet|PARQUET |[442027, click]|1 |1425 |67108864 |[1 -> 51, 2 -> 50, 3 -> 52, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> lly, 3 -> click, 4 -> K5, 5 -> ���F�] |[1 -> , 2 -> lly, 3 -> click, 4 -> K5, 5 -> ���F�] |null |[4] |
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-20/action=view/00023-55-f0494272-6166-4386-88c7-059e3081aa11-00000.parquet |PARQUET |[442028, view] |1 |1460 |67108864 |[1 -> 51, 2 -> 56, 3 -> 51, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> mint_1989, 3 -> view, 4 -> ч, 5 -> '��G�] |[1 -> , 2 -> mint_1989, 3 -> view, 4 -> ч, 5 -> '��G�] |null |[4] |
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-20/action=click/00031-63-a04ce10d-ae98-4004-bda8-2f18d842b66b-00000.parquet|PARQUET |[442028, click]|1 |1467 |67108864 |[1 -> 51, 2 -> 56, 3 -> 52, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> mint_1989, 3 -> click, 4 -> ч, 5 -> @r�G�]|[1 -> , 2 -> mint_1989, 3 -> click, 4 -> ч, 5 -> @r�G�]|null |[4] |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------------+------------+------------------+-------------------+---------------------------------------------+----------------------------------------+----------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+-------------+

7

總結

本文基於上一篇介紹Iceberg入門的文章,介紹瞭如何使用Iceberg進行簡單的建立表、往表中寫資料、查詢表資料以及表中元資料。同時也介紹了HadoopCatalog和HiveCatalog的不同以及我們選擇HadoopCatalog的一些想法。

作者簡介

子和,網易大資料開發工程師,長期從事分散式KV資料庫、分散式時序資料庫以及大資料底層元件等相關工作。