Spark Catalog深入理解與實戰

語言: CN / TW / HK

本文首發微信公眾平臺:碼上觀世界

寫過Spark應用程式的同學都知道,通過下面這段程式碼就可以載入和訪問外部Hive資料來源:

SparkSession.builder().
appName(TestSparkHive.class.getSimpleName()).
master("local[*]").
enableHiveSupport().
getOrCreate();
List<Row> list= spark.sql("show databases").collectAsList();

也許你會好奇,它是怎麼找到並訪問外部Hive資料來源的?

其實,Spark識別Hive,也是需要依賴Hive配置項的,配置項的來源可以是$HIVE_HOME環境變數,也可以從Spark的執行環境的classpath下載入Hive相關的配置檔案。

建立對Hive外部資料來源的訪問,不得不提到Spark的兩個類:SessionCatalog和ExternalCatalog。前者是對後者的封裝,對外部資料來源的訪問都是通過ExternalCatalog實現。而ExternalCatalog是一個Trait型別,提供了對錶、函式和分割槽的增刪改查基本介面。對Hive資料來源來講,分別繼承上述兩個類,提供了具體的實現:HiveSessionCatalog和HiveExternalCatalog。

隨著新資料來源(Spark中稱為DataSourceV2)的出現,原來的SessionCatalog暴露出弊端和不足,為了適應新的資料來源特性,Spark推出了新的介面:CatalogPlugin,因為屬於頂層介面,CatalogPlugin本身很簡單,只有3個方法:

public interface CatalogPlugin {
void initialize(String name, CaseInsensitiveStringMap options);
String name();
default String[] defaultNamespace() {
return new String[0];
}
}

實現自定義Catalog,既可以直接實現CatalogPlugin,也可以擴充套件TableCatalog介面,TableCatalog擴充套件了CatalogPlugin並提供了表操作相關功能的介面。同理,實現函式相關的Catalog,也可以直接擴充套件FunctionCatalog,因為它提供了函式管理相關的介面。同SessionCatalog相對應,CatalogPlugin介面體系也實現了V2SessionCatalog,整個CatalogPlugin類體系表示為下圖所示:

V2SessionCatalog不同於SessionCataolog,主要表現在:

1. V2SessionCatalog實現了CatalogPlugIn介面,CatalogPlugIn是針對新資料來源(DatasourceV2)的元資料管理。

2. SessionCatalog 只是普通類,封裝了外部資料來源的元資料管理介面ExternalCatalog。

3. SessionCatalog 作為V2SessionCatalog的屬性,或者說 V2SessionCatalog是SessionCatalog的代理實現。

關於第3點,可以從V2SessionCatalog的實現得到佐證,同時以一個方法listTables的實現為例來看:

/**translates calls to the v1 SessionCatalog. */
class V2SessionCatalog(catalog: SessionCatalog)
extends TableCatalog with SupportsNamespaces with SQLConfHelper {

override def listTables(namespace: Array[String]): Array[Identifier] = {
namespace match {
case Array(db) =>
catalog
.listTables(db)
.map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table))
.toArray
case _ =>
throw QueryCompilationErrors.noSuchNamespaceError(namespace)
}
}
....
}

有了SessionCatalog和V2SessionCatalog,Spark又是如何管理這些Catalog呢?

Spark提供了CatalogManager,其內部通過一個Map型別的記憶體資料結構維護註冊的Catalog例項:

class CatalogManager(
defaultSessionCatalog: CatalogPlugin,
val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
//SESSION_CATALOG_NAME 常量:spark_catalog
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._


private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]

def catalog(name: String): CatalogPlugin = synchronized {
if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
v2SessionCatalog
} else {
catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
}
}
}

CatalogManager維護了所有Catalog例項的鍵值對資訊,能夠根據catalog名稱返回對應的Catalog例項,其中有一個固定的名字叫spark_catalog,用於當前預設的Catalog例項實現,就是V2SessionCatalog,它代理了普通的SessionCatalog,因此,在使用時,即使什麼Catalog都不註冊,Spark也會根據預設的Catalog例項載入Hive資料來源。但是V2SessionCatalog只是對SessionCatalog的簡單代理,那麼如何實現複雜的資料來源元資料管理功能呢?這就需要擴充套件V2SessionCatalog的實現,這裡以Spark Iceberg的實現為例說明:

public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces> extends BaseCatalog implements CatalogExtension {
private static final String[] DEFAULT_NAMESPACE = new String[]{"default"};
private String catalogName = null;
private TableCatalog icebergCatalog = null;
private StagingTableCatalog asStagingCatalog = null;
private T sessionCatalog = null;
...
}

SparkSessionCatalog實現了CatalogExtension介面,而CatalogExtension介面擴充套件了SparkPlugIn。注意到類中有兩個TableCatalog型別的屬性:icebergCatalog和sessionCatalog。其中sessionCatalog就是上面介紹的V2SessionCatalog。

實際上,icebergCatalog和sessionCatalog是 Iceberg Runtime提供的兩個類定義,分別是:

org.apache.iceberg.spark.SparkCatalog
org.apache.iceberg.spark.SparkSessionCatalog

關於這兩個類的區別,官網有這麼一段解釋:

什麼意思?

就是說,SparkCatalog專用於Iceberg 管理,比如你可以這樣在Spark Catalog Manager中註冊hive和hadoop型別的Catalog:

set spark.sql.catalog.hive_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.hive_iceberg_catalog_demo.type=hive;

或者

set spark.sql.catalog.hadoop_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.hadoop_iceberg_catalog_demo.type=hadoop;

你可以使用如下的形式建立Iceberg表:

CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(
id bigint COMMENT 'unique id',
data string)
USING iceberg
location 'your path'
TBLPROPERTIES ('iceberg.catalog'='new_iceberg_catalog');

如果當前預設namespace在default下,你甚至可以將上面建表語句簡寫為:

CREATE TABLE sample_iceberg_table(
id bigint COMMENT 'unique id',
data string);

因為當前Catalog已經明確定義為Iceberg表,它能自動建立Iceberg表,但無法訪問普通的Hive表。而SparkSessionCatalog不僅可以定義上面的Iceberg Catalog,並在其中建立Iceberg型別的表,還可以建立非Iceberg型別的表,註冊方式同上:

set spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;
set spark.sql.catalog.spark_catalog.type=hive;

對SparkSessionCatalog型別的Catalog,其名稱為固定的spark_catalog。它重寫了Spark預設的V2SessionCatalog行為,SparkSessionCatalog可看做是對Hive資料來源的相容,對非Iceberg型別的表操作,跟普通的Hive表操作並無區別。以createTable這個方法為例:

public Table createTable(Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
String provider = (String)properties.get("provider");
return this.useIceberg(provider) ? this.icebergCatalog.createTable(ident, schema, partitions, properties) :
this.getSessionCatalog().createTable(ident, schema, partitions, properties);
}

如果是Iceberg表,它使用icebergCatalog建立表,否則就用SessionCatalog建立表。

而listTables直接將請求轉給了SessionCatalog,因為Hive Iceberg表和普通Hive表都基於HMS儲存,所以可以使用SessionCatalog。

public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
return this.getSessionCatalog().listTables(namespace);
}

除了上述區別外,SparkSessionCatalog對Create Table AS Select或者Replace Table As Select無法保證原子性,而SparkCatalog可以。

上面介紹了Spark Iceberg對多型別Catalog的支援,下一步自然要問,這有什麼用?

舉兩個場景來說明:

1. Hive資料入湖

--定義catalog
set spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;
set spark.sql.catalog.spark_catalog.type=hive;
set spark.sql.catalog.spark_catalog.uri=thrift://ip:9083;
set spark.sql.catalog.spark_catalog.warehouse=s3a://mybucket/warehouse;


CREATE TABLE spark_catalog.default.sample_iceberg_table(
id bigint COMMENT 'unique id',
data string)
USING iceberg;insert into spark_catalog.default.sample_iceberg_table select * from default.sample_hive_table;

2. 不同Hive版本的資料湖資料遷移,比如從低版本的HMS資料湖遷移到高版本的資料湖,可以這樣實現:

--定義新的資料湖
SET spark.sql.catalog.new_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
SET spark.sql.catalog.new_iceberg_catalog.type=hive;
SET spark.sql.catalog.new_iceberg_catalog.uri=thrift://ip-new:9083;
SET spark.sql.catalog.new_iceberg_catalog.warehouse=s3a://mybucket/warehouse;
--建立新資料湖的表
CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(
id bigint COMMENT 'unique id',
data string)
USING iceberg;--定義舊的資料湖
SET spark.sql.catalog.old_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
SET spark.sql.catalog.old_iceberg_catalog.type=hive;
SET spark.sql.catalog.old_iceberg_catalog.uri=thrift://ip-old:9083;
SET spark.sql.catalog.old_iceberg_catalog.warehouse=hdfs://service/warehouse;


insert into new_iceberg_catalog.default.sample_iceberg_table select * from old_iceberg_catalog.default.sample_iceberg_table;