Flink 類型和序列化機制簡介

使用 Flink 編寫處理邏輯時,新手老是容易被林林總總的概念所混淆:html

爲何 Flink 有那麼多的類型聲明方式?java

BasicTypeInfo.STRING_TYPE_INFO、Types.STRING 、Types.STRING() 有何區別?git

TypeInfoFactory 又是什麼?github

TypeInformation.of 和 TypeHint 是如何使用的呢?apache

接下來本文將逐步解密 Flink 的類型和序列化機制。api

Flink 的類型分類

圖 1:Flink 類型分類機器學習

Flink 的類型系統源碼位於 org.apache.flink.api.common.typeinfo 包,讓咱們對圖 1 深刻追蹤,看一下類的繼承關係圖:函數

圖 2:TypeInformation 類繼承關係圖性能

能夠看到,圖 1 和 圖 2 是一一對應的,TypeInformation 類是描述一切類型的公共基類,它和它的全部子類必須可序列化(Serializable),由於類型信息將會伴隨 Flink 的做業提交,被傳遞給每一個執行節點。學習

因爲 Flink 本身管理內存,採用了一種很是緊湊的存儲格式(見官方博文),於是類型信息在整個數據處理流程中屬於相當重要的元數據。

TypeExtractror 類型提取

Flink 內部實現了名爲 TypeExtractror 的類,能夠利用方法簽名、子類信息等蛛絲馬跡,自動提取和恢復類型信息(固然也能夠顯式聲明,即本文所介紹的內容)。

然而因爲 Java 的類型擦除,自動提取並非老是有效。於是一些狀況下(例如經過 URLClassLoader 動態加載的類),仍需手動處理;例以下圖中對 DataSet 變換時,使用 .returns() 方法聲明返回類型。

這裏須要說明一下,returns() 接受三種類型的參數:字符串描述的類名(例如 "String")、TypeHint(接下來會講到,用於泛型類型參數)、Java 原生 Class(例如 String.class) 等;不過字符串形式的用法即將廢棄,若是確實有必要,請使用 Class.forName() 等方法來解決。

圖 3:使用 .returns 方法聲明返回類型

下面是 ExecutionEnvironment 類的 registerType 方法,它能夠向 Flink 註冊子類信息(Flink 認識父類,但不必定認識子類的一些獨特特性,於是須要註冊),下面是 Flink-ML 機器學習庫代碼的例子:

圖 4:Flink-ML 註冊子類類型信息

從下圖能夠看到,若是經過 TypeExtractor.createTypeInfo(type) 方法獲取到的類型信息屬於 PojoTypeInfo 及其子類,那麼將其註冊到一塊兒;不然統一交給 Kryo 去處理,Flink 並不過問(這種狀況下性能會變差)。

圖 5:Flink 容許註冊自定義類型

聲明類型信息的常見手段

經過 TypeInformation.of() 方法,能夠簡單地建立類型信息對象。

1. 對於非泛型的類,直接傳入 Class 對象便可

圖 6:class 對象做爲參數

2. 對於泛型類,須要藉助 TypeHint 來保存泛型類型信息

TypeHint 的原理是建立匿名子類,運行時 TypeExtractor 能夠經過 getGenericSuperclass(). getActualTypeArguments() 方法獲取保存的實際類型。

圖 7:TypeHint 做爲參數,保存泛型信息

3. 預約義的快捷方式

例如 BasicTypeInfo,這個類定義了一系列經常使用類型的快捷方式,對於 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本類型的類型聲明,能夠直接使用。

圖 8:BasicTypeInfo 快捷方式

例以下面是對 Row 類型各字段的類型聲明,使用方法很是簡明,再也不須要 new XxxTypeInfo<>(不少不少參數)

圖 9:使用 BasicTypeInfo 快捷方式來聲明一行(Row)每一個字段的類型信息

固然,若是以爲 BasicTypeInfo 仍是太長,Flink 還提供了徹底等價的 Types 類(org.apache.flink.api.common.typeinfo.Types):

圖 10:Types 類

特別須要注意的是,flink-table 模塊也有一個 Types 類(org.apache.flink.table.api.Types),用於 table 模塊內部的類型定義信息,用法稍有不一樣。使用 IDE 的自動 import 時必定要當心:

圖 11:flink-table 模塊的 Types 類

4. 自定義 TypeInfo 和 TypeInfoFactory

經過自定義 TypeInfo 爲任意類提供 Flink 原生內存管理(而非 Kryo),可令存儲更緊湊,運行時也更高效。

開發者在自定義類上使用 @TypeInfo 註解,隨後建立相應的 TypeInfoFactory 並覆蓋 createTypeInfo 方法。

注意須要繼承 TypeInformation 類,爲每一個字段定義類型,並覆蓋元數據方法,例如是不是基本類型(isBasicType)、是不是 Tuple(isTupleType)、元數(對於一維的 Row 類型,等於字段的個數)等等,從而爲 TypeExtractor 提供決策依據。

圖 12:爲自定義類提供類型支持(圖片未展現所有字段)

更多示例,請參考 Flink 源碼的 org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java

TypeSerializer

Flink 自帶了不少 TypeSerializer 子類,大多數狀況下各類自定義類型都是經常使用類型的排列組合,於是能夠直接複用:

圖 13:Flink 自帶的 TypeSerializer 子類概覽

若是不能知足,那麼能夠繼承 TypeSerializer 及其子類以實現本身的序列化器。

Kryo 序列化

對於 Flink 沒法序列化的類型(例如用戶自定義類型,沒有 registerType,也沒有自定義 TypeInfo 和 TypeInfoFactory),默認會交給 Kryo 處理。

若是 Kryo 仍然沒法處理(例如 Guava、Thrift、Protobuf 等第三方庫的一些類),有如下兩種解決方案:

1. 能夠強制使用 Avro 來替代 Kryo:

env.getConfig().enableForceAvro();   // env 表明 ExecutionEnvironment 對象, 下同

2. 爲 Kryo 增長自定義的 Serializer 以加強 Kryo 的功能:

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass

圖 14:爲 Kryo 增長自定義的 Serializer

以及

env.getConfig().registerTypeWithKryoSerializer(Class<?> type, T serializer)

圖 15:爲 Kryo 增長自定義的 Serializer

若是但願徹底禁用 Kryo(100% 使用 Flink 的序列化機制),則可使用如下設置,但注意一切沒法處理的類都將致使異常:

env.getConfig().disableGenericTypes();

類型機制的陷阱與缺陷

金無足赤,人無完人。Flink 內置的類型系統雖然強大而靈活,但仍然有一些須要注意的點:

1. Lambda 函數的類型提取

因爲 Flink 類型提取依賴於繼承等機制,而 lambda 函數比較特殊,它是匿名的,也沒有與之相關的類,因此其類型信息較難獲取。

Eclipse 的 JDT 編譯器會把 lambda 函數的泛型簽名等信息寫入編譯後的字節碼中,而對於 javac 等常見的其餘編譯器,則不會這樣作,於是 Flink 就沒法獲取具體類型信息了。

2. Kryo 的 JavaSerializer 在 Flink 下存在 Bug

推薦使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 而非 com.esotericsoftware.kryo.serializers.JavaSerializer 以防止與 Flink 不兼容。

類型機制與內存管理

圖 16:類型信息到內存塊

下面以 StringSerializer 爲例,來看下 Flink 是如何緊湊管理內存的:

圖 17:StringSerializer 類的 serialize() 方法

下面是具體的序列化過程:

圖 18:String 對象的序列化過程

能夠看到,Flink 對於內存管理是很是細緻的,井井有條,代碼也容易理解。

參考閱讀

Data Types & Serialization

Flink 原理與實現:內存管理

Flink 的數據類型和序列化

相關文章
相關標籤/搜索