Flink 核心技術淺析(整理版)

1. Flink簡介

Apache Flink是一個面向分佈式數據流處理和批量數據處理的開源計算平臺,它可以基於同一個Flink流執行引擎(streaming dataflow engine),提供支持流處理和批處理兩種類型應用的功能。batch dataSet能夠視做data Streaming的一種特例。基於流執行引擎,Flink提供了諸多更高抽象層的API以便用戶編寫分佈式任務:html

  • DataSet API,對靜態數據進行批處理操做,將靜態數據抽象成分佈式的數據集,用戶能夠方便地使用Flink提供的各類操做符對分佈式數據集進行處理,支持Java、Scala和Python。
  • DataStream API,對數據流進行流處理操做,將流式的數據抽象成分佈式的數據流,用戶能夠方便地對分佈式數據流進行各類操做,支持Java和Scala。
  • Table API,對結構化數據進行查詢操做,對結構化數據抽象成關係表,並經過類SQL的DSL對關係表進行各類查詢操做,支持Java和Scala。
  • Flink ML,Flink的機器學習庫,提供了機器學習Pipelines API並實現了多種機器學習算法。
  • Gelly,Flink的圖計算庫,提供了圖計算的相關API以及多種圖計算算法。

Flink的技術棧如圖所示:java

 此外,Flink也能夠方便和Hadoop生態圈中其餘項目集成,例如Flink能夠讀取存儲在HDFS或HBase中的靜態數據,以Kafka做爲流式的數據源,直接重用MapReduce或Store代碼,或是經過YARN申請集羣資源等。算法

2. Flink核心特色

2.1 統一的批處理和流處理系統

在執行引擎這一層,流處理系統與批處理系統最大不一樣在於節點間的數據傳輸方式。對於一個流處理系統,其節點間數據傳輸的標準模型是:當一條數據被處理完成後,序列化到緩存中,而後馬上經過網絡傳輸到下一個節點,由下一個節點繼續處理。而對於一個批處理系統,其節點間數據傳輸的標準模型是:當一條數據被處理完成後,序列化到緩存中,並不會馬上經過網絡傳輸到下一個節點,當緩存寫滿,就持久化到本地硬盤上,當全部數據都被處理完成後,纔開始將處理後的數據經過網絡傳輸到下一個節點。這兩種數據傳輸模式是兩個極端,對應的是流處理對低延遲的要求和批處理系統對高吞吐量的要求。apache

Flink的執行引擎採用了一種十分靈活的方式,同時支持了上述兩種數據傳輸模型。Flink以固定的緩存塊爲單位進行網絡數據傳輸,用戶能夠經過緩存塊超時值指定緩存塊的傳輸時機。若是緩存塊的超時值爲0,則Flink的數據傳輸方式相似上文所提到流處理系統的標準模型,此時系統能夠得到最低的處理延遲。若是緩存塊的超時值爲無限大,則Flink的數據傳輸方式相似上文所提到批處理系統標準模型,此時系統能夠得到最高的吞吐量。同時緩存塊的超時值也能夠設置爲0到無限大之間的任意值。緩存塊的超時閥值越小,則Flink流處理執行引擎的數據處理延遲越低,但吞吐量也會下降,反之亦然。經過調整緩存塊的超時閥值,用戶可根據需求靈活地權衡系統延遲和吞吐量。編程

在統一的流式執行引擎基礎上,Flink同時支持了流計算和批處理,並對性能(延遲、吞吐量等)有所保障。相對於其餘原生的流處理與批處理系統,並無由於統一執行引擎而受到影響,從而大幅度減輕了用戶安裝、部署、監控、維護等成本。數組

2.2 Flink流處理的容錯機制

對於一個分佈式系統來講,單個進程或是節點崩潰致使整個Job失敗是常常發生的事情,在異常發生時不會丟失用戶數據並能自動恢復纔是分佈式系統必須支持的特性之一。本節主要介紹Flink流處理系統任務級別的容錯機制。緩存

批處理系統比較容易實現容錯機制,因爲文件能夠重複訪問,當個某個任務失敗後,重啓該任務便可。可是到了流處理系統,因爲數據源是無限的數據流,從而致使一個流處理任務執行幾個月的狀況,將全部數據緩存或是持久化,留待之後重複訪問基本上是不可行的。Flink基於分佈式快照與可部分重發的數據源實現了容錯。用戶可自定義對整個Job進行快照的時間間隔,當任務失敗時,Flink會將整個Job恢復到最近一次快照,並從數據源重發快照以後的數據。網絡

Flink的分佈式快照實現借鑑了Chandy和Lamport在1985年發表的一篇關於分佈式快照的論文,其實現的主要思想以下:按照用戶自定義的分佈式快照間隔時間,Flink會定時在全部數據源中插入一種特殊的快照標記消息,這些快照標記消息和其餘消息同樣在DAG中流動,可是不會被用戶定義的業務邏輯所處理,每個快照標記消息都將其所在的數據流分紅兩部分:本次快照數據和下次快照數據數據結構

圖3中Flink包含快照標記消息的消息流框架

快照標記消息沿着DAG流經各個操做符,當操做符處理到快照標記消息時,會對本身的狀態進行快照,並存儲起來。當一個操做符有多個輸入的時候,Flink會將先抵達的快照標記消息及其以後的消息緩存起來,當全部的輸入中對應該快照的快照標記消息所有抵達後,操做符對本身的狀態快照並存儲,以後處理全部快照標記消息以後的已緩存消息。操做符對本身的狀態快照並存儲能夠是異步與增量的操做,並不須要阻塞消息的處理。分佈式快照的流程如圖4所示:

 圖4 Flink分佈式快照流程圖

當全部的Data Sink(終點操做符)都收到快照標記信息並對本身的狀態快照和存儲後,整個分佈式快照就完成了,同時通知數據源釋放該快照標記消息以前的全部消息。若以後發生節點崩潰等異常狀況時,只須要恢復以前存儲的分佈式快照狀態,並從數據源重發該快照之後的消息就能夠了。

Exactly-Once是流處理系統須要支持的一個很是重要的特性,它保證每一條消息只被流處理系統一次,許多流處理任務的業務邏輯都依賴於Exactly-Once特性。相對於At-Least-Once或是At-Most-Once,Exactly-Once特性對流處理系統的要求更爲嚴格,實現也更加困難。Flink基於分佈式快照實現了Exactly-Once特性。

相對於其餘流處理系統的容錯方案,Flink基於分佈式快照的方案在功能和性能方面都具備不少優勢,包括:

  • 低延遲。因爲操做符狀態的存儲能夠異步,因此進行快照的過程基本上不會阻塞消息的處理,所以不會對消息延遲產生負面影響。
  • 高吞吐量。當操做符狀態較少時,對吞吐量基本沒有影響。當操做符狀態較多時,相對於其餘的容錯機制,分佈式快照的時間間隔是用戶自定義的,因此用戶能夠權衡錯誤恢復時間和吞吐量要求來調整分佈式快照的時間間隔。

與業務邏輯的隔離。Flink的分佈式快照機制與用戶的業務邏輯是徹底隔離的,用戶的業務邏輯不會依賴或是對分佈式快照產生任何影響。

錯誤恢復代價。分佈式快照的時間間隔越短,錯誤恢復的時間越少,與吞吐量負相關。

2.3 Flink流處理的時間窗口

對於流處理系統來講,流入的消息不存在上限,因此對於聚合或是鏈接等操做,流處理系統須要對流入的消息進行分段,而後基於每一段數據進行聚合或是鏈接。消息的分段即稱爲窗口,流處理系統支持的窗口有不少類型,最多見的就是時間窗口,基於時間間隔對消息進行分段處理。本節主要介紹Flink流處理系統支持的各類時間窗口。

對於目前大部分流處理系統來講,時間窗口通常是根據Task所在節點的本地時鐘進行切分,這種方式實現起來比較容易,不會產生阻塞。可是可能沒法知足某些應用需求,好比:①消息自己帶有時間戳,用戶但願按照消息自己的時間特性進行分段處理;②因爲不一樣節點的時鐘可能不一樣,以及消息在流經各個節點的延遲不一樣,在某個節點屬於同一個時間窗口處理的消息,流到下一個節點時可能被切分到不一樣的時間窗口中,從而產生不符合預期的結果。

Flink支持3種類型的時間窗口,分別適用於用戶對時間窗口不一樣類型的要求:

  • Operator Time。根據Task所在節點的本地時鐘來切分的時間窗口。
  • Event Time。消息自帶時間戳,根據消息的時間戳進行處理,確保時間戳在同一個時間窗口的全部消息必定會被正確處理。因爲消息可能亂序流入Task,因此Task須要緩存當前時間窗口消息處理的狀態,直到確認屬於該時間窗口的全部消息都被處理,才能夠釋放,若是亂序的消息延遲很高會影響分佈式系統的吞吐量和延遲。
  • Ingress Time。有時消息自己並不帶時間戳信息,但用戶依然但願按照消息而不是節點時鐘劃分時間窗口,例如避免上面提到的第二個問題,此時能夠在消息源流入Flink流處理系統時自動生成增量的時間戳賦予消息,以後處理的流程與Event Time相同。Ingress Time能夠當作是Event Time的一個特例,因爲其在消息源處時間戳必定是有序的,因此在流處理系統中,相對於Event Time,其亂序的消息延遲不會很高,所以對Flink分佈式系統的吞吐量和延遲的影響也會更小。

2.4 定製的內存管理

Flink項目基於Java及Scala等JVM語言,JVM自己做爲一個各類類型應用的執行平臺,其對Java對象的管理也是基於通用的處理策略,其垃圾回收器經過估算Java對象的生命週期對Java對象進行有效率的管理。

JVM存在的問題

Java對象開銷

相對於C/C++等更加接近底層的語言,Java對象的存儲密度相對偏低,例如[1],"abcd"這樣簡單的字符串在UTF-8編碼中須要4個字節存儲,但採用了UTF-16編碼存儲字符串的Java須要8個字節,同時Java對象還有header等其餘額外信息,一個4字節字符串對象在Java中須要48字節的空間來存儲。對於大部分的大數據應用,內存都是稀缺資源,更有效率的內存存儲,意味着CPU數據訪問吐吞量更高,以及更少磁盤落地的存在。

對象存儲結構引起的cache miss

爲了緩解CPU處理速度與內存訪問速度的差距,現代CPU數據訪問通常都會有多級緩存。當從內存加載數據到緩存時,通常是以cache line爲單位加載數據,因此當CPU訪問的數據若是是在內存中連續存儲的話,訪問的效率會很是高。若是CPU要訪問的數據不在當前緩存全部的cache line中,則須要從內存中加載對應的數據,這被稱爲一次cache miss。當cache miss很是高的時候,CPU大部分的時間都在等待數據加載,而不是真正的處理數據。Java對象並非連續的存儲在內存上,同時不少的Java數據結構的數據彙集性也很差。

大數據的垃圾回收

Java的垃圾回收機制一直讓Java開發者又愛又恨,一方面它免去了開發者本身回收資源的步驟,提升了開發效率,減小了內存泄漏的可能,另外一方面垃圾回收也是Java應用的不定時炸彈,有時秒級甚至是分鐘級的垃圾回收極大影響了Java應用的性能和可用性。在時下數據中心,大容量內存獲得了普遍的應用,甚至出現了單臺機器配置TB內存的狀況,同時,大數據分析一般會遍歷整個源數據集,對數據進行轉換、清洗、處理等步驟。在這個過程當中,會產生海量的Java對象,JVM的垃圾回收執行效率對性能有很大影響。經過JVM參數調優提升垃圾回收效率須要用戶對應用和分佈式計算框架以及JVM的各參數有深刻了解,並且有時候這也遠遠不夠。

OOM問題

OutOfMemoryError是分佈式計算框架常常會遇到的問題,當JVM中全部對象大小超過度配給JVM的內存大小時,就會出現OutOfMemoryError錯誤,JVM崩潰,分佈式框架的健壯性和性能都會受到影響。經過JVM管理內存,同時試圖解決OOM問題的應用,一般都須要檢查Java對象的大小,並在某些存儲Java對象特別多的數據結構中設置閾值進行控制。可是JVM並無提供官方檢查Java對象大小的工具,第三方的工具類庫可能沒法準確通用地肯定Java對象大小[6]。侵入式的閾值檢查也會爲分佈式計算框架的實現增長不少額外與業務邏輯無關的代碼。

Flink的處理策略

爲了解決以上提到的問題,高性能分佈式計算框架一般須要如下技術:

定製的序列化工具

顯式內存管理的前提步驟就是序列化,將Java對象序列化成二進制數據存儲在內存上(on heap或是off-heap)。通用的序列化框架,如Java默認使用java.io.Serializable將Java對象及其成員變量的全部元信息做爲其序列化數據的一部分,序列化後的數據包含了全部反序列化所需的信息。這在某些場景中十分必要,可是對於Flink這樣的分佈式計算框架來講,這些元數據信息多是冗餘數據。

分佈式計算框架可使用定製序列化工具的前提是要待處理數據流一般是同一類型,因爲數據集對象的類型固定,從而能夠只保存一份對象Schema信息,節省大量的存儲空間。同時,對於固定大小的類型,也可經過固定的偏移位置存取。在須要訪問某個對象成員變量時,經過定製的序列化工具,並不須要反序列化整個Java對象,而是直接經過偏移量,從而只須要反序列化特定的對象成員變量。若是對象的成員變量較多時,可以大大減小Java對象的建立開銷,以及內存數據的拷貝大小。Flink數據集都支持任意Java或是Scala類型,經過自動生成定製序列化工具,既保證了API接口對用戶友好(不用像Hadoop那樣數據類型須要繼承實現org.apache.hadoop.io.Writable接口),也達到了和Hadoop相似的序列化效率。

Flink對數據集的類型信息進行分析,而後自動生成定製的序列化工具類。Flink支持任意的Java或是Scala類型,經過Java Reflection框架分析基於Java的Flink程序UDF(User Define Function)的返回類型的類型信息,經過Scala Compiler分析基於Scala的Flink程序UDF的返回類型的類型信息。類型信息由TypeInformation類表示,這個類有諸多具體實現類,例如:

  • BasicTypeInfo任意Java基本類型(裝包或未裝包)和String類型。
  • BasicArrayTypeInfo任意Java基本類型數組(裝包或未裝包)和String數組。
  • WritableTypeInfo任意Hadoop的Writable接口的實現類。
  • TupleTypeInfo任意的Flink tuple類型(支持Tuple1 to Tuple25)。 Flink tuples是固定長度固定類型的Java Tuple實現。
  • CaseClassTypeInfo任意的 Scala CaseClass(包括 Scala tuples)。
  • PojoTypeInfo任意的POJO (Java or Scala),例如Java對象的全部成員變量,要麼是public修飾符定義,要麼有getter/setter方法。
  • GenericTypeInfo任意沒法匹配以前幾種類型的類。

前6種類型數據集幾乎覆蓋了絕大部分的Flink程序,針對前6種類型數據集,Flink皆能夠自動生成對應的TypeSerializer定製序列化工具,很是有效率地對數據集進行序列化和反序列化。對於第7種類型,Flink使用Kryo進行序列化和反序列化。此外,對於可被用做Key的類型,Flink還同時自動生成TypeComparator,用來輔助直接對序列化後的二進制數據直接進行compare、hash等操做。對於Tuple、CaseClass、Pojo等組合類型,Flink自動生成的TypeSerializer、TypeComparator一樣是組合的,並把其成員的序列化/反序列化代理給其成員對應的TypeSerializer、TypeComparator,如圖6所示:

圖6組合類型序列化

此外若有須要,用戶可經過集成TypeInformation接口定製實現本身的序列化工具。

顯式的內存管理

 通常通用的作法是批量申請和釋放內存,每一個JVM實例有一個統一的內存管理器,全部內存的申請和釋放都經過該內存管理器進行。這能夠避免常見的內存碎片問題,同時因爲數據以二進制的方式存儲,能夠大大減輕垃圾回收壓力。

垃圾回收是JVM內存管理迴避不了的問題,JDK8的G1算法改善了JVM垃圾回收的效率和可用範圍,但對於大數據處理實際環境還遠遠不夠。這也和如今分佈式框架的發展趨勢有所衝突,愈來愈多的分佈式計算框架但願儘量多地將待處理數據集放入內存,而對於JVM垃圾回收來講,內存中Java對象越少、存活時間越短,其效率越高。經過JVM進行內存管理的話,OutOfMemoryError也是一個很難解決的問題。同時,在JVM內存管理中,Java對象有潛在的碎片化存儲問題(Java對象全部信息可能在內存中連續存儲),也有可能在全部Java對象大小沒有超過JVM分配內存時,出現OutOfMemoryError問題。Flink將內存分爲3個部分,每一個部分都有不一樣用途:

  • Network buffers: 一些以32KB Byte數組爲單位的buffer,主要被網絡模塊用於數據的網絡傳輸。
  • Memory Manager pool大量以32KB Byte數組爲單位的內存池,全部的運行時算法(例如Sort/Shuffle/Join)都從這個內存池申請內存,並將序列化後的數據存儲其中,結束後釋放回內存池。
  • Remaining (Free) Heap主要留給UDF中用戶本身建立的Java對象,由JVM管理。

Network buffers在Flink中主要基於Netty的網絡傳輸,無需多講。Remaining Heap用於UDF中用戶本身建立的Java對象,在UDF中,用戶一般是流式的處理數據,並不須要不少內存,同時Flink也不鼓勵用戶在UDF中緩存不少數據,由於這會引發前面提到的諸多問題。Memory Manager pool(之後之內存池代指)一般會配置爲最大的一塊內存,接下來會詳細介紹。

在Flink中,內存池由多個MemorySegment組成,每一個MemorySegment表明一塊連續的內存,底層存儲是byte[],默認32KB大小。MemorySegment提供了根據偏移量訪問數據的各類方法,如get/put int、long、float、double等,MemorySegment之間數據拷貝等方法和java.nio.ByteBuffer相似。對於Flink的數據結構,一般包括多個向內存池申請的MemeorySegment,全部要存入的對象經過TypeSerializer序列化以後,將二進制數據存儲在MemorySegment中,在取出時經過TypeSerializer反序列化。數據結構經過MemorySegment提供的set/get方法訪問具體的二進制數據。Flink這種看起來比較複雜的內存管理方式帶來的好處主要有:

  • 二進制的數據存儲大大提升了數據存儲密度,節省了存儲空間。
  • 全部的運行時數據結構和算法只能經過內存池申請內存,保證了其使用的內存大小是固定的,不會由於運行時數據結構和算法而發生OOM。對於大部分的分佈式計算框架來講,這部分因爲要緩存大量數據最有可能致使OOM。
  • 內存池雖然佔據了大部份內存,但其中的MemorySegment容量較大(默認32KB),因此內存池中的Java對象其實不多,並且一直被內存池引用,全部在垃圾回收時很快進入持久代,大大減輕了JVM垃圾回收的壓力。
  • Remaining Heap的內存雖然由JVM管理,可是因爲其主要用來存儲用戶處理的流式數據,生命週期很是短,速度很快的Minor GC就會所有回收掉,通常不會觸發Full GC。

Flink當前的內存管理在最底層是基於byte[],因此數據最終仍是on-heap,最近Flink增長了off-heap的內存管理支持。Flink off-heap的內存管理相對於on-heap的優勢主要在於:

  • 啓動分配了大內存(例如100G)的JVM很耗費時間,垃圾回收也很慢。若是採用off-heap,剩下的Network buffer和Remaining heap都會很小,垃圾回收也不用考慮MemorySegment中的Java對象了。
  • 更有效率的IO操做。在off-heap下,將MemorySegment寫到磁盤或是網絡能夠支持zeor-copy技術,而on-heap的話則至少須要一次內存拷貝。
  • off-heap可用於錯誤恢復,好比JVM崩潰,在on-heap時數據也隨之丟失,但在off-heap下,off-heap的數據可能還在。此外,off-heap上的數據還能夠和其餘程序共享。

緩存友好的計算

對於計算密集的數據結構和算法,直接操做序列化後的二進制數據,而不是將對象反序列化後再進行操做。同時,只將操做相關的數據連續存儲,能夠最大化的利用L1/L2/L3緩存,減小Cache miss的機率,提高CPU計算的吞吐量。以排序爲例,因爲排序的主要操做是對Key進行對比,若是將全部排序數據的Key與Value分開並對Key連續存儲,那麼訪問Key時的Cache命中率會大大提升。

磁盤IO和網絡IO以前一直被認爲是Hadoop系統的瓶頸,可是隨着Spark、Flink等新一代分佈式計算框架的發展,愈來愈多的趨勢使得CPU/Memory逐漸成爲瓶頸,這些趨勢包括:

  • 更先進的IO硬件逐漸普及。10GB網絡和SSD硬盤等已經被愈來愈多的數據中心使用。
  • 更高效的存儲格式。Parquet,ORC等列式存儲被愈來愈多的Hadoop項目支持,其很是高效的壓縮性能大大減小了落地存儲的數據量。
  • 更高效的執行計劃。例如不少SQL系統執行計劃優化器的Fliter-Push-Down優化會將過濾條件儘量的提早,甚至提早到Parquet的數據訪問層,使得在不少實際的工做負載中並不須要不少的磁盤IO。

3. Flink vs Spark

經過比較spark,瞭解flink的做用和優缺點,主要從設計抽象、內存管理、語言實現,以及API和SQL等方面來描述。

3.1 設計抽象

接觸過 Spark 的同窗,應該比較熟悉,在處理批處理任務,可使用 RDD,而對於流處理,可使用 Streaming,然其實際仍是 RDD,因此本質上仍是 RDD 抽象而來。可是,在 Flink 中,批處理用 DataSet,對於流處理,有 DataStreams。思想相似,但卻有所不一樣:其一,DataSet 在運行時表現爲 Runtime Plans,而在 Spark 中,RDD 在運行時表現爲 Java Objects。在 Flink 中有 Logical Plan ,這和 Spark 中的 DataFrames 相似。於是,在 Flink 中,如果使用這類 API ,會被優先來優化(即:自動優化迭代)。然而,在 Spark 中,RDD 就沒有這塊的相關優化。

另外,DataSet 和 DataStream 是相對獨立的 API,在 Spark 中,全部不一樣的 API,好比 Streaming,DataFrame 都是基於 RDD 抽象的。然而在 Flink 中,DataSet 和 DataStream 是同一個公用引擎之上的兩個獨立的抽象。因此,不能把這二者的行爲合併在一塊兒操做,目前官方正在處理這種問題。

3.2 內存

在以前的版本(1.5之前),Spark 延用 Java 的內存管理來作數據緩存,這樣很容易致使 OOM 或者 GC。以後,Spark 開始轉向另外更加友好和精準的控制內存,即:Tungsten 項目。然而,對於 Flink 來講,從一開始就堅持使用本身控制內存。Flink 除把數據存在本身管理的內存以外,還直接操做二進制數據。在 Spark 1.5以後的版本開始,全部的 DataFrame 操做都是直接做用於 Tungsten 的二進制數據上。

  PS:Tungsten 項目將是 Spark 自誕生以來內核級別的最大改動,以大幅度提高 Spark 應用程序的內存和 CPU 利用率爲目標,旨在最大程度上利用硬件性能。該項目包括了三個方面的改進:

  1. 內存管理和二進制處理:更加明確的管理內存,消除 JVM 對象模型和垃圾回收開銷。
  2. 緩存友好計算:使用算法和數據結構來實現內存分級結構。
  3. 代碼生成:使用代碼生成來利用新型編譯器和 CPU。

3.2 編程語言

Spark 使用 Scala 來實現的,它提供了 Java,Python 以及 R 語言的編程接口。而對於 Flink 來講,它是使用 Java 實現的,提供 Scala 編程 API。從編程語言的角度來看,Spark 略顯豐富一些。Spark 和 Flink 二者都傾向於使用 Scala 來實現對應的業務。

3.2 SQL

目前,Spark SQL 是其組件中較爲活躍的一部分,它提供了相似於 Hive SQL 來查詢結構化數據,API 依然很成熟。對於 Flink 來講,支持 Flink Table API。

總結

參考資料:

https://www.cnblogs.com/smartloli/p/5580757.html

https://www.cnblogs.com/feiyudemeng/p/8998772.html

https://yq.aliyun.com/articles/600173

相關文章
相關標籤/搜索