Flink DataStream 型別系統 TypeInformation

語言: CN / TW / HK

Flink DataStream 應用程式所處理的事件以資料物件的形式存在。函式呼叫時會傳入資料物件,同時也可以輸出資料物件。因此,Flink 在內部需要能夠處理這些物件。當通過網路傳輸或者讀寫狀態後端、檢查點以及儲存點時,需要對它們進行序列化和反序列化。為了能夠更高效的做到這一點,Flink 需要詳細瞭解應用程式處理的資料型別。Flink 使用型別資訊的概念來表示資料型別,併為每種資料型別生成特定的序列化器、反序列化器以及比較器。

此外,Flink 還有一個型別提取系統,可以分析函式的輸入和返回型別來自動獲取型別資訊,進而獲得序列化器和反序列化器。但是,在某些情況下,例如使用了 Lambda 函式或者泛型型別,必須顯式提供型別資訊才能使應用程式正常工作或者提高其效能。

在本文中,我們會討論 Flink 支援的資料型別,如何為資料型別建立型別資訊,以及如何在 Flink 的型別系統無法自動推斷函式的返回型別時提供提示,最後簡單說明一下顯示指定型別資訊的兩個場景。

1. 資料型別

Flink 支援 Java 和 Scala 所有常見的資料型別,也不需要像 Hadoop 一樣去實現一個特定的介面(org.apache.hadoop.io.Writable),能夠自動識別資料型別。使用最多的可以分為如下幾類,如下圖所示:

從圖中可以看到 Flink 型別可以分為基本型別、陣列型別、複合型別、輔助型別以及泛型。

1.1 基本型別

Flink 能夠支援所有 Java 和 Scala 原生基本型別(包裝型別)以及 Void、String、Date、BigDecimal、BigInteger 等型別。例如通過從給定的元素集中建立 DataStream 資料集:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 建立 Integer 型別的資料集
DataStream<Integer> integerElements = env.fromElements(1, 2, 3);
// 建立 String 型別的資料集
DataStream<String> stringElements = env.fromElements("1", "2", "3");

1.2 陣列型別

陣列型別包含兩種型別:

  • 基本型別陣列:基本型別的 Java 陣列,支援 boolean、byte、short、int、long、float 等

  • 物件陣列:Object 型別的 Java 陣列,支援 String 以及其他物件

例如通過從給定的元素集中建立 DataStream 資料集:

int[] a = {1, 2};
int[] b = {3, 4};
DataStream<int[]> arrayElements = env.fromElements(a, b);

1.3 複合資料型別

1.3.1 Java Tuples 型別

Flink 在 Java 介面中定義了元組類(Tuple)供使用者使用。元組是由固定數量的強型別欄位組成的複合資料型別。如下程式碼所示,建立 Tuple 資料型別資料集:

DataStream<Tuple2> tupleElements = env.fromElements(new Tuple2(1, "a"), new Tuple2(2, "b"));

Flink 提供了 Java 元組的高效實現,最多包含 25 個欄位,每個欄位長度都對應一個單獨的實現,即 Tuple0 到 Tuple25。如果欄位數量超過上限,可以通過繼承 Tuple 類的方式進行拓展。

1.3.2 Scala Case Class 與 Tuple 型別

Flink 支援任意的 Scala Case Class 以及 Scala tuples 型別,支援的欄位數量上限為 22,支援通過欄位名稱和位置索引獲取指標,不支援儲存空值。如下程式碼例項所示,定義 WordCount Case Class 資料型別,然後通過 fromElements 方法建立 input 資料集,呼叫 keyBy() 方法對資料集根據 word 欄位重新分割槽。

// 定義WordCount Case Class資料結構
case class WordCount(word: String, count: Int)
// 通過fromElements方法建立資料集
val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
val keyStream1 = input.keyBy("word") // 根據word欄位為分割槽欄位,
val keyStream2 = input.keyBy(0) //也可以通過指定position分割槽

通過使用 Scala Tuple 建立 DataStream 資料集,其他的使用方式和 Case Class 相似。需要注意的是,如果根據名稱獲取欄位,可以使用 Tuple 中的預設欄位名稱:

// 通過 scala Tuple 建立具有兩個元素的資料集
val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),("c", 2))
// 使用預設欄位名稱獲取欄位,其中 _1 表示 tuple 的第一個欄位
tupleStream.keyBy("_1")

1.3.3 ROW 型別

Row 是一種固定長度、可識別空值的複合型別,以確定的欄位順序儲存多個值。每個欄位的型別都可以不一樣並且每個欄位都可以為空。由於無法自動推斷行欄位的型別,因此在生成 Row 時都需要提供型別資訊。如下程式碼所示,建立 Row 資料型別資料集:

DataStream<Row> rowElements = env.fromElements(Row.of(0, "a", 3.14));

1.3.4 POJO 型別

Flink 會分析那些不屬於任何一類的資料型別,嘗試將它們作為 POJO 型別進行處理。如果一個型別滿足如下條件,Flink 就會將它們作為 POJO 資料型別:

  • POJOs 類必須是一個公有類,Public 修飾且獨立定義,不能是內部類;

  • POJOs 類中必須包含一個 Public 修飾的無參構造器;

  • POJOs 類中所有的欄位必須是 Public 或者具有 Public 修飾的 getter 和 setter 方法;

  • POJOs 類中的欄位型別必須是 Flink 支援的。

例如,如下 Java 類就會被 Flink 識別為 POJO:

// (1) 必須是 Public 修飾且必須獨立定義,不能是內部類
public class Person {
// (4) 欄位型別必須是 Flink 支援的
private String name;
private int age;
// (2) 必須包含一個 Public 修飾的無參構造器
public Person() {
}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
// (3) 所有的欄位必須是 Public 或者具有 Public 修飾的 getter 和 setter 方法
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}

定義好 POJOs Class 後,就可以在 Flink 環境中使用了,如下程式碼所示,使用 fromElements 介面構建 Person 類的資料集:

env.fromElements(new Person("Lucy", 18), new Person("Tom", 12))

1.4 輔助型別

在 Flink 中也支援一些比較特殊的資料資料型別,例如 Scala 中的 List、Map、Either、Option、Try 資料型別,以及 Java 中 Either 資料型別,還有 Hadoop 的 Writable 資料型別。如下程式碼所示,建立 List 型別資料集:

DataStream<ArrayList<Integer>> listElements = env.fromElements(
Lists.newArrayList(1, 2), Lists.newArrayList(3, 4)
);

這種資料型別使用場景不是特別廣泛,主要原因是資料中的操作相對不像 POJOs 類那樣方便和透明,使用者無法根據欄位位置或者名稱獲取欄位資訊,同時要藉助 Types Hint 幫助 Flink 推斷資料型別資訊。

1.5 泛型型別

那些無法特別處理的型別會被當做泛型型別處理並交給 Kryo 序列化框架進行序列化。如果可能的話,儘可能的避免使用 Kryo。Kryo 作為一個通用的序列化框架,通常效率不高。

2. TypeInformation

那這麼多的資料型別,在 Flink 內部又是如何表示的呢?在 Flink 中每一個具體的型別都對應了一個具體的 TypeInformation 實現類。例如,BasicTypeInformation 中的 IntegerTypeInformation 對應了 Integer 資料型別。資料型別的描述資訊都是由 TypeInformation 定義,比較常用的 TypeInformation 有 BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo 類等,如下圖所示:

TypeInformation 為系統提供生成序列化器和比較器提供必要的資訊。當應用程式提交執行時,Flink 的型別系統會嘗試為處理的每種資料型別自動推斷 TypeInformation。型別提取器會分析函式的泛型型別以及返回型別,來獲取相應的 TypeInformation 物件。但是,有時型別提取器會失靈,或者你可能想定義自己的型別並告訴 Flink 如何有效地處理它們。在這種情況下,你需要為特定資料型別生成 TypeInformation。

除了對型別地描述之外,TypeInformation 還提供了序列化的支撐。每一個 TypeInformation 都會為對應的具體資料型別提供一個專屬的序列化器。TypeInformation 會提供一個 createSerialize() 方法,通過這個方法就可以得到該型別進行資料序列化操作與反序列化操作的序列化器 TypeSerializer:

public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return this.serializer;
}

對於大多數資料型別 Flink 可以自動生成對應的序列化器,能非常高效地對資料集進行序列化和反序列化,比如,BasicTypeInfo、WritableTypeIno 等,但針對 GenericTypeInfo 型別,Flink 會使用 Kyro 進行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 型別是複合型別,它們可能巢狀一個或者多個數據型別。在這種情況下,它們的序列化器同樣是複合的。它們會將內嵌型別的序列化委託給對應型別的序列化器。

3. 顯示指定 TypeInformation

大多數情況下,Flink 可以自動推斷型別生成正確的 TypeInformation,並選擇合適的序列化器和比較器。Flink 的型別提取器利用反射分析函式簽名以及子類資訊,生成函式的正確輸出型別。但是有時無法提取必要的資訊,例如定義函式時如果使用到了泛型,JVM 就會出現型別擦除的問題,使得 Flink 並不能很容易地獲取到資料集中的資料型別資訊。這時候可能會丟擲如下類似的異常:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(ReturnsExample.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1236)
at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
...
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
...

此外,在某些情況下,Flink 選擇的 TypeInformation 可能無法生成最有效的序列化器和反序列化器。因此,你可能需要為你使用的資料型別顯式地提供 TypeInformation。我們首先看一下如何建立 TypeInformation,然後再看一下如何為函式指定 TypeInformation。

3.1 建立 TypeInformation

3.1.1 of 方法

對於非泛型的型別,可以使用 TypeInformation 的 of(Class typeClass) 函式直接傳入 Class 就可以建立 TypeInformation:

// 示例1 非泛型型別 直接傳入 Class 物件
DataStream<WordCount> result1 = env.fromElements("a b a")
.flatMap((String value, Collector<WordCount> out) -> {
for(String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
})
.returns(TypeInformation.of(WordCount.class));
result1.print("R1");

上述方法僅適用於非泛型型別。如果是泛型型別,可以藉助 TypeHint 為泛型型別建立 TypeInformation:

// 示例2 泛型型別 需要藉助 TypeHint
DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
result2.print("R2");

完整示例

3.1.2 TypeHint

對於泛型型別,上面是通過 TypeInformation.of + TypeHint 來建立 TypeInformation,也可以單獨使用 TypeHint 來建立 TypeInformation:

DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo());
result2.print("R2");

TypeHint 的原理是在內部建立匿名子類,捕獲泛型資訊並會將其保留到執行時。執行時 TypeExtractor 可以獲取儲存的實際型別。

完整示例

3.1.3 預定義的快捷方式

例如 BasicTypeInfo 類定義了一系列常用型別的快捷方式,對於 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本型別的型別宣告,可以直接使用:

當然,如果覺得 BasicTypeInfo 還是太長,Flink 還提供了完全等價的 Types 類(org.apache.flink.api.common.typeinfo.Types):

Types 為常見資料型別提供 TypeInformation,使用起來非常方便,如下示例:

// 示例1 Types.TUPLE
DataStream<Tuple2<String, Integer>> result1 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
result1.print("R1");

// 示例2 Types.POJO
DataStream<WordCount> result2 = env.fromElements("a b a")
.flatMap((String value, Collector<WordCount> out) -> {
for(String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
})
.returns(Types.POJO(WordCount.class));
result2.print("R2");

完整示例

3.2 顯示提供型別資訊

當 Flink 無法自動推斷函式的生成型別是什麼的時候,就需要我們顯示提供型別資訊提示。從上面示例中我們知道可以通過 returns 顯示提供型別資訊,除此之外還可以實現 ResultTypeQueryable 介面顯示提供。

3.2.1 returns

第一種方法是使用 returns 為運算元新增返回型別的型別資訊提示。對於非泛型型別,可以直接傳入 Class 即可;對於泛型型別需要藉助 TypeHint 提供型別資訊提示,如下所示:

// 示例1 非泛型型別 直接傳入 Class
DataStream<WordCount> result1 = env.fromElements("a b a")
.flatMap((String value, Collector<WordCount> out) -> {
for(String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
})
.returns(WordCount.class);
result1.print("R1");

// 示例2 泛型型別 優先推薦藉助 TypeHint
DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {});
result2.print("R2");

// 示例3 TypeInformation.of + TypeHint
DataStream<Tuple2<String, Integer>> result3 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
result3.print("R3");

// 示例4 Types 快捷方式
DataStream<Tuple2<String, Integer>> result4 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
result4.print("R4");

完整示例

3.2.2 ResultTypeQueryable

第二種方法是通過實現 ResultTypeQueryable 介面來擴充套件函式以顯式提供返回型別的 TypeInformation。如下示例是一個顯式提供返回型別的 MapFunction:

public static class ResultTypeMapFunction implements MapFunction<String, Stu>, ResultTypeQueryable {
@Override
public Stu map(String value) throws Exception {
String[] params = value.split(",");
String name = params[0];
int age = Integer.parseInt(params[1]);
return new Stu(name, age);
}

@Override
public TypeInformation getProducedType() {
return Types.POJO(Stu.class);
}
}

4. 使用場景

4.1 Table 轉 DataStream

Table 轉 DataStream 的時候,Table 並清楚 DataStream 的資料結構,因此需要給當前轉換出來的 DataStream 顯性的指定資料型別:

// 轉化為 Pojo 型別
DataStream<WordCount> stream1 = tEnv.toAppendStream(table, Types.POJO(WordCount.class));

// 轉換為 Row 型別
DataStream<Row> stream2 = tEnv.toAppendStream(table, Types.ROW(Types.STRING, Types.LONG));

4.2 Lambda 表示式與泛型

由於 Java 泛型會出現型別擦除問題,因此 Flink 通過 Java 反射機制儘可能重構型別資訊,例如使用函式簽名以及子類的資訊等。對於函式的返回型別取決於輸入型別的情況時,會包含一些簡單的型別推斷。但如果無法重構所有的泛型型別資訊時,需要藉助於型別提示來告訴系統函式中傳入的引數型別資訊和輸出引數資訊。如下所示使用 returns 語句指定生成的型別:

env.fromElements(1, 2, 3)
.map(i -> Tuple2.of(i, i*i))
// 如果不指定 returns 返回的 TypeInformation 會丟擲異常
.returns(Types.TUPLE(Types.INT, Types.INT))
.print();

參考: