現在,許多用於分析大型數據集的開源系統都是用 Java 或者是基於 JVM 的編程語言實現的。最着名的例子是 Apache Hadoop,還有較新的框架,如 Apache Spark、Apache Drill、Apache Flink。基於 JVM 的數據分析引擎面臨的一個常見挑戰就是如何在內存中存儲大量的數據(包括緩存和高效處理)。合理的管理好 JVM 內存能夠將 難以配置且不可預測的系統 與 少許配置且穩定運行的系統區分開來。html
在這篇文章中,咱們將討論 Apache Flink 如何管理內存,討論其自定義序列化與反序列化機制,以及它是如何操做二進制數據的。java
在 JVM 中處理大量數據最直接的方式就是將這些數據作爲對象存儲在堆內存中,而後直接在內存中操做這些數據,若是想進行排序則就是對對象列表進行排序。然而這種方法有一些明顯的缺點,首先,在頻繁的建立和銷燬大量對象的時候,監視和控制堆內存的使用並非一件很簡單的事情。若是對象分配過多的話,那麼會致使內存過分使用,從而觸發 OutOfMemoryError,致使 JVM 進程直接被殺死。另外一個方面就是由於這些對象大都是生存在新生代,當 JVM 進行垃圾回收時,垃圾收集的開銷很容易達到 50% 甚至更多。最後就是 Java 對象具備必定的空間開銷(具體取決於 JVM 和平臺)。對於具備許多小對象的數據集,這能夠顯著減小有效可用的內存量。若是你精通系統設計和系統調優,你能夠根據系統進行特定的參數調整,能夠或多或少的控制出現 OutOfMemoryError 的次數和避免堆內存的過多使用,可是這種設置和調優的做用有限,尤爲是在數據量較大和執行環境發生變化的狀況下。git
Apache Flink 起源於一個研究項目,該項目旨在結合基於 MapReduce 的系統和並行數據庫系統的最佳技術。在此背景下,Flink 一直有本身的內存數據處理方法。Flink 將對象序列化爲固定數量的預先分配的內存段,而不是直接把對象放在堆內存上。它的 DBMS 風格的排序和鏈接算法儘量多地對這個二進制數據進行操做,以此將序列化和反序列化開銷降到最低。若是須要處理的數據多於能夠保存在內存中的數據,Flink 的運算符會將部分數據溢出到磁盤。事實上,不少Flink 的內部實現看起來更像是 C / C ++,而不是普通的 Java。下圖概述了 Flink 如何在內存段中存儲序列化數據並在必要時溢出到磁盤:github
Flink 的主動內存管理和操做二進制數據有幾個好處:算法
一、內存安全執行和高效的核外算法 因爲分配的內存段的數量是固定的,所以監控剩餘的內存資源是很是簡單的。在內存不足的狀況下,處理操做符能夠有效地將更大批的內存段寫入磁盤,後面再將它們讀回到內存。所以,OutOfMemoryError 就有效的防止了。sql
二、減小垃圾收集壓力 由於全部長生命週期的數據都是在 Flink 的管理內存中以二進制表示的,因此全部數據對象都是短暫的,甚至是可變的,而且能夠重用。短生命週期的對象能夠更有效地進行垃圾收集,這大大下降了垃圾收集的壓力。如今,預先分配的內存段是 JVM 堆上的長期存在的對象,爲了下降垃圾收集的壓力,Flink 社區正在積極地將其分配到堆外內存。這種努力將使得 JVM 堆變得更小,垃圾收集所消耗的時間將更少。數據庫
三、節省空間的數據存儲 Java 對象具備存儲開銷,若是數據以二進制的形式存儲,則能夠避免這種開銷。apache
四、高效的二進制操做和緩存敏感性 在給定合適的二進制表示的狀況下,能夠有效地比較和操做二進制數據。此外,二進制表示能夠將相關值、哈希碼、鍵和指針等相鄰地存儲在內存中。這使得數據結構一般具備更高效的緩存訪問模式。編程
主動內存管理的這些特性在用於大規模數據分析的數據處理系統中是很是可取的,可是要實現這些功能的代價也是高昂的。要實現對二進制數據的自動內存管理和操做並不是易事,使用 java.util.HashMap
比實現一個可溢出的 hash-table
(由字節數組和自定義序列化支持)。固然,Apache Flink 並非惟一一個基於 JVM 且對二進制數據進行操做的數據處理系統。例如 Apache Drill、Apache Ignite、Apache Geode 也有應用相似技術,最近 Apache Spark 也宣佈將向這個方向演進。數組
下面咱們將詳細討論 Flink 如何分配內存、若是對對象進行序列化和反序列化以及若是對二進制數據進行操做。咱們還將經過一些性能表現數據來比較處理堆內存上的對象和對二進制數據的操做。
Flink TaskManager 是由幾個內部組件組成的:actor 系統(負責與 Flink master 協調)、IOManager(負責將數據溢出到磁盤並將其讀取回來)、MemoryManager(負責協調內存使用)。在本篇文章中,咱們主要講解 MemoryManager。
MemoryManager 負責將 MemorySegments 分配、計算和分發給數據處理操做符,例如 sort 和 join 等操做符。MemorySegment 是 Flink 的內存分配單元,由常規 Java 字節數組支持(默認大小爲 32 KB)。MemorySegment 經過使用 Java 的 unsafe 方法對其支持的字節數組提供很是有效的讀寫訪問。你能夠將 MemorySegment 看做是 Java 的 NIO ByteBuffer 的定製版本。爲了在更大的連續內存塊上操做多個 MemorySegment,Flink 使用了實現 Java 的 java.io.DataOutput 和 java.io.DataInput 接口的邏輯視圖。
MemorySegments 在 TaskManager 啓動時分配一次,並在 TaskManager 關閉時銷燬。所以,在 TaskManager 的整個生命週期中,MemorySegment 是重用的,而不會被垃圾收集的。在初始化 TaskManager 的全部內部數據結構而且已啓動全部核心服務以後,MemoryManager 開始建立 MemorySegments。默認狀況下,服務初始化後,70% 可用的 JVM 堆內存由 MemoryManager 分配(也能夠配置所有)。剩餘的 JVM 堆內存用於在任務處理期間實例化的對象,包括由用戶定義的函數建立的對象。下圖顯示了啓動後 TaskManager JVM 中的內存分佈:
Java 生態系統提供了幾個庫,能夠將對象轉換爲二進制表示形式並返回。常見的替代方案是標準 Java 序列化,Kryo,Apache Avro,Apache Thrift 或 Google 的 Protobuf。Flink 包含本身的自定義序列化框架,以便控制數據的二進制表示。這一點很重要,由於對二進制數據進行操做須要對序列化佈局有準確的瞭解。此外,根據在二進制數據上執行的操做配置序列化佈局能夠顯著提高性能。Flink 的序列化機制利用了這一特性,即在執行程序以前,要序列化和反序列化的對象的類型是徹底已知的。
Flink 程序能夠處理表示爲任意 Java 或 Scala 對象的數據。在優化程序以前,須要識別程序數據流的每一個處理步驟中的數據類型。對於 Java 程序,Flink 提供了一個基於反射的類型提取組件,用於分析用戶定義函數的返回類型。Scala 程序能夠在 Scala 編譯器的幫助下進行分析。Flink 使用 TypeInformation 表示每種數據類型。
注:該圖選自董偉柯的文章《Apache Flink 類型和序列化機制簡介》,侵刪
Flink 有以下幾種數據類型的 TypeInformations:
BasicTypeInfo:全部 Java 的基礎類型或 java.lang.String
BasicArrayTypeInfo:Java 基本類型構成的數組或 java.lang.String
WritableTypeInfo:Hadoop 的 Writable 接口的任何實現
TupleTypeInfo:任何 Flink tuple(Tuple1 到 Tuple25)。Flink tuples 是具備類型化字段的固定長度元組的 Java 表示
CaseClassTypeInfo:任何 Scala CaseClass(包括 Scala tuples)
PojoTypeInfo:任何 POJO(Java 或 Scala),即全部字段都是 public 的或經過 getter 和 setter 訪問的對象,遵循通用命名約定
GenericTypeInfo:不能標識爲其餘類型的任何數據類型
注:該圖選自董偉柯的文章《Apache Flink 類型和序列化機制簡介》,侵刪
每一個 TypeInformation 都爲它所表明的數據類型提供了一個序列化器。例如,BasicTypeInfo 返回一個序列化器,該序列化器寫入相應的基本類型;WritableTypeInfo 的序列化器將序列化和反序列化委託給實現 Hadoop 的 Writable 接口的對象的 write() 和 readFields() 方法;GenericTypeInfo 返回一個序列化器,該序列化器將序列化委託給 Kryo。對象將自動經過 Java 中高效的 Unsafe 方法來序列化到 Flink MemorySegments 支持的 DataOutput。對於可用做鍵的數據類型,例如哈希值,TypeInformation 提供了 TypeComparators,TypeComparators 比較和哈希對象,而且能夠根據具體的數據類型有效的比較二進制並提取固定長度的二進制 key 前綴。
Tuple,Pojo 和 CaseClass 類型是複合類型,它們可能嵌套一個或者多個數據類型。所以,它們的序列化和比較也都比較複雜,通常將其成員數據類型的序列化和比較都交給各自的 Serializers(序列化器) 和 Comparators(比較器)。下圖說明了 Tuple3<Integer, Double, Person>
對象的序列化,其中Person
是 POJO 並定義以下:
public class Person { public int id; public String name; }
經過提供定製的 TypeInformations、Serializers(序列化器) 和 Comparators(比較器),能夠方便地擴展 Flink 的類型系統,從而提升序列化和比較自定義數據類型的性能。
與其餘的數據處理框架的 API(包括 SQL)相似,Flink 的 API 也提供了對數據集進行分組、排序和鏈接等轉換操做。這些轉換操做的數據集可能很是大。關係數據庫系統具備很是高效的算法,好比 merge-sort、merge-join 和 hash-join。Flink 創建在這種技術的基礎上,可是主要分爲使用自定義序列化和自定義比較器來處理任意對象。在下面文章中咱們將經過 Flink 的內存排序算法示例演示 Flink 如何使用二進制數據進行操做。
Flink 爲其數據處理操做符預先分配內存,初始化時,排序算法從 MemoryManager 請求內存預算,並接收一組相應的 MemorySegments。這些 MemorySegments 變成了緩衝區的內存池,緩衝區中收集要排序的數據。下圖說明了如何將數據對象序列化到排序緩衝區中:
排序緩衝區在內部分爲兩個內存區域:第一個區域保存全部對象的完整二進制數據,第二個區域包含指向完整二進制對象數據的指針(取決於 key 的數據類型)。將對象添加到排序緩衝區時,它的二進制數據會追加到第一個區域,指針(可能還有一個 key)被追加到第二個區域。分離實際數據和指針以及固定長度的 key 有兩個目的:它能夠有效的交換固定長度的 entries(key 和指針),還能夠減小排序時須要移動的數據。若是排序的 key 是可變長度的數據類型(好比 String),則固定長度的排序 key 必須是前綴 key,好比字符串的前 n 個字符。請注意:並不是全部數據類型都提供固定長度的前綴排序 key。將對象序列化到排序緩衝區時,兩個內存區域都使用內存池中的 MemorySegments 進行擴展。一旦內存池爲空且不能再添加對象時,則排序緩衝區將會被徹底填充並能夠進行排序。Flink 的排序緩衝區提供了比較和交換元素的方法,這使得實際的排序算法是可插拔的。默認狀況下, Flink 使用了 Quicksort(快速排序)實現,可使用 HeapSort(堆排序)。下圖顯示瞭如何比較兩個對象:
排序緩衝區經過比較它們的二進制固定長度排序 key 來比較兩個元素。若是元素的完整 key(不是前綴 key) 或者二進制前綴 key 不相等,則表明比較成功。若是前綴 key 相等(或者排序 key 的數據類型不提供二進制前綴 key),則排序緩衝區遵循指向實際對象數據的指針,對兩個對象進行反序列化並比較對象。根據比較結果,排序算法決定是否交換比較的元素。排序緩衝區經過移動其固定長度 key 和指針來交換兩個元素,實際數據不會移動,排序算法完成後,排序緩衝區中的指針被正確排序。下圖演示瞭如何從排序緩衝區返回已排序的數據:
經過順序讀取排序緩衝區的指針區域,跳過排序 key 並按照實際數據的排序指針返回排序數據。此數據要麼反序列化並做爲對象返回,要麼在外部合併排序的狀況下複製二進制數據並將其寫入磁盤。
那麼,對二進制數據進行操做對性能意味着什麼?咱們將運行一個基準測試,對 1000 萬個Tuple2<Integer, String>
對象進行排序以找出答案。整數字段的值從均勻分佈中採樣。String 字段值的長度爲 12 個字符,並從長尾分佈中進行採樣。輸入數據由返回可變對象的迭代器提供,即返回具備不一樣字段值的相同 Tuple 對象實例。Flink 在從內存,網絡或磁盤讀取數據時使用此技術,以免沒必要要的對象實例化。基準測試在具備 900 MB 堆大小的 JVM 中運行,在堆上存儲和排序 1000 萬個 Tuple 對象而且不會致使觸發 OutOfMemoryError 大約須要這麼大的內存。咱們使用三種排序方法在Integer 字段和 String 字段上對 Tuple 對象進行排序:
一、對象存在堆中:Tuple 對象存儲在經常使用的 java.util.ArrayList
中,初始容量設置爲 1000 萬,並使用 Java 中經常使用的集合排序進行排序。
三、Kryo 序列化:使用 Kryo 序列化將 Tuple 字段序列化爲 600 MB 大小的排序緩衝區,並在沒有二進制排序 key 的狀況下進行排序。這意味着每次比較須要對兩個對象進行反序列化。
全部排序方法都使用單線程實現。結果的時間是十次運行結果的平均值。在每次運行以後,咱們調用System.gc()
請求垃圾收集運行,該運行不會進入測量的執行時間。下圖顯示了將輸入數據存儲在內存中,對其進行排序並將其做爲對象讀回的時間。
咱們看到 Flink 使用本身的序列化器對二進制數據進行排序明顯優於其餘兩種方法。與存儲在堆內存上相比,咱們看到將數據加載到內存中要快得多。由於咱們其實是在收集對象,沒有機會重用對象實例,但必須從新建立每一個 Tuple。這比 Flink 的序列化器(或Kryo序列化)效率低。另外一方面,與反序列化相比,從堆中讀取對象是無性能消耗的。在咱們的基準測試中,對象克隆比序列化和反序列化組合更耗性能。查看排序時間,咱們看到對二進制數據的排序也比 Java 的集合排序更快。使用沒有二進制排序 key 的 Kryo 序列化的數據排序比其餘方法慢得多。這是由於反序列化帶來很大的開銷。在String 字段上對 Tuple 進行排序比在 Integer 字段上排序更快,由於長尾值分佈顯着減小了成對比較的數量。爲了更好地瞭解排序過程當中發生的情況,咱們使用 VisualVM 監控執行的 JVM。如下截圖顯示了執行 10次 運行時的堆內存使用狀況、垃圾收集狀況和 CPU 使用狀況。
測試是在 8 核機器上運行單線程,所以一個核心的徹底利用僅對應 12.5% 的整體利用率。截圖顯示,對二進制數據進行操做可顯著減小垃圾回收活動。對於對象存在堆中,垃圾收集器在排序緩衝區被填滿時以很是短的時間間隔運行,而且即便對於單個處理線程也會致使大量 CPU 使用(排序自己不會觸發垃圾收集器)。JVM 垃圾收集多個並行線程,解釋了高CPU 整體利用率。另外一方面,對序列化數據進行操做的方法不多觸發垃圾收集器而且 CPU 利用率低得多。實際上,若是使用 Flink 序列化的方式在 Integer 字段上對 Tuple 進行排序,則垃圾收集器根本不運行,由於對於成對比較,不須要反序列化任何對象。Kryo 序列化須要比較多的垃圾收集,由於它不使用二進制排序 key 而且每次排序都要反序列化兩個對象。
內存使用狀況上圖顯示 Flink 序列化和 Kryo 序列化不斷的佔用大量內存
存使用狀況圖表顯示flink-serialized和kryo-serialized不斷佔用大量內存。這是因爲 MemorySegments 的預分配。實際內存使用率要低得多,由於排序緩衝區並未徹底填充。下表顯示了每種方法的內存消耗。1000 萬條數據產生大約 280 MB 的二進制數據(對象數據、指針和排序 key),具體取決於使用的序列化程序以及二進制排序 key 的存在和大小。將其與數據存儲在堆上的方法進行比較,咱們發現對二進制數據進行操做能夠顯著提升內存效率。在咱們的基準測試中,若是序列化爲排序緩衝區而不是將其做爲堆上的對象保存,則能夠在內存中對兩倍以上的數據進行排序。
佔用內存 | 對象存在堆中 | Flink 序列化 | Kryo 序列化 |
---|---|---|---|
對 Integer 排序 | 約 700 MB(堆內存) | 277 MB(排序緩衝區) | 266 MB(排序緩衝區) |
對 String 排序 | 約 700 MB(堆內存) | 315 MB(排序緩衝區) | 266 MB(排序緩衝區) |
總而言之,測試驗證了文章前面說的對二進制數據進行操做的好處。
Apache Flink 具備至關多的高級技術,能夠經過有限的內存資源安全有效地處理大量數據。可是有幾點可使 Flink 更有效率。Flink 社區正在努力將管理內存移動到堆外內存。這將容許更小的 JVM,更低的垃圾收集開銷,以及更容易的系統配置。使用 Flink 的 Table API,全部操做(如 aggregation 和 projection)的語義都是已知的(與黑盒用戶定義的函數相反)。所以,咱們能夠爲直接對二進制數據進行操做的 Table API 操做生成代碼。進一步的改進包括序列化設計,這些設計針對應用於二進制數據的操做和針對序列化器和比較器的代碼生成而定製。
Flink 的主動內存管理減小了因觸發 OutOfMemoryErrors 而殺死 JVM 進程和垃圾收集開銷的問題。
Flink 具備高效的數據序列化和反序列化機制,有助於對二進制數據進行操做,並使更多數據適合內存。
Flink 的 DBMS 風格的運算符自己在二進制數據上運行,在必要時能夠在內存中高性能地傳輸到磁盤。
本文地址: http://www.54tianzhisheng.cn/2019/03/24/Flink-code-memory-management/
本文翻譯自:https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html 翻譯:zhisheng,二次轉載請註明地址,不然保留追究法律責任。
微信公衆號:zhisheng
另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號了。你能夠加個人微信:zhisheng_tian,而後回覆關鍵字:Flink 便可無條件獲取到。
更多私密資料請加入知識星球!
https://github.com/zhisheng17/flink-learning/
之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客。
一、Flink 從0到1學習 —— Apache Flink 介紹
二、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門
三、Flink 從0到1學習 —— Flink 配置文件詳解
四、Flink 從0到1學習 —— Data Source 介紹
五、Flink 從0到1學習 —— 如何自定義 Data Source ?
六、Flink 從0到1學習 —— Data Sink 介紹
七、Flink 從0到1學習 —— 如何自定義 Data Sink ?
八、Flink 從0到1學習 —— Flink Data transformation(轉換)
九、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows
十、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解
十一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 ElasticSearch
十二、Flink 從0到1學習 —— Flink 項目如何運行?
1三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Kafka
1四、Flink 從0到1學習 —— Flink JobManager 高可用性配置
1五、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹
1六、Flink 從0到1學習 —— Flink 讀取 Kafka 數據批量寫入到 MySQL
1七、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RabbitMQ
1八、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HBase
1九、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HDFS
20、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Redis
2一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Cassandra
2二、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Flume
2三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 InfluxDB
2四、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RocketMQ
2五、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裏去了
2六、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裏去了
2八、Flink 從0到1學習 —— Flink 中如何管理配置?
2九、Flink 從0到1學習—— Flink 不能夠連續 Split(分流)?
30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文
3二、爲何說流處理即將來?
3三、OPPO 數據中臺之基石:基於 Flink SQL 構建實時數據倉庫
3六、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理
3八、如何基於Flink+TensorFlow打造實時智能異常檢測平臺?只看這一篇就夠了
40、Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
四、Flink 源碼解析 —— standalone session 模式啓動流程
五、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Job Manager 啓動
六、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Task Manager 啓動
七、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程
八、Flink 源碼解析 —— 分析 Streaming WordCount 程序的執行過程
九、Flink 源碼解析 —— 如何獲取 JobGraph?
十、Flink 源碼解析 —— 如何獲取 StreamGraph?
十一、Flink 源碼解析 —— Flink JobManager 有什麼做用?
十二、Flink 源碼解析 —— Flink TaskManager 有什麼做用?
1三、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程
1四、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程
1五、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制
1六、Flink 源碼解析 —— 深度解析 Flink 序列化機制
1七、Flink 源碼解析 —— 深度解析 Flink 是如何管理好內存的?
1八、Flink Metrics 源碼解析 —— Flink-metrics-core
1九、Flink Metrics 源碼解析 —— Flink-metrics-datadog
20、Flink Metrics 源碼解析 —— Flink-metrics-dropwizard
2一、Flink Metrics 源碼解析 —— Flink-metrics-graphite
2二、Flink Metrics 源碼解析 —— Flink-metrics-influxdb
2三、Flink Metrics 源碼解析 —— Flink-metrics-jmx
2四、Flink Metrics 源碼解析 —— Flink-metrics-slf4j
2五、Flink Metrics 源碼解析 —— Flink-metrics-statsd
2六、Flink Metrics 源碼解析 —— Flink-metrics-prometheus
2七、Flink 源碼解析 —— 如何獲取 ExecutionGraph ?
30、Flink Clients 源碼解析原文出處:zhisheng的博客,歡迎關注個人公衆號:zhisheng