Flink 原理與實現:內存管理

摘要: 現在,大數據領域的開源框架(Hadoop,Spark,Storm)都使用的 JVM,固然也包括 Flink。基於 JVM 的數據分析引擎都須要面對將大量數據存到內存中,這就不得不面對 JVM 存在的幾個問題: 1. Java 對象存儲密度低。一個只包含 boolean 屬性的對象佔用了16個字節內存:對象頭佔了8個,boolean 屬性佔了1個,對齊填充佔了7個。而實際上只須要一個bit(1html

現在,大數據領域的開源框架(Hadoop,Spark,Storm)都使用的 JVM,固然也包括 Flink。基於 JVM 的數據分析引擎都須要面對將大量數據存到內存中,這就不得不面對 JVM 存在的幾個問題:java

  1. Java 對象存儲密度低。一個只包含 boolean 屬性的對象佔用了16個字節內存:對象頭佔了8個,boolean 屬性佔了1個,對齊填充佔了7個。而實際上只須要一個bit(1/8字節)就夠了。
  2. Full GC 會極大地影響性能,尤爲是爲了處理更大數據而開了很大內存空間的JVM來講,GC 會達到秒級甚至分鐘級。
  3. OOM 問題影響穩定性。OutOfMemoryError是分佈式計算框架常常會遇到的問題,當JVM中全部對象大小超過度配給JVM的內存大小時,就會發生OutOfMemoryError錯誤,致使JVM崩潰,分佈式框架的健壯性和性能都會受到影響。

因此目前,愈來愈多的大數據項目開始本身管理JVM內存了,像 Spark、Flink、HBase,爲的就是得到像 C 同樣的性能以及避免 OOM 的發生。本文將會討論 Flink 是如何解決上面的問題的,主要內容包括內存管理、定製的序列化工具、緩存友好的數據結構和算法、堆外內存、JIT編譯優化等。算法

 

積極的內存管理

Flink 並非將大量對象存在堆上,而是將對象都序列化到一個預分配的內存塊上,這個內存塊叫作 MemorySegment,它表明了一段固定長度的內存(默認大小爲 32KB),也是 Flink 中最小的內存分配單元,而且提供了很是高效的讀寫方法。你能夠把 MemorySegment 想象成是爲 Flink 定製的 java.nio.ByteBuffer。它的底層能夠是一個普通的 Java 字節數組(byte[]),也能夠是一個申請在堆外的 ByteBuffer。每條記錄都會以序列化的形式存儲在一個或多個MemorySegment中。apache

Flink 中的 Worker 名叫 TaskManager,是用來運行用戶代碼的 JVM 進程。TaskManager 的堆內存主要被分紅了三個部分:數組

TB17qs5JpXXXXXhXpXXXXXXXXXX

  • Network Buffers: 必定數量的32KB大小的 buffer,主要用於數據的網絡傳輸。在 TaskManager 啓動的時候就會分配。默認數量是 2048 個,能夠經過 taskmanager.network.numberOfBuffers 來配置。(閱讀這篇文章瞭解更多Network Buffer的管理)
  • Memory Manager Pool: 這是一個由 MemoryManager 管理的,由衆多MemorySegment組成的超大集合。Flink 中的算法(如 sort/shuffle/join)會向這個內存池申請 MemorySegment,將序列化後的數據存於其中,使用完後釋放回內存池。默認狀況下,池子佔了堆內存的 70% 的大小。
  • Remaining (Free) Heap: 這部分的內存是留給用戶代碼以及 TaskManager 的數據結構使用的。由於這些數據結構通常都很小,因此基本上這些內存都是給用戶代碼使用的。從GC的角度來看,能夠把這裏當作的新生代,也就是說這裏主要都是由用戶代碼生成的短時間對象。

注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,該池子不會預分配內存,也不會向該池子請求內存塊。也就是說該部分的內存都是能夠給用戶代碼使用的。不過社區是打算在 Streaming 模式下也能將該池子利用起來。緩存

Flink 採用相似 DBMS 的 sort 和 join 算法,直接操做二進制數據,從而使序列化/反序列化帶來的開銷達到最小。因此 Flink 的內部實現更像 C/C++ 而非 Java。若是須要處理的數據超出了內存限制,則會將部分數據存儲到硬盤上。若是要操做多塊MemorySegment就像操做一塊大的連續內存同樣,Flink會使用邏輯視圖(AbstractPagedInputView)來方便操做。下圖描述了 Flink 如何存儲序列化後的數據到內存塊中,以及在須要的時候如何將數據存儲到磁盤上。網絡

從上面咱們可以得出 Flink 積極的內存管理以及直接操做二進制數據有如下幾點好處:數據結構

  1. 減小GC壓力。顯而易見,由於全部常駐型數據都以二進制的形式存在 Flink 的MemoryManager中,這些MemorySegment一直呆在老年代而不會被GC回收。其餘的數據對象基本上是由用戶代碼生成的短生命週期對象,這部分對象能夠被 Minor GC 快速回收。只要用戶不去建立大量相似緩存的常駐型對象,那麼老年代的大小是不會變的,Major GC也就永遠不會發生。從而有效地下降了垃圾回收的壓力。另外,這裏的內存塊還能夠是堆外內存,這可使得 JVM 內存更小,從而加速垃圾回收。
  2. 避免了OOM。全部的運行時數據結構和算法只能經過內存池申請內存,保證了其使用的內存大小是固定的,不會由於運行時數據結構和算法而發生OOM。在內存吃緊的狀況下,算法(sort/join等)會高效地將一大批內存塊寫到磁盤,以後再讀回來。所以,OutOfMemoryErrors能夠有效地被避免。
  3. 節省內存空間。Java 對象在存儲上有不少額外的消耗(如上一節所談)。若是隻存儲實際數據的二進制內容,就能夠避免這部分消耗。
  4. 高效的二進制操做 & 緩存友好的計算。二進制數據以定義好的格式存儲,能夠高效地比較與操做。另外,該二進制形式能夠把相關的值,以及hash值,鍵值和指針等相鄰地放進內存中。這使得數據結構能夠對高速緩存更友好,能夠從 L1/L2/L3 緩存得到性能的提高(下文會詳細解釋)。

爲 Flink 量身定製的序列化框架

目前 Java 生態圈提供了衆多的序列化框架:Java serialization, Kryo, Apache Avro 等等。可是 Flink 實現了本身的序列化框架。由於在 Flink 中處理的數據流一般是同一類型,因爲數據集對象的類型固定,對於數據集能夠只保存一份對象Schema信息,節省大量的存儲空間。同時,對於固定大小的類型,也可經過固定的偏移位置存取。當咱們須要訪問某個對象成員變量的時候,經過定製的序列化工具,並不須要反序列化整個Java對象,而是能夠直接經過偏移量,只是反序列化特定的對象成員變量。若是對象的成員變量較多時,可以大大減小Java對象的建立開銷,以及內存數據的拷貝大小。app

Flink支持任意的Java或是Scala類型。Flink 在數據類型上有很大的進步,不須要實現一個特定的接口(像Hadoop中的org.apache.hadoop.io.Writable),Flink 可以自動識別數據類型。Flink 經過 Java Reflection 框架分析基於 Java 的 Flink 程序 UDF (User Define Function)的返回類型的類型信息,經過 Scala Compiler 分析基於 Scala 的 Flink 程序 UDF 的返回類型的類型信息。類型信息由 TypeInformation 類表示,TypeInformation 支持如下幾種類型:框架

  • BasicTypeInfo: 任意Java 基本類型(裝箱的)或 String 類型。
  • BasicArrayTypeInfo: 任意Java基本類型數組(裝箱的)或 String 數組。
  • WritableTypeInfo: 任意 Hadoop Writable 接口的實現類。
  • TupleTypeInfo: 任意的 Flink Tuple 類型(支持Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的Java Tuple實現。
  • CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
  • PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java對象的全部成員變量,要麼是 public 修飾符定義,要麼有 getter/setter 方法。
  • GenericTypeInfo: 任意沒法匹配以前幾種類型的類。

前六種數據類型基本上能夠知足絕大部分的Flink程序,針對前六種類型數據集,Flink皆能夠自動生成對應的TypeSerializer,能很是高效地對數據集進行序列化和反序列化。對於最後一種數據類型,Flink會使用Kryo進行序列化和反序列化。每一個TypeInformation中,都包含了serializer,類型會自動經過serializer進行序列化,而後用Java Unsafe接口寫入MemorySegments。對於能夠用做key的數據類型,Flink還同時自動生成TypeComparator,用來輔助直接對序列化後的二進制數據進行compare、hash等操做。對於 Tuple、CaseClass、POJO 等組合類型,其TypeSerializer和TypeComparator也是組合的,序列化和比較時會委託給對應的serializers和comparators。以下圖展現 一個內嵌型的Tuple3 對象的序列化過程。

能夠看出這種序列化方式存儲密度是至關緊湊的。其中 int 佔4字節,double 佔8字節,POJO多個一個字節的header,PojoSerializer只負責將header序列化進去,並委託每一個字段對應的serializer對字段進行序列化。

Flink 的類型系統能夠很輕鬆地擴展出自定義的TypeInformation、Serializer以及Comparator,來提高數據類型在序列化和比較時的性能。

Flink 如何直接操做二進制數據

Flink 提供瞭如 group、sort、join 等操做,這些操做都須要訪問海量數據。這裏,咱們以sort爲例,這是一個在 Flink 中使用很是頻繁的操做。

首先,Flink 會從 MemoryManager 中申請一批 MemorySegment,咱們把這批 MemorySegment 稱做 sort buffer,用來存放排序的數據。

咱們會把 sort buffer 分紅兩塊區域。一個區域是用來存放全部對象完整的二進制數據。另外一個區域用來存放指向完整二進制數據的指針以及定長的序列化後的key(key+pointer)。若是須要序列化的key是個變長類型,如String,則會取其前綴序列化。如上圖所示,當一個對象要加到 sort buffer 中時,它的二進制數據會被加到第一個區域,指針(可能還有key)會被加到第二個區域。

將實際的數據和指針加定長key分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的數據也不用移動其餘key和pointer。第二,這樣作是緩存友好的,由於key都是連續存儲在內存中的,能夠大大減小 cache miss(後面會詳細解釋)。

排序的關鍵是比大小和交換。Flink 中,會先用 key 比大小,這樣就能夠直接用二進制的key比較而不須要反序列化出整個對象。由於key是定長的,因此若是key相同(或者沒有提供二進制key),那就必須將真實的二進制數據反序列化出來,而後再作比較。以後,只須要交換key+pointer就能夠達到排序的效果,真實的數據不用移動。

最後,訪問排序後的數據,能夠沿着排好序的key+pointer區域順序訪問,經過pointer找到對應的真實數據,並寫到內存或外部(更多細節能夠看這篇文章 Joins in Flink)。

緩存友好的數據結構和算法

隨着磁盤IO和網絡IO愈來愈快,CPU逐漸成爲了大數據領域的瓶頸。從 L1/L2/L3 緩存讀取數據的速度比從主內存讀取數據的速度快好幾個量級。經過性能分析能夠發現,CPU時間中的很大一部分都是浪費在等待數據從主內存過來上。若是這些數據能夠從 L1/L2/L3 緩存過來,那麼這些等待時間能夠極大地下降,而且全部的算法會所以而受益。

在上面討論中咱們談到的,Flink 經過定製的序列化框架將算法中須要操做的數據(如sort中的key)連續存儲,而完整數據存儲在其餘地方。由於對於完整的數據來講,key+pointer更容易裝進緩存,這大大提升了緩存命中率,從而提升了基礎算法的效率。這對於上層應用是徹底透明的,能夠充分享受緩存友好帶來的性能提高。

走向堆外內存

Flink 基於堆內存的內存管理機制已經能夠解決不少JVM現存問題了,爲何還要引入堆外內存?

  1. 啓動超大內存(上百GB)的JVM須要很長時間,GC停留時間也會很長(分鐘級)。使用堆外內存的話,能夠極大地減少堆內存(只須要分配Remaining Heap那一塊),使得 TaskManager 擴展到上百GB內存不是問題。
  2. 高效的 IO 操做。堆外內存在寫磁盤或網絡傳輸時是 zero-copy,而堆內存的話,至少須要 copy 一次。
  3. 堆外內存是進程間共享的。也就是說,即便JVM進程崩潰也不會丟失數據。這能夠用來作故障恢復(Flink暫時沒有利用起這個,不過將來極可能會去作)。

可是強大的東西老是會有其負面的一面,否則爲什麼你們不都用堆外內存呢。

  1. 堆內存的使用、監控、調試都要簡單不少。堆外內存意味着更復雜更麻煩。
  2. Flink 有時須要分配短生命週期的 MemorySegment,這個申請在堆上會更廉價。
  3. 有些操做在堆內存上會快一點點。

Flink用經過ByteBuffer.allocateDirect(numBytes)來申請堆外內存,用 sun.misc.Unsafe 來操做堆外內存。

基於 Flink 優秀的設計,實現堆外內存是很方便的。Flink 將原來的 MemorySegment 變成了抽象類,並生成了兩個子類。HeapMemorySegment 和 HybridMemorySegment。從字面意思上也很容易理解,前者是用來分配堆內存的,後者是用來分配堆外內存和堆內存的。是的,你沒有看錯,後者既能夠分配堆外內存又能夠分配堆內存。爲何要這樣設計呢?

首先假設HybridMemorySegment只提供分配堆外內存。在上述堆外內存的不足中的第二點談到,Flink 有時須要分配短生命週期的 buffer,這些buffer用HeapMemorySegment會更高效。那麼當使用堆外內存時,爲了也知足堆內存的需求,咱們須要同時加載兩個子類。這就涉及到了 JIT 編譯優化的問題。由於之前 MemorySegment 是一個單獨的 final 類,沒有子類。JIT 編譯時,全部要調用的方法都是肯定的,全部的方法調用均可以被去虛化(de-virtualized)和內聯(inlined),這能夠極大地提升性能(MemroySegment的使用至關頻繁)。然而若是同時加載兩個子類,那麼 JIT 編譯器就只能在真正運行到的時候才知道是哪一個子類,這樣就沒法提早作優化。實際測試的性能差距在 2.7 被左右。

Flink 使用了兩種方案:

方案1:只能有一種 MemorySegment 實現被加載

代碼中全部的短生命週期和長生命週期的MemorySegment都實例化其中一個子類,另外一個子類根本沒有實例化過(使用工廠模式來控制)。那麼運行一段時間後,JIT 會意識到全部調用的方法都是肯定的,而後會作優化。

方案2:提供一種實現能同時處理堆內存和堆外內存

這就是 HybridMemorySegment 了,能同時處理堆與堆外內存,這樣就不須要子類了。這裏 Flink 優雅地實現了一份代碼能同時操做堆和堆外內存。這主要歸功於 sun.misc.Unsafe提供的一系列方法,如getLong方法:

sun.misc.Unsafe.getLong(Object reference, long offset)
  • 若是reference不爲空,則會取該對象的地址,加上後面的offset,從相對地址處取出8字節並獲得 long。這對應了堆內存的場景。
  • 若是reference爲空,則offset就是要操做的絕對地址,從該地址處取出數據。這對應了堆外內存的場景。

這裏咱們看下 MemorySegment 及其子類的實現。

public abstract class MemorySegment {
  // 堆內存引用
  protected final byte[] heapMemory;
  // 堆外內存地址
  protected long address;
  
  //堆內存的初始化
  MemorySegment(byte[] buffer, Object owner) {
    //一些先驗檢查
    ...
    this.heapMemory = buffer;
    this.address = BYTE_ARRAY_BASE_OFFSET;
    ...
  }

  //堆外內存的初始化
  MemorySegment(long offHeapAddress, int size, Object owner) {
    //一些先驗檢查
    ...
    this.heapMemory = null;
    this.address = offHeapAddress;
    ...
  }
  
  public final long getLong(int index) {
    final long pos = address + index;
    if (index >= 0 && pos <= addressLimit - 8) {
      // 這是咱們關注的地方,使用 Unsafe 來操做 on-heap & off-heap
      return UNSAFE.getLong(heapMemory, pos);
    }
    else if (address > addressLimit) {
      throw new IllegalStateException("segment has been freed");
    }
    else {
      // index is in fact invalid
      throw new IndexOutOfBoundsException();
    }
  }
  ...
}

public final class HeapMemorySegment extends MemorySegment {
  // 指向heapMemory的額外引用,用來如數組越界的檢查
  private byte[] memory;
  // 只能初始化堆內存
  HeapMemorySegment(byte[] memory, Object owner) {
    super(Objects.requireNonNull(memory), owner);
    this.memory = memory;
  }
  ...
}

public final class HybridMemorySegment extends MemorySegment {
  private final ByteBuffer offHeapBuffer;
  
  // 堆外內存初始化
  HybridMemorySegment(ByteBuffer buffer, Object owner) {
    super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
    this.offHeapBuffer = buffer;
  }
  
  // 堆內存初始化
  HybridMemorySegment(byte[] buffer, Object owner) {
    super(buffer, owner);
    this.offHeapBuffer = null;
  }
  ...
}

能夠發現,HybridMemorySegment 中的不少方法其實都下沉到了父類去實現。包括堆內堆外內存的初始化。MemorySegment 中的 getXXX/putXXX 方法都是調用了 unsafe 方法,能夠說MemorySegment已經具備了些 Hybrid 的意思了。HeapMemorySegment只調用了父類的MemorySegment(byte[] buffer, Object owner)方法,也就只能申請堆內存。另外,閱讀代碼你會發現,許多方法(大量的 getXXX/putXXX)都被標記成了 final,兩個子類也是 final 類型,爲的也是優化 JIT 編譯器,會提醒 JIT 這個方法是能夠被去虛化和內聯的。

對於堆外內存,使用 HybridMemorySegment 能同時用來表明堆和堆外內存。這樣只須要一個類就能表明長生命週期的堆外內存和短生命週期的堆內存。既然HybridMemorySegment已經這麼全能,爲何還要方案1呢?由於咱們須要工廠模式來保證只有一個子類被加載(爲了更高的性能),並且HeapMemorySegment比heap模式的HybridMemorySegment要快。

下方是一些性能測試數據,更詳細的數據請參考這篇文章

Segment Time
HeapMemorySegment, exclusive 1,441 msecs
HeapMemorySegment, mixed 3,841 msecs
HybridMemorySegment, heap, exclusive 1,626 msecs
HybridMemorySegment, off-heap, exclusive 1,628 msecs
HybridMemorySegment, heap, mixed 3,848 msecs
HybridMemorySegment, off-heap, mixed 3,847 msecs

總結

本文主要總結了 Flink 面對 JVM 存在的問題,而在內存管理的道路上越走越深。從本身管理內存,到序列化框架,再到堆外內存。其實縱觀大數據生態圈,其實會發現各個開源項目都有一樣的趨勢。好比最近炒的很火熱的 Spark Tungsten 項目,與 Flink 在內存管理上的思想是及其類似的。

參考資料

相關文章
相關標籤/搜索