前言
這一講將介紹一下序列化機制和過程函數(processfunction)。html
序列化機制
使用 Flink 編寫處理邏輯時,新手老是容易被林林總總的概念所混淆:java
爲何 Flink 有那麼多的類型聲明方式? BasicTypeInfo.STRING_TYPE_INFO、Types.STRING 、Types.STRING() 有何區別? TypeInfoFactory 又是什麼? TypeInformation.of 和 TypeHint 是如何使用的呢?
接下來本文將逐步解密 Flink 的類型和序列化機制(TypeInformation)。git
Flink 的類型系統源碼位於 org.apache.flink.api.common.typeinfo 包,讓咱們對上圖TypeInformation深刻追蹤,看一下類的繼承關係圖:github
能夠看到,上面兩個圖片是一一對應的,TypeInformation 類是描述一切類型的公共基類,它和它的全部子類必須可序列化(Serializable),由於類型信息將會伴隨 Flink 的做業提交,被傳遞給每一個執行節點。apache
因爲 Flink 本身管理內存,採用了一種很是緊湊的存儲格式(見官方博文),於是類型信息在整個數據處理流程中屬於相當重要的元數據。api
Flink 內部實現了名爲 TypeExtractror 的類,能夠利用方法簽名、子類信息等蛛絲馬跡,自動提取和恢復類型信息(固然也能夠顯式聲明,即本文所介紹的內容)。函數
然而因爲 Java 的類型擦除,自動提取並非老是有效。於是一些狀況下(例如經過 URLClassLoader 動態加載的類),仍需手動處理;例以下圖中對 DataSet 變換時,使用 .returns() 方法聲明返回類型。spa
這裏須要說明一下,returns() 接受三種類型的參數:字符串描述的類名(例如 "String")、TypeHint(接下來會講到,用於泛型類型參數)、Java 原生 Class(例如 String.class) 等;不過字符串形式的用法即將廢棄,若是確實有必要,請使用 Class.forName() 等方法來解決。code
經過 TypeInformation.of() 方法,能夠簡單地建立類型信息對象。orm
1. 對於非泛型的類,直接傳入 Class 對象便可
2.對於泛型類,須要藉助 TypeHint 來保存泛型類型信息
3. 預約義的快捷方式 例如 BasicTypeInfo,這個類定義了一系列經常使用類型的快捷方式,對於 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本類型的類型聲明,能夠直接使用。
4. 自定義 TypeInfo 和 TypeInfoFactory
經過自定義 TypeInfo 爲任意類提供 Flink 原生內存管理(而非 Kryo),可令存儲更緊湊,運行時也更高效。
開發者在自定義類上使用 @TypeInfo 註解,隨後建立相應的 TypeInfoFactory 並覆蓋 createTypeInfo 方法。
注意須要繼承 TypeInformation 類,爲每一個字段定義類型,並覆蓋元數據方法,例如是不是基本類型(isBasicType)、是不是 Tuple(isTupleType)、元數(對於一維的 Row 類型,等於字段的個數)等等,從而爲 TypeExtractor 提供決策依據。
更多示例,請參考 Flink 源碼的 org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
Kryo 序列化 待研究中...