目前不少大數據處理框架,例如Hadoop、Spark、Storm、Flink等。它們都基於JVM語言開發(java or scala),運行在JVM上。爲了加速合併或者排序(基於磁盤的方式一般要慢一些),須要將數據加載到內存中,因爲數據量巨大,對內存是不小的壓力。html
數據存儲最簡單的作法是將封裝成對象直接存儲到如List或者Map這樣的數據結構中。這種作法會引起兩個問題。一個問題是數據規模大,須要建立的對象很是多(數據加上存儲的數據結構,它們將耗費大量的內存),可能引起OOM。另外一個問題是GC,源源不斷的數據須要被處理,對象持續產生並須要被銷燬,對GC來講是不小的壓力。java
鑑於JVM自帶的GC沒法知足高效穩定的流處理,Flink創建了一套本身的內存管理體系。apache
MemorySegment是Flink管理內存的最小單位,是內存的抽象。MemorySegment有兩種實現,堆(HeapMemorySegment、基於JVM內存),非堆(HybridMemorySegment、混合,可以使用JVM內存或者直接內存)。數據結構
用戶能夠經過配置決定使用哪種,參數「taskmanager.memory.off-heap」默認false,使用堆;設置爲true,使用非堆。框架
MemorySegment的實現作了一些優化。函數
MemoryPool負責管理MemorySegment,例如建立、銷燬、獲取、重用等。oop
MemoryPool在初始化的時候,建立了空的ArrayDeque,以後申請內存加入到ArrayDeque中。內存申請有兩種:new byte[segmentSize] -- 堆內存、ByteBuffer.allocateDirect(segmentSize) -- 直接內存。(SegmentSize由配置"taskmanager.memory.segment-size"指定,默認32kb) 。這兩種不一樣類型的內存,在使用以前會由MemorySegmentFactory.wrapXXXHeapMemory包裝一次,統一抽象成MemorySegment。大數據
MemoryPool的生命週期很長,從TaskManager建立直至銷燬。因此在任務執行期間它佔用的內存(Segment)不會釋放,而是經過回收來重複使用。MemoryPool經過減小對象的建立和回收,大大下降了GC壓力。優化
須要注意的是:Flink不是將全部的對象都寫入到MemoryPool管理的內存中。默認的,Flink分配70%的內存給MemoryManager。執行過程當中還須要一些內存,例如用戶實現的自定義函數,在函數中建立的對象存儲在堆中,由JVM的GC機制管理。scala
Flink在內存管理以外,還有一套本身的序列化體系。在執行的過程當中,數據對象經過序列化轉換成字節,或者字節反序列化成對象。
以簡單的ETL任務距離,抽取 --> 過濾 --> 存儲,對象類型是提早預知的,調用對象的序列化便可。若用戶須要加上本身的處理,抽取 --> 過濾 --> 轉換 --> 存儲。在轉換的過程當中,會引入Flink"未知"的對象類型。爲了解決這種場景,Flink提供了一種基於反射的類型提取。用戶須要提供TypeInformation來告知Flink類型信息,Flink根據類型自動選擇合適的序列化方式。
建立TypeInformation:
TypeInformation<List<CommonLogBean>> typeInformation = TypeInformation.of(new TypeHint<List<CommonLogBean>>() {});
Flink構建了一套特有的內存管理體系,下降了OOM的風險以及GC的負載,另外提供了智能高效的序列化方式。它們功能構成了高效的流處理基礎。
[flink-memory-manager]https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html