本文轉載自:https://ververica.cn/develope...
做者:伍翀(雲邪)html
現在,大數據領域的開源框架(Hadoop,Spark,Storm)都使用的 JVM,固然也包括 Flink。基於 JVM 的數據分析引擎都須要面對將大量數據存到內存中,這就不得不面對 JVM 存在的幾個問題:java
因此目前,愈來愈多的大數據項目開始本身管理 JVM 內存了,像 Spark、Flink、HBase,爲的就是得到像 C 同樣的性能以及避免 OOM 的發生。本文將會討論 Flink 是如何解決上面的問題的,主要內容包括內存管理、定製的序列化工具、緩存友好的數據結構和算法、堆外內存、JIT 編譯優化等。算法
Flink 並非將大量對象存在堆上,而是將對象都序列化到一個預分配的內存塊上,這個內存塊叫作 **MemorySegment**
,它表明了一段固定長度的內存(默認大小爲 32KB),也是 Flink 中最小的內存分配單元,而且提供了很是高效的讀寫方法。你能夠把 MemorySegment 想象成是爲 Flink 定製的 **java.nio.ByteBuffer**
。它的底層能夠是一個普通的 Java 字節數組(**byte[]**
),也能夠是一個申請在堆外的 **ByteBuffer**
。每條記錄都會以序列化的形式存儲在一個或多個**MemorySegment**
中。apache
Flink 中的 Worker 名叫 TaskManager,是用來運行用戶代碼的 JVM 進程。TaskManager 的堆內存主要被分紅了三個部分:數組
**taskmanager.network.numberOfBuffers**
來配置。(閱讀這篇文章瞭解更多Network Buffer的管理)MemoryManager
管理的,由衆多MemorySegment
組成的超大集合。Flink 中的算法(如 sort/shuffle/join)會向這個內存池申請 MemorySegment,將序列化後的數據存於其中,使用完後釋放回內存池。默認狀況下,池子佔了堆內存的 70% 的大小。注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,該池子不會預分配內存,也不會向該池子請求內存塊。也就是說該部分的內存都是能夠給用戶代碼使用的。不過社區是打算在 Streaming 模式下也能將該池子利用起來。緩存
Flink 採用相似 DBMS 的 sort 和 join 算法,直接操做二進制數據,從而使序列化/反序列化帶來的開銷達到最小。因此 Flink 的內部實現更像 C/C++ 而非 Java。若是須要處理的數據超出了內存限制,則會將部分數據存儲到硬盤上。若是要操做多塊MemorySegment就像操做一塊大的連續內存同樣,Flink會使用邏輯視圖(**AbstractPagedInputView**
)來方便操做。下圖描述了 Flink 如何存儲序列化後的數據到內存塊中,以及在須要的時候如何將數據存儲到磁盤上。網絡
從上面咱們可以得出 Flink 積極的內存管理以及直接操做二進制數據有如下幾點好處:數據結構
MemoryManager
中,這些MemorySegment
一直呆在老年代而不會被GC回收。其餘的數據對象基本上是由用戶代碼生成的短生命週期對象,這部分對象能夠被 Minor GC 快速回收。只要用戶不去建立大量相似緩存的常駐型對象,那麼老年代的大小是不會變的,Major GC也就永遠不會發生。從而有效地下降了垃圾回收的壓力。另外,這裏的內存塊還能夠是堆外內存,這可使得 JVM 內存更小,從而加速垃圾回收。**OutOfMemoryErrors**
能夠有效地被避免。目前 Java 生態圈提供了衆多的序列化框架:Java serialization, Kryo, Apache Avro 等等。可是 Flink 實現了本身的序列化框架。由於在 Flink 中處理的數據流一般是同一類型,因爲數據集對象的類型固定,對於數據集能夠只保存一份對象 Schema 信息,節省大量的存儲空間。同時,對於固定大小的類型,也可經過固定的偏移位置存取。當咱們須要訪問某個對象成員變量的時候,經過定製的序列化工具,並不須要反序列化整個 Java 對象,而是能夠直接經過偏移量,只是反序列化特定的對象成員變量。若是對象的成員變量較多時,可以大大減小 Java 對象的建立開銷,以及內存數據的拷貝大小。app
Flink 支持任意的 Java 或是 Scala 類型。Flink 在數據類型上有很大的進步,不須要實現一個特定的接口(像 Hadoop 中的**org.apache.hadoop.io.Writable**
),Flink 可以自動識別數據類型。Flink 經過 Java Reflection 框架分析基於 Java 的 Flink 程序 UDF (User Define Function)的返回類型的類型信息,經過 Scala Compiler 分析基於 Scala 的 Flink 程序 UDF 的返回類型的類型信息。類型信息由 **TypeInformation**
類表示,TypeInformation 支持如下幾種類型:框架
BasicTypeInfo
: 任意Java 基本類型(裝箱的)或 String 類型。BasicArrayTypeInfo
: 任意Java基本類型數組(裝箱的)或 String 數組。**WritableTypeInfo**
: 任意 Hadoop Writable 接口的實現類。TupleTypeInfo
: 任意的 Flink Tuple 類型(支持Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的Java Tuple實現。CaseClassTypeInfo
: 任意的 Scala CaseClass(包括 Scala tuples)。PojoTypeInfo
: 任意的 POJO (Java or Scala),例如,Java對象的全部成員變量,要麼是 public 修飾符定義,要麼有 getter/setter 方法。GenericTypeInfo
: 任意沒法匹配以前幾種類型的類。前六種數據類型基本上能夠知足絕大部分的 Flink 程序,針對前六種類型數據集,Flink 皆能夠自動生成對應的TypeSerializer,能很是高效地對數據集進行序列化和反序列化。對於最後一種數據類型,Flink 會使用 Kryo 進行序列化和反序列化。每一個 TypeInformation 中,都包含了 serializer,類型會自動經過serializer進行序列化,而後用 Java Unsafe 接口寫入 MemorySegments。對於能夠用做 key 的數據類型,Flink 還同時自動生成 TypeComparator,用來輔助直接對序列化後的二進制數據進行 compare、hash 等操做。對於 Tuple、CaseClass、POJO 等組合類型,其 TypeSerializer 和 TypeComparator 也是組合的,序列化和比較時會委託給對應的 serializers 和 comparators。以下圖展現 一個內嵌型的 Tuple3<Integer,Double,Person> 對象的序列化過程。
能夠看出這種序列化方式存儲密度是至關緊湊的。其中 int 佔4字節,double 佔8字節,POJO 多個一個字節的 header,PojoSerializer 只負責將 header序列化進去,並委託每一個字段對應的 serializer 對字段進行序列化。
Flink 的類型系統能夠很輕鬆地擴展出自定義的TypeInformation、Serializer 以及 Comparator,來提高數據類型在序列化和比較時的性能。
Flink 提供瞭如 group、sort、join 等操做,這些操做都須要訪問海量數據。這裏,咱們以 sort 爲例,這是一個在 Flink 中使用很是頻繁的操做。
首先,Flink 會從 MemoryManager 中申請一批 MemorySegment,咱們把這批 MemorySegment 稱做 sort buffer,用來存放排序的數據。
咱們會把 sort buffer 分紅兩塊區域。一個區域是用來存放全部對象完整的二進制數據。另外一個區域用來存放指向完整二進制數據的指針以及定長的序列化後的key(key+pointer)。若是須要序列化的 key 是個變長類型,如 String,則會取其前綴序列化。如上圖所示,當一個對象要加到 sort buffer 中時,它的二進制數據會被加到第一個區域,指針(可能還有 key)會被加到第二個區域。
將實際的數據和指針加定長 key 分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的數據也不用移動其餘 key 和 pointer。第二,這樣作是緩存友好的,由於 key 都是連續存儲在內存中的,能夠大大減小 cache miss(後面會詳細解釋)。
排序的關鍵是比大小和交換。Flink 中,會先用 key 比大小,這樣就能夠直接用二進制的 key 比較而不須要反序列化出整個對象。由於 key 是定長的,因此若是 key 相同(或者沒有提供二進制key),那就必須將真實的二進制數據反序列化出來,而後再作比較。以後,只須要交換 key+pointer 就能夠達到排序的效果,真實的數據不用移動。
最後,訪問排序後的數據,能夠沿着排好序的key+pointer區域順序訪問,經過pointer找到對應的真實數據,並寫到內存或外部(更多細節能夠看這篇文章 Joins in Flink)。
隨着磁盤 IO 和網絡 IO 愈來愈快,CPU 逐漸成爲了大數據領域的瓶頸。從 L1/L2/L3 緩存讀取數據的速度比從主內存讀取數據的速度快好幾個量級。經過性能分析能夠發現,CPU 時間中的很大一部分都是浪費在等待數據從主內存過來上。若是這些數據能夠從 L1/L2/L3 緩存過來,那麼這些等待時間能夠極大地下降,而且全部的算法會所以而受益。
在上面討論中咱們談到的,Flink 經過定製的序列化框架將算法中須要操做的數據(如 sort 中的 key)連續存儲,而完整數據存儲在其餘地方。由於對於完整的數據來講,key+pointer 更容易裝進緩存,這大大提升了緩存命中率,從而提升了基礎算法的效率。這對於上層應用是徹底透明的,能夠充分享受緩存友好帶來的性能提高。
Flink 基於堆內存的內存管理機制已經能夠解決不少 JVM 現存問題了,爲何還要引入堆外內存?
可是強大的東西老是會有其負面的一面,否則爲什麼你們不都用堆外內存呢。
MemorySegment
,這個申請在堆上會更廉價。Flink用經過**ByteBuffer.allocateDirect(numBytes)**
來申請堆外內存,用 **sun.misc.Unsafe**
來操做堆外內存。
基於 Flink 優秀的設計,實現堆外內存是很方便的。Flink 將原來的 **MemorySegment**
變成了抽象類,並生成了兩個子類。**HeapMemorySegment**
和 **HybridMemorySegment**
。從字面意思上也很容易理解,前者是用來分配堆內存的,後者是用來分配堆外內存和堆內存的。是的,你沒有看錯,後者既能夠分配堆外內存又能夠分配堆內存。爲何要這樣設計呢?
首先假設**HybridMemorySegment**
只提供分配堆外內存。在上述堆外內存的不足中的第二點談到,Flink 有時須要分配短生命週期的 buffer,這些 buffer 用**HeapMemorySegment**
會更高效。那麼當使用堆外內存時,爲了也知足堆內存的需求,咱們須要同時加載兩個子類。這就涉及到了 JIT 編譯優化的問題。由於之前 **MemorySegment**
是一個單獨的 final 類,沒有子類。JIT 編譯時,全部要調用的方法都是肯定的,全部的方法調用均可以被去虛化(de-virtualized)和內聯(inlined),這能夠極大地提升性能(MemroySegment的使用至關頻繁)。然而若是同時加載兩個子類,那麼 JIT 編譯器就只能在真正運行到的時候才知道是哪一個子類,這樣就沒法提早作優化。實際測試的性能差距在 2.7 被左右。
Flink 使用了兩種方案:
方案1:只能有一種 MemorySegment 實現被加載
代碼中全部的短生命週期和長生命週期的 MemorySegment 都實例化其中一個子類,另外一個子類根本沒有實例化過(使用工廠模式來控制)。那麼運行一段時間後,JIT 會意識到全部調用的方法都是肯定的,而後會作優化。
方案2:提供一種實現能同時處理堆內存和堆外內存
這就是 **HybridMemorySegment**
了,能同時處理堆與堆外內存,這樣就不須要子類了。這裏 Flink 優雅地實現了一份代碼能同時操做堆和堆外內存。這主要歸功於 **sun.misc.Unsafe**
提供的一系列方法,如 getLong方法:
sun.misc.Unsafe.getLong(Object reference, long offset)
這裏咱們看下 **MemorySegment**
及其子類的實現。
public abstract class MemorySegment { // 堆內存引用 protected final byte[] heapMemory; // 堆外內存地址 protected long address; //堆內存的初始化 MemorySegment(byte[] buffer, Object owner) { //一些先驗檢查 ... this.heapMemory = buffer; this.address = BYTE_ARRAY_BASE_OFFSET; ... } //堆外內存的初始化 MemorySegment(long offHeapAddress, int size, Object owner) { //一些先驗檢查 ... this.heapMemory = null; this.address = offHeapAddress; ... } public final long getLong(int index) { final long pos = address + index; if (index >= 0 && pos <= addressLimit - 8) { // 這是咱們關注的地方,使用 Unsafe 來操做 on-heap & off-heap return UNSAFE.getLong(heapMemory, pos); } else if (address > addressLimit) { throw new IllegalStateException("segment has been freed"); } else { // index is in fact invalid throw new IndexOutOfBoundsException(); } } ... } public final class HeapMemorySegment extends MemorySegment { // 指向heapMemory的額外引用,用來如數組越界的檢查 private byte[] memory; // 只能初始化堆內存 HeapMemorySegment(byte[] memory, Object owner) { super(Objects.requireNonNull(memory), owner); this.memory = memory; } ... } public final class HybridMemorySegment extends MemorySegment { private final ByteBuffer offHeapBuffer; // 堆外內存初始化 HybridMemorySegment(ByteBuffer buffer, Object owner) { super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner); this.offHeapBuffer = buffer; } // 堆內存初始化 HybridMemorySegment(byte[] buffer, Object owner) { super(buffer, owner); this.offHeapBuffer = null; } ... }
能夠發現,HybridMemorySegment 中的不少方法其實都下沉到了父類去實現。包括堆內堆外內存的初始化。**MemorySegment**
中的 **getXXX**
/**putXXX**
方法都是調用了 unsafe 方法,能夠說**MemorySegment**
已經具備了些 Hybrid 的意思了。**HeapMemorySegment**
只調用了父類的**MemorySegment(byte[] buffer, Object owner)**
方法,也就只能申請堆內存。另外,閱讀代碼你會發現,許多方法(大量的 getXXX/putXXX)都被標記成了 final,兩個子類也是 final 類型,爲的也是優化 JIT 編譯器,會提醒 JIT 這個方法是能夠被去虛化和內聯的。
對於堆外內存,使用 **HybridMemorySegment**
能同時用來表明堆和堆外內存。這樣只須要一個類就能表明長生命週期的堆外內存和短生命週期的堆內存。既然**HybridMemorySegment**
已經這麼全能,爲何還要方案1呢?由於咱們須要工廠模式來保證只有一個子類被加載(爲了更高的性能),並且 HeapMemorySegment 比 heap 模式的 HybridMemorySegment 要快。
下方是一些性能測試數據,更詳細的數據請參考這篇文章。
Segment | Time |
---|---|
HeapMemorySegment, exclusive | 1,441 msecs |
HeapMemorySegment, mixed | 3,841 msecs |
HybridMemorySegment, heap, exclusive | 1,626 msecs |
HybridMemorySegment, off-heap, exclusive | 1,628 msecs |
HybridMemorySegment, heap, mixed | 3,848 msecs |
HybridMemorySegment, off-heap, mixed | 3,847 msecs |
本文主要總結了 Flink 面對 JVM 存在的問題,而在內存管理的道路上越走越深。從本身管理內存,到序列化框架,再到堆外內存。其實縱觀大數據生態圈,其實會發現各個開源項目都有一樣的趨勢。好比最近炒的很火熱的 Spark Tungsten 項目,與 Flink 在內存管理上的思想是及其類似的。