歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~html
使用 Flink 編寫處理邏輯時,新手老是容易被林林總總的概念所混淆:git
爲何 Flink 有那麼多的類型聲明方式?github
BasicTypeInfo.STRING_TYPE_INFO、Types.STRING 、Types.STRING() 有何區別?apache
TypeInfoFactory 又是什麼?api
TypeInformation.of 和 TypeHint 是如何使用的呢?機器學習
接下來本文將逐步解密 Flink 的類型和序列化機制。函數
圖 1:Flink 類型分類性能
Flink 的類型系統源碼位於 org.apache.flink.api.common.typeinfo 包,讓咱們對圖 1 深刻追蹤,看一下類的繼承關係圖:學習
圖 2:TypeInformation 類繼承關係圖
能夠看到,圖 1 和 圖 2 是一一對應的,TypeInformation 類是描述一切類型的公共基類,它和它的全部子類必須可序列化(Serializable),由於類型信息將會伴隨 Flink 的做業提交,被傳遞給每一個執行節點。
因爲 Flink 本身管理內存,採用了一種很是緊湊的存儲格式(見官方博文),於是類型信息在整個數據處理流程中屬於相當重要的元數據。
Flink 內部實現了名爲 TypeExtractror 的類,能夠利用方法簽名、子類信息等蛛絲馬跡,自動提取和恢復類型信息(固然也能夠顯式聲明,即本文所介紹的內容)。
然而因爲 Java 的類型擦除,自動提取並非老是有效。於是一些狀況下(例如經過 URLClassLoader 動態加載的類),仍需手動處理;例以下圖中對 DataSet 變換時,使用 .returns() 方法聲明返回類型。
這裏須要說明一下,returns() 接受三種類型的參數:字符串描述的類名(例如 "String")、TypeHint(接下來會講到,用於泛型類型參數)、Java 原生 Class(例如 String.class) 等;不過字符串形式的用法即將廢棄,若是確實有必要,請使用 Class.forName() 等方法來解決。
圖 3:使用 .returns 方法聲明返回類型
下面是 ExecutionEnvironment 類的 registerType 方法,它能夠向 Flink 註冊子類信息(Flink 認識父類,但不必定認識子類的一些獨特特性,於是須要註冊),下面是 Flink-ML 機器學習庫代碼的例子:
圖 4:Flink-ML 註冊子類類型信息
從下圖能夠看到,若是經過 TypeExtractor.createTypeInfo(type) 方法獲取到的類型信息屬於 PojoTypeInfo 及其子類,那麼將其註冊到一塊兒;不然統一交給 Kryo 去處理,Flink 並不過問(這種狀況下性能會變差)。
圖 5:Flink 容許註冊自定義類型
經過 TypeInformation.of() 方法,能夠簡單地建立類型信息對象。
1. 對於非泛型的類,直接傳入 Class 對象便可
圖 6:class 對象做爲參數
2. 對於泛型類,須要藉助 TypeHint 來保存泛型類型信息
TypeHint 的原理是建立匿名子類,運行時 TypeExtractor 能夠經過 getGenericSuperclass(). getActualTypeArguments() 方法獲取保存的實際類型。
圖 7:TypeHint 做爲參數,保存泛型信息
3. 預約義的快捷方式
例如 BasicTypeInfo,這個類定義了一系列經常使用類型的快捷方式,對於 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本類型的類型聲明,能夠直接使用。
圖 8:BasicTypeInfo 快捷方式
例以下面是對 Row 類型各字段的類型聲明,使用方法很是簡明,再也不須要 new XxxTypeInfo<>(不少不少參數)
圖 9:使用 BasicTypeInfo 快捷方式來聲明一行(Row)每一個字段的類型信息
固然,若是以爲 BasicTypeInfo 仍是太長,Flink 還提供了徹底等價的 Types 類(org.apache.flink.api.common.typeinfo.Types):
圖 10:Types 類
特別須要注意的是,flink-table 模塊也有一個 Types 類(org.apache.flink.table.api.Types),用於 table 模塊內部的類型定義信息,用法稍有不一樣。使用 IDE 的自動 import 時必定要當心:
圖 11:flink-table 模塊的 Types 類
4. 自定義 TypeInfo 和 TypeInfoFactory
經過自定義 TypeInfo 爲任意類提供 Flink 原生內存管理(而非 Kryo),可令存儲更緊湊,運行時也更高效。
開發者在自定義類上使用 @TypeInfo 註解,隨後建立相應的 TypeInfoFactory 並覆蓋 createTypeInfo 方法。
注意須要繼承 TypeInformation 類,爲每一個字段定義類型,並覆蓋元數據方法,例如是不是基本類型(isBasicType)、是不是 Tuple(isTupleType)、元數(對於一維的 Row 類型,等於字段的個數)等等,從而爲 TypeExtractor 提供決策依據。
圖 12:爲自定義類提供類型支持(圖片未展現所有字段)
更多示例,請參考 Flink 源碼的 org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
Flink 自帶了不少 TypeSerializer 子類,大多數狀況下各類自定義類型都是經常使用類型的排列組合,於是能夠直接複用:
圖 13:Flink 自帶的 TypeSerializer 子類概覽
若是不能知足,那麼能夠繼承 TypeSerializer 及其子類以實現本身的序列化器。
對於 Flink 沒法序列化的類型(例如用戶自定義類型,沒有 registerType,也沒有自定義 TypeInfo 和 TypeInfoFactory),默認會交給 Kryo 處理。
若是 Kryo 仍然沒法處理(例如 Guava、Thrift、Protobuf 等第三方庫的一些類),有如下兩種解決方案:
\1. 能夠強制使用 Avro 來替代 Kryo:
env.getConfig().enableForceAvro(); // env 表明 ExecutionEnvironment 對象, 下同
\2. 爲 Kryo 增長自定義的 Serializer 以加強 Kryo 的功能:
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass
圖 14:爲 Kryo 增長自定義的 Serializer
以及
env.getConfig().registerTypeWithKryoSerializer(Class<?> type, T serializer)
圖 15:爲 Kryo 增長自定義的 Serializer
若是但願徹底禁用 Kryo(100% 使用 Flink 的序列化機制),則可使用如下設置,但注意一切沒法處理的類都將致使異常:
env.getConfig().disableGenericTypes();
金無足赤,人無完人。Flink 內置的類型系統雖然強大而靈活,但仍然有一些須要注意的點:
1. Lambda 函數的類型提取
因爲 Flink 類型提取依賴於繼承等機制,而 lambda 函數比較特殊,它是匿名的,也沒有與之相關的類,因此其類型信息較難獲取。
Eclipse 的 JDT 編譯器會把 lambda 函數的泛型簽名等信息寫入編譯後的字節碼中,而對於 javac 等常見的其餘編譯器,則不會這樣作,於是 Flink 就沒法獲取具體類型信息了。
2. Kryo 的 JavaSerializer 在 Flink 下存在 Bug
推薦使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 而非 com.esotericsoftware.kryo.serializers.JavaSerializer 以防止與 Flink 不兼容。
圖 16:類型信息到內存塊
下面以 StringSerializer 爲例,來看下 Flink 是如何緊湊管理內存的:
圖 17:StringSerializer 類的 serialize() 方法
下面是具體的序列化過程:
圖 18:String 對象的序列化過程
能夠看到,Flink 對於內存管理是很是細緻的,井井有條,代碼也容易理解。
問答
如何使用Flink Quickstart在Eclipse IDE中缺乏依賴關係?
相關閱讀
Storm做業轉化爲Flink做業流程分析
Apache Calcite 功能簡析及在 Flink 的應用
【每日課程推薦】機器學習實戰!快速入門在線廣告業務及CTR相應知識
此文已由做者受權騰訊雲+社區發佈,更多原文請點擊
搜索關注公衆號「雲加社區」,第一時間獲取技術乾貨,關注後回覆1024 送你一份技術課程大禮包!
海量技術實踐經驗,盡在雲加社區!