iceberg的java api使用

語言: CN / TW / HK

【前言】

瞭解一個元件的最好方式是先使用該元件,今天我們就來聊聊如何通過java api對iceberg進行操作。

為什麼是選擇api進行介紹,而不是更通用的flink、spark、hive等。一方面是覺得flink、spark使用iceberg的介紹網上已經有很多,官網的介紹也比較清晰,而java api的介紹則相對少些;另一方面,不管是flink,spark最終都還是呼叫這些基本的api完成相關的操作的,因此先從api入手,後續對flink,spark,trino等元件對iceberg的操作原理理解起來也會更容易些。所以就有了本文的內容。

【catalog的建立】

在建立資料庫,表之前需要先建立catalog,這裡主要介紹hive型別的catalog。

import org.apache.iceberg.hive.HiveCatalog;

HiveCatalog catalog = new HiveCatalog();
catalog.setConf(conf);

Map <String, String> properties = new HashMap<String, String>();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/user/hive/warehouse")
properties.put(CatalogProperties.URI, "thrift://172.16.55.21:9083");
properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.hive.HiveCatalog");

// 初始化catalog
catalog.initialize("hive",properties);

對於hive型別的catalog,主要指定資料庫儲存位置,以及hive metastore server的URI。

【建立表】

對於iceberg表,可以理解由四部分組成,表結構定義(schema)、分割槽定義(partitionSpec)、表的屬性(properties),以及表的唯一識別資訊(identity)即表所屬的資料庫與表名建立表時只需要分別制定這些內容即可。

// 定義表結構schema
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.required(2, "name", Types.StringType.get()),
    Types.NestedField.required(3, "birth", Types.StringType.get()));

// 分割槽定義(以birth欄位按月進行分割槽)
PartitionSpec spec = PartitionSpec.builderFor(schema).month("birth").build();

// 資料庫名,表名
TableIdentifier name = TableIdentifier.of("iceberg_db", "developer");
// 表的屬性
Map<String, String> properties = new HashMap<>();
properties.put("engine.hive.enabled", "true");
// 建表
Table table = catalog.createTable(name, schema, spec, properties);

這裡需要注意的是:分割槽定義中的欄位必須是schema中已有的欄位,如果在schema中找不到對應的欄位,會報錯拋異常

但是,通過sql方式建表時,分割槽欄位會隱式地加入到表字段定義中,即不用強制寫到schema的欄位定義。例如通過如下hivesql語句建表:

create table developer(
id int,
name string
)
partitioned by (birth Date)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
tblproperties('engine.hive.enabled'='true');

建表後的情況如下所示:

【插入資料】

插入資料可以分為3個步驟,首先根據表格式構造對應的資料記錄,然後將記錄寫入到指定格式(parquet、orc等)的檔案中,最後將檔案列表寫入到表中。

// 1. 構建記錄
GenericRecord record = GenericRecord.create(schema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
builder.add(ImmutableMap.of("id", 1, "name", "chen", "birth", "2020-03-08"));
builder.add(ImmutableMap.of("id", 2, "name", "yuan", "birth", "2021-03-09"));
builder.add(ImmutableMap.of("id", 3, "name", "jie", "birth", "2023-03-10"));
builder.add(ImmutableMap.of("id", 4, "name", "ma", "birth", "2023-03-11"));
ImmutableList<GenericRecord> records = builder.build();

// 2. 將記錄寫入parquet檔案
String filepath = table.location() + "/" + UUID.randomUUID().toString();
OutputFile file = table.io().newOutputFile(filepath);
DataWriter<GenericRecord> dataWriter =
    Parquet.writeData(file)
    .schema(schema)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .overwrite()
    .withSpec(PartitionSpec.unpartitioned())
    .build();
try {
    for (GenericRecord record : builder.build()) {
        dataWriter.write(record);
    }
} finally {
    dataWriter.close();
}

// 3. 將檔案寫入table中
DataFile dataFile = dataWriter.toDataFile();
table.newAppend().appendFile(dataFile).commit();

這裡,對於資料檔案的儲存位置是有一定規範的,如果沒有在指定路徑下存放,那麼對於其他元件來說(比如表同步到hive後),會出現資料不完整或者查不到的情況。

【行級別的查詢資料】

查詢是通過構造ScanBuilder,並IcebergGenerics.read來完成的。ScanBuilder還可以進行select選擇列,以及通過where指定查詢條件。

Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "developer"));
IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(table);
// 查詢全部
CloseableIterable<Record> records = scanBuilder.build();
for (Record record : records) {
}
// 指定select列與where條件的查詢
//CloseableIterable<Record> records = scanBuilder.select("id", "name").where(Expressions.lessThan("id", Integer.valueOf(10))).build();

【表結構變更】

iceberg所具備的一項特點就是可以對表結構進行變更,例如新增,刪除已有欄位,欄位名或型別的變更,新增分割槽等。

1)新增列欄位

Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "developer"));
UpdateSchema newSchema = table.updateSchema();
// 欄位名, 欄位型別
newSchema.addColumn("skill", Type.StringType.get());
updateSchema.commit();

對於已經寫入的記錄資料,其新增欄位的值為NULL。

當然還可以UpdateSchema進行刪除欄位、重新命名欄位、更新欄位(型別),調整欄位位置等操作。

2)新增分割槽

通過UpdatePartitionSpec可以進行分割槽的相關操作。

Table table = catalog.loadTable(TableIdentifier.of(dbName, tblName));
UpdatePartitionSpec updatePartitionSpec = table.updateSpec();
updatePartitionSpec.addField("skill");
updatePartitionSpec.commit();

【snapshot的操作】

完成表的載入後,可以得到表的所有snapshot資訊,也可以刪除指定的snapshot,或指定時間之前的snapshot。

Table table = catalog.loadTable(TableIdentifier.of(dbName, tblName));
for (Snapshot snapshot : table.snapshots()) {
    System.out.println(snapshot.sequenceNumber() + " " + snapshot.snapshotId() + " " + snapshot.parentId() + " " + snapshot.timestampMillis());
}

ExpireSnapshots expireSnapshot = table.expireSnapshots();
expireSnapshot.expireOlderThan(table.currentSnapshot().timestampMillis());
expireSnapshot.commit();

【刪除表】

刪除表的操作則很簡單,通過catalog對錶進行刪除。

TableIdentifier name = TableIdentifier.of("iceberg_db", "developer");
catalog.dropTable(name, true);

【總結】

本文主要介紹iceberg api的一些基本操作,這裡未涉及資料的更新與刪除,因為這是一個比較大的知識點。另外,分割槽的新增,新增新的列這些操作的背後邏輯和iceberg的檔案儲存格式都有一定的關係,我們後續會逐一介紹。

好了,這就是本文的全部內容,如果覺得本文對您有幫助,請點贊+轉發,如果覺得有不正確的地方,也可以拍磚指點,最後,歡迎加我微信交流~

本文分享自微信公眾號 - 陳猿解碼(gh_383bc7486c1a)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。