Apache Flink 進階(五):數據類型和序列化


做者:馬慶祥 整理:毛鶴
本文根據 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、360 數據開發高級工程師馬慶祥老師分享。文章主要從如何爲Flink量身定製的序列化框架、Flink序列化的最佳實踐、Flink通訊層的序列化以及問答環節四部分分享。

爲 Flink 量身定製的序列化框架

爲何要爲 Flink 量身定製序列化框架?

你們都知道如今大數據生態很是火,大多數技術組件都是運行在 JVM 上的,Flink 也是運行在 JVM 上,基於 JVM 的數據分析引擎都須要將大量的數據存儲在內存中,這就不得不面臨 JVM 的一些問題,好比 Java 對象存儲密度較低等。針對這些問題,最經常使用的方法就是實現一個顯式的內存管理,也就是說用自定義的內存池來進行內存的分配回收,接着將序列化後的對象存儲到內存塊中。
如今 Java 生態圈中已經有許多序列化框架,好比說 Java serialization, Kryo, Apache Avro 等等。可是 Flink 依然是選擇了本身定製的序列化框架,那麼到底有什麼意義呢?若 Flink 選擇本身定製的序列化框架,對類型信息瞭解越多,能夠在早期完成類型檢查,更好的選取序列化方式,進行數據佈局,節省數據的存儲空間,直接操做二進制數據。

Flink 的數據類型



Flink 在其內部構建了一套本身的類型系統,Flink 現階段支持的類型分類如圖所示,從圖中能夠看到 Flink 類型能夠分爲基礎類型(Basic)、數組(Arrays)、複合類型(Composite)、輔助類型(Auxiliary)、泛型和其它類型(Generic)。Flink 支持任意的 Java 或是 Scala 類型。不須要像 Hadoop 同樣去實現一個特定的接口(org.apache.hadoop.io.Writable),Flink 可以自動識別數據類型。


那這麼多的數據類型,在 Flink 內部又是如何表示的呢?圖示中的 Person 類,複合類型的一個 Pojo 在 Flink 中是用 PojoTypeInfo 來表示,它繼承至 TypeInformation,也即在 Flink 中用 TypeInformation 做爲類型描述符來表示每一種要表示的數據類型。

TypeInformation



TypeInformation 的思惟導圖如圖所示,從圖中能夠看出,在 Flink 中每個具體的類型都對應了一個具體的 TypeInformation 實現類,例如 BasicTypeInformation 中的 IntegerTypeInformation 和 FractionalTypeInformation 都具體的對應了一個 TypeInformation。而後還有 BasicArrayTypeInformation、CompositeType 以及一些其它類型,也都具體對應了一個 TypeInformation。
TypeInformation 是 Flink 類型系統的核心類。對於用戶自定義的 Function 來講,Flink 須要一個類型信息來做爲該函數的輸入輸出類型,即 TypeInfomation。該類型信息類做爲一個工具來生成對應類型的序列化器 TypeSerializer,並用於執行語義檢查,好比當一些字段在做爲 joing 或 grouping 的鍵時,檢查這些字段是否在該類型中存在。
如何使用 TypeInformation?下面的實踐中會爲你們介紹。

Flink 的序列化過程



在 Flink 序列化過程當中,進行序列化操做必需要有序列化器,那麼序列化器從何而來?每個具體的數據類型都對應一個 TypeInformation 的具體實現,每個 TypeInformation 都會爲對應的具體數據類型提供一個專屬的序列化器。經過 Flink 的序列化過程圖能夠看到 TypeInformation 會提供一個 createSerialize() 方法,經過這個方法就能夠獲得該類型進行數據序列化操做與反序化操做的對象 TypeSerializer。
對於大多數數據類型 Flink 能夠自動生成對應的序列化器,能很是高效地對數據集進行序列化和反序列化,好比,BasicTypeInfo、WritableTypeIno 等,但針對 GenericTypeInfo 類型,Flink 會使用 Kyro 進行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 類型是複合類型,它們可能嵌套一個或者多個數據類型。在這種狀況下,它們的序列化器一樣是複合的。它們會將內嵌類型的序列化委託給對應類型的序列化器。
簡單的介紹下 Pojo 的類型規則,即在知足一些條件的狀況下,纔會選用 Pojo 的序列化進行相應的序列化與反序列化的一個操做。即類必須是 Public 的,且類有一個 public 的無參數構造函數,該類(以及全部超類)中的全部非靜態 no-static、非瞬態 no-transient 字段都是 public 的(和非最終的 final)或者具備公共 getter 和 setter 方法,該方法遵循 getter 和 setter 的 Java bean 命名約定。當用戶定義的數據類型沒法識別爲 POJO 類型時,必須將其做爲 GenericType 處理並使用 Kryo 進行序列化。
Flink 自帶了不少 TypeSerializer 子類,大多數狀況下各類自定義類型都是經常使用類型的排列組合,於是能夠直接複用,若是內建的數據類型和序列化方式不能知足你的需求,Flink 的類型信息系統也支持用戶拓展。若用戶有一些特殊的需求,只須要實現 TypeInformation、TypeSerializer 和 TypeComparator 便可定製本身類型的序列化和比較大小方式,來提高數據類型在序列化和比較時的性能。


序列化就是將數據結構或者對象轉換成一個二進制串的過程,在 Java 裏面能夠簡單地理解成一個 byte 數組。而反序列化偏偏相反,就是將序列化過程當中所生成的二進制串轉換成數據結構或者對象的過程。下面就之內嵌型的 Tuple 3 這個對象爲例,簡述一下它的序列化過程。Tuple 3 包含三個層面,一是 int 類型,一是 double 類型,還有一個是 Person。Person 包含兩個字段,一是 int 型的 ID,另外一個是 String 類型的 name,它在序列化操做時,會委託相應具體序列化的序列化器進行相應的序列化操做。從圖中能夠看到 Tuple 3 會把 int 類型經過 IntSerializer 進行序列化操做,此時 int 只須要佔用四個字節就能夠了。根據 int 佔用四個字節,這個可以體現出 Flink 可序列化過程當中的一個優點,即在知道數據類型的前提下,能夠更好的進行相應的序列化與反序列化操做。相反,若是採用 Java 的序列化,雖然可以存儲更多的屬性信息,但一次佔據的存儲空間會受到必定的損耗。
Person 類會被當成一個 Pojo 對象來進行處理,PojoSerializer 序列化器會把一些屬性信息使用一個字節存儲起來。一樣,其字段則採起相對應的序列化器進行相應序列化,在序列化完的結果中,能夠看到全部的數據都是由 MemorySegment 去支持。MemorySegment 具備什麼做用呢?
MemorySegment 在 Flink 中會將對象序列化到預分配的內存塊上,它表明 1 個固定長度的內存,默認大小爲 32 kb。MemorySegment 表明 Flink 中的一個最小的內存分配單元,至關因而 Java 的一個 byte 數組。 每條記錄都會以序列化的形式存儲在一個或多個 MemorySegment 中。

Flink 序列化的最佳實踐

最多見的場景

Flink 常見的應用場景有四種,即註冊子類型、註冊自定義序列化器、添加類型提示、手動建立 TypeInformation,具體介紹以下:
  • 註冊子類型:若是函數簽名只描述了超類型,可是它們實際上在執行期間使用了超類型的子類型,那麼讓 Flink 瞭解這些子類型會大大提升性能。能夠在 StreamExecutionEnvironment 或 ExecutionEnvironment 中調用 .registertype (clazz) 註冊子類型信息。
  • 註冊自定義序列化:對於不適用於本身的序列化框架的數據類型,Flink 會使用 Kryo 來進行序列化,並非全部的類型都與 Kryo 無縫鏈接,具體註冊方法在下文介紹。
  • 添加類型提示:有時,當 Flink 用盡各類手段都沒法推測出泛型信息時,用戶須要傳入一個類型提示 TypeHint,這個一般只在 Java API 中須要。
  • 手動建立一個 TypeInformation:在某些 API 調用中,這多是必需的,由於 Java 的泛型類型擦除致使 Flink 沒法推斷數據類型。
其實在大多數狀況下,用戶沒必要擔憂序列化框架和註冊類型,由於 Flink 已經提供了大量的序列化操做,不須要去定義本身的一些序列化器,可是在一些特殊場景下,須要去作一些相應的處理。

實踐–類型聲明

類型聲明去建立一個類型信息的對象是經過哪一種方式?一般是用 TypeInformation.of() 方法來建立一個類型信息的對象,具體說明以下:
  • 對於非泛型類,直接傳入 class 對象便可。 PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
  • 對於泛型類,須要經過 TypeHint 來保存泛型類型信息。 final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
  • 預約義常量。
如 BasicTypeInfo,這個類定義了一系列經常使用類型的快捷方式,對於 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本類型的類型聲明,能夠直接使用。並且 Flink 還提供了徹底等價的 Types 類(org.apache.flink.api.common.typeinfo.Types)。特別須要注意的是,flink-table 模塊也有一個 Types 類(org.apache.flink.table.api.Types),用於 table 模塊內部的類型定義信息,用法稍有不一樣。使用 IDE 的自動 import 時必定要當心。
  • 自定義 TypeInfo 和 TypeInfoFactory。


經過自定義 TypeInfo 爲任意類提供 Flink 原生內存管理(而非 Kryo),可令存儲更緊湊,運行時也更高效。須要注意在自定義類上使用 @TypeInfo 註解,隨後建立相應的 TypeInfoFactory 並覆蓋 createTypeInfo() 方法。

實踐–註冊子類型

Flink 認識父類,但不必定認識子類的一些獨特特性,所以須要單獨註冊子類型。
StreamExecutionEnvironment 和 ExecutionEnvironment 提供 registerType() 方法用來向 Flink 註冊子類信息。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Env. registerType(typeClass);複製代碼


在 registerType() 方法內部,會使用 TypeExtractor 來提取類型信息,如上圖所示,獲取到的類型信息屬於 PojoTypeInfo 及其子類,那麼須要將其註冊到一塊兒,不然統一交給 Kryo 去處理,Flink 並不過問(這種狀況下性能會變差)。

實踐–Kryo 序列化

對於 Flink 沒法序列化的類型(例如用戶自定義類型,沒有 registerType,也沒有自定義 TypeInfo 和 TypeInfoFactory),默認會交給 Kryo 處理,若是 Kryo 仍然沒法處理(例如 Guava、Thrift、Protobuf 等第三方庫的一些類),有兩種解決方案:
  • 強制使用 Avro 來代替 Kryo。 env.getConfig().enableForceAvro();
  • 爲 Kryo 增長自定義的 Serializer 以加強 Kryo 的功能。 env.getConfig().addDefaultKryoSerializer(clazz, serializer);
注:若是但願徹底禁用 Kryo(100% 使用 Flink 的序列化機制),能夠經過 Kryo-env.getConfig().disableGenericTypes() 的方式完成,但注意一切沒法處理的類都將致使異常,這種對於調試很是有效。

Flink 通訊層的序列化

Flink 的 Task 之間若是須要跨網絡傳輸數據記錄, 那麼就須要將數據序列化以後寫入 NetworkBufferPool,而後下層的 Task 讀出以後再進行反序列化操做,最後進行邏輯處理。
爲了使得記錄以及事件可以被寫入 Buffer,隨後在消費時再從 Buffer 中讀出,Flink 提供了數據記錄序列化器(RecordSerializer)與反序列化器(RecordDeserializer)以及事件序列化器(EventSerializer)。
Function 發送的數據被封裝成 SerializationDelegate,它將任意元素公開爲 IOReadableWritable 以進行序列化,經過 setInstance() 來傳入要序列化的數據。
在 Flink 通訊層的序列化中,有幾個問題值得關注,具體以下:
  • 什麼時候肯定 Function 的輸入輸出類型?


在構建 StreamTransformation 的時候經過 TypeExtractor 工具肯定 Function 的輸入輸出類型。TypeExtractor 類能夠根據方法簽名、子類信息等蛛絲馬跡自動提取或恢復類型信息。
  • 什麼時候肯定 Function 的序列化/反序列化器?
構造 StreamGraph 時,經過 TypeInfomation 的 createSerializer() 方法獲取對應類型的序列化器 TypeSerializer,並在 addOperator() 的過程當中執行 setSerializers() 操做,設置 StreamConfig 的 TYPE_SERIALIZER_IN_1 、 TYPE_SERIALIZER_IN_二、 TYPE_SERIALIZER_OUT_1 屬性。
  • 什麼時候進行真正的序列化/反序列化操做?這個過程與 TypeSerializer 又是怎麼聯繫在一塊兒的呢?


你們都應該清楚 Tsk 和 StreamTask 兩個概念,Task 是直接受 TaskManager 管理和調度的,而 Task 又會調用 StreamTask,而 StreamTask 中真正封裝了算子的處理邏輯。在 run() 方法中,首先將反序列化後的數據封裝成 StreamRecord 交給算子處理;而後將處理結果經過 Collector 發動給下游(在構建 Collector 時已經肯定了 SerializtionDelegate),並經過 RecordWriter 寫入器將序列化後的結果寫入 DataOutput;最後序列化的操做交給 SerializerDelegate 處理,實際仍是經過 TypeSerializer 的 serialize() 方法完成。

本文爲雲棲社區原創內容,未經容許不得轉載。
相關文章
相關標籤/搜索