flink流處理內容

Flink核心是一個流式的數據流執行引擎,其針對數據流的分佈式計算提供了數據分佈、數據通訊以及容錯機制等功能java

Flink提供了諸多更高抽象層的API以便用戶編寫分佈式任務:算法

DataSet API, 對靜態數據進行批處理操做,將靜態數據抽象成分佈式的數據集,用戶能夠方便地使用Flink提供的各類操做符對分佈式數據集進行處理,支持Java、Scala和PythonDataStream API,對數據流進行流處理操做,將流式的數據抽象成分佈式的數據流,用戶能夠方便地對分佈式數據流進行各類操做,支持Java和Scalaapache

Table API,結構化數據進行查詢操做,將結構化數據抽象成關係表,並經過類SQL的DSL對關係表進行各類查詢操做,支持Java和Scala數組

對於一個流處理系統,其節點間數據傳輸的標準模型是:當一條數據被處理完成後,序列化到緩存中,而後馬上經過網絡傳輸到下一個節點,由下一個節點繼續處理緩存

而對於一個批處理系統,其節點間數據傳輸的標準模型是:當一條數據被處理完成後,序列化到緩存中,並不會馬上經過網絡傳輸到下一個節點,當緩存寫滿,就持久化到本地硬盤上,當全部數據都被處理完成後,纔開始將處理後的數據經過網絡傳輸到下一個節點網絡

這兩種數據傳輸模式是兩個極端,對應的是流處理系統對低延遲的要求和批處理系統對高吞吐量的要求數據結構

Flink的執行引擎同時支持了這兩種數據傳輸模型,Flink以固定的緩存塊爲單位進行網絡數據傳輸,用戶能夠經過緩存塊超時值指定緩存塊的傳輸時機,超時值爲0,則是流處理系統的標準模型,此時能夠得到最低的處理延遲,緩存塊的超時值爲無限大,則Flink的數據傳輸方式相似上文所提到批處理系統的標準模型。併發

緩存塊的超時閾值越小,則流處理數據的延遲越低,但吞吐量也會變低。根據超時閾值來靈活權衡系統延遲和吞吐量。Flink基於分佈式快照與可部分重發的數據源實現了容錯。框架

用戶可自定義對整個Job進行快照的時間間隔,當任務失敗時,Flink會將整個Job恢復到最近一次快照,並從數據源重發快照以後的數據。異步

按照用戶自定義的快照間隔時間,flink會定時在數據源中插入快照標記的消息,快照消息和普通消息都在DAG中流動,但不會被用戶定義的邏輯所處理,每個快照消息都將其所在的數據流分紅2部分:本次快照數據和下次快照數據。當操做符處理到快照標記消息,對本身的狀態進行快照標記並緩存。操做符對本身的快照和狀態能夠是異步,增量操做,並不阻塞消息處理。當全部的終點操做符都收到快照標記信息並對本身的狀態快照和存儲後,整個分佈式快照就完成了。同時通知數據源釋放該快照標記消息以前的全部消息。若以後的節點崩潰等異常,就能夠恢復分佈式快照狀態。並從數據源重發該快照之後的消息。

flink基於分佈式快照實現了一次性。

 

目前大部分流處理系統來講,時間窗口通常是根據Task所在節點的本地時鐘進行切分,

是可能沒法知足某些應用需求,好比:

消息自己帶有時間戳,用戶但願按照消息自己的時間特性進行分段處理。

因爲不一樣節點的時鐘可能不一樣,以及消息在流經各個節點延遲不一樣,在某個節點屬於同一個時間窗口處理的消息,流到下一個節點時可能被切分到不一樣的時間窗口中,從而產生不符合預期的結果

Flink支持3種類型的時間窗口:

1.Operator Time。根據Task所在節點的本地時鐘來切分的時間窗口

2.Event Time。消息自帶時間戳,根據消息的時間戳進行處理,確保時間戳在同一個時間窗口的全部消息必定會被正確處理。因爲消息可能亂序流入Task,因此Task須要緩存當前時間窗口消息處理的狀態,直到確認屬於該時間窗口的全部消息都被處理,才能夠釋放,若是亂序的消息延遲很高會影響分佈式系統的吞吐量和延遲

3.ingress Time。有時消息自己並不帶有時間戳信息,但用戶依然但願按照消息而不是節點時鐘劃分時間窗口,例如避免上面提到的第二個問題,此時能夠在消息源流入Flink流處理系統時自動生成增量的時間戳賦予消息,以後處理的流程與Event Time相同。Ingress Time能夠當作是Event Time的一個特例,因爲其在消息源處時間戳必定是有序的,因此在流處理系統中,相對於Event Time,其亂序的消息延遲不會很高,所以對Flink分佈式系統的吞吐量和延遲的影響也會更小。

操做符經過基於Event Time的時間窗口來處理數據時,它必須在肯定全部屬於該時間窗口的消息所有流入此操做符後才能開始數據處理。可是因爲消息多是亂序的,因此操做符沒法直接確認什麼時候全部屬於該時間窗口的消息所有流入此操做符。WaterMark包含一個時間戳,Flink使用WaterMark標記全部小於該時間戳的消息都已流入

一個可能的優化措施是,對於聚合類的操做符,能夠提早對部分消息進行聚合操做,當有屬於該時間窗口的新消息流入時,基於以前的部分聚合結果繼續計算,這樣的話,只需緩存中間計算結果便可,無需緩存該時間窗口的全部消息

flink基於watermark實現了基於時間戳的全局排序:

排序操做:排序操做符緩存全部流入的消息,當接收到watermark時,對時間戳小於該watermark的消息進行排序,併發送到下一個節點。在此排序操做符中釋放全部時間戳小於該watermark的消息,繼續緩存流入的消息。等待下一次watermark觸發下一次排序。

watermark保證了其以後不會出現時間戳比它小的消息,所以能夠保證排序的正確性。請注意:排序操做符有多個節點,只能保證每一個節點流出的消息有序,節點之間的消息不能有序,要實現全局有序,則只能有一個排序操做符節點。

Java對象的存儲密度相對偏低,例如[1],「abcd」這樣簡單的字符串在UTF-8編碼中須要4個字節存儲

採用了UTF-16編碼存儲字符串的Java則須要8個字節,同時Java對象還有header等其餘額外信息,一個4字節字符串對象在Java中須要48字節的空間來存儲。對於大部分的大數據應用,內存都是稀缺資源,更有效率地內存存儲,意味着CPU數據訪問吞吐量更高,以及更少磁盤落地的存在。

垃圾回收也是Java應用的不定時炸彈,有時秒級甚至是分鐘級的垃圾回收極大影響了Java應用的性能和可用性。

經過JVM參數調優提升垃圾回收效率須要用戶對應用和分佈式計算框架以及JVM的各參數有深刻了解,並且有時候這也遠遠不夠:

 

爲了解決以上提到的問題,高性能分佈式計算框架一般須要如下技術:

Flink的處理策略:

定製的序列化工具,顯式內存管理的前提步驟就是序列化,用的序列化框架,如Java默認使用java.io.Serializable

制的序列化框架,如Hadoop的org.apache.hadoop.io.Writable須要用戶實現該接口並自定義類的序列化和反序列化方法。這種方式效率最高。

對於計算密集的數據結構和算法,直接操做序列化後的二進制數據,而不是將對象反序列化後再進行操做。

緩存友好的數據結構和算法對於計算密集的數據結構和算法,直接操做序列化後的二進制數據,而不是將對象反序列化後再進行操做。同時,只將操做相關的數據連續存儲,能夠最大化的利用L1/L2/L3緩存,減小Cache miss的機率,提高CPU計算的吞吐量。以排序爲例,因爲排序的主要操做是對Key進行對比,若是將全部排序數據的Key與Value分開並對Key連續存儲,那麼訪問Key時的Cache命中率會大大提升

分佈式計算框架可使用定製序列化工具的前提是要待處理數據流一般是同一類型,因爲數據集對象的類型固定,從而能夠只保存一份對象Schema信息,節省大量的存儲空間

對於固定大小的類型,也可經過固定的偏移位置存取。在須要訪問某個對象成員變量時,經過定製的序列化工具,並不須要反序列化整個Java對象,而是直接經過偏移量,從而只須要反序列化特定的對象成員變量。若是對象的成員變量較多時,可以大大減小Java對象的建立開銷,以及內存數據的拷貝大小Flink數據集支持任意Java或是Scala類型,經過自動生成定製序列化工具,既保證了API接口對用戶友好(不用像Hadoop那樣數據類型需要繼承實現org.apache.hadoop.io.Writable接口),也達到了和Hadoop相似的序列化效率

Flink對數據集的類型信息進行分析,然後自動生成定製的序列化工具類。Flink支持任意的Java或是Scala類型,經過Java Reflection框架分析基於Java的Flink程序UDF(User Define Function)的返回類型的類型信息,經過Scala Compiler分析基於Scala的Flink程序UDF的返回類型的類型信息。類型信息由TypeInformation類表示,這個類有諸多具體實現類

例如

1.BasicTypeInfo任意Java基本類型(裝包或未裝包)和String類型

 2.BasicArrayTypeInfo任意Java基本類型數組(裝包或未裝包)和String數組

3.WritableTypeInfo任意Hadoop的Writable接口的實現類

4.TupleTypeInfo任意的Flink tuple類型(支持Tuple1 to Tuple25)Flink tuples是固定長度固定類型的Java Tuple實

 5.CaseClassTypeInfo任意的 Scala CaseClass(包括 Scala tuples)

 6.PojoTypeInfo任意的POJO (Java or Scala),Java對象的全部成員變量,要麼是public修飾符定義,要麼有getter/setter方法

7.GenericTypeInfo任意沒法匹配以前幾種類型的類。

前6種類型數據集幾乎覆蓋了絕大部分的Flink程序,針對前6種類型數據集,Flink皆能夠自動生成對應的TypeSerializer定製序列化工具,很是有效率地對數據集進行序列化和反序列化

對於第7種類型,Flink使用Kryo進行序列化和反序列化

對於可被用做Key的類型,Flink還同時自動生成TypeComparator,用來輔助直接對序列化後的二進制數據直接進行compare、hash等操做

對於Tuple、CaseClass、Pojo組合類型,Flink自動生成的TypeSerializerTypeComparator一樣是組合的,並把其成員的序列化/反序列化代理給其成員對應的TypeSerializer、TypeComparator,如圖6所示:

此外若有須要,用戶可經過集成TypeInformation接口定製實現本身的序列化工具

JDK8的G1算法改善了JVM垃圾回收的效率和可用範圍

經過JVM進行內存管理的話,OutOfMemoryError也是一個很難解決的問題

在JVM內存管理中,Java對象有潛在的碎片化存儲問題

Flink將內存分爲3個部分,每一個部分都有不一樣用途:

1.Network buffers: 一些以32KB Byte數組爲單位buffer,主要被網絡模塊用於數據的網絡傳輸,基於Netty的網絡傳輸

2.Memory Manager pool大量以32KB Byte數組爲單位的內存池,全部的運行時算法(例如Sort/Shuffle/Join)都從這個內存池申請內存並將序列化後的數據存儲其中,結束後釋放回內存池。一般會配置爲最大的一塊內存,

3. Remaining (Free) Heap主要留給UDF中用戶本身建立的Java對象,由JVM管理。Flink也不鼓勵用戶在UDF中緩存不少數據。。 Remaining Heap的內存雖然由JVM管理,可是因爲其主要用來存儲用戶處理的流式數據,生命週期很是短,速度很快的Minor GC就會所有回收掉,通常不會觸發Full GC

在Flink中,內存池由多個MemorySegment組成,每一個MemorySegment表明一塊連續的內存,底層存儲是byte[],默認32KB大小。

MemorySegment提供了根據偏移量訪問數據的各類方法,如get/put int、long、float、double等,MemorySegment之間數據拷貝等方法和java.nio.ByteBuffer相似。

於Flink的數據結構,一般包括多個向內存池申請的MemeorySegment,全部要存入的對象經過TypeSerializer序列化以後,將二進制數據存儲在MemorySegment中,在取出時經過TypeSerializer反序列化

數據結構經過MemorySegment提供的set/get方法訪問具體的二進制數據

Flink這種看起來比較複雜的內存管理方式帶來的好處主要有:

1.二進制的數據存儲大大提升了數據存儲密度,節省了存儲空間。全部的運行時數據結構和算法只能經過內存池申請內存,保證了其使用的內存大小是固定的,不會由於運行時數據結構和算法而發生OOM

Flink當前的內存管理在最底層是基於byte[],

flink排序算法的實現:

1.將待排序的數據通過序列化存儲在兩個不一樣的MemorySegment集中,數據所有的序列化值存放於其中一個MemorySegment集中。數據序列化後的Key和指向第一個MemorySegment集中值的指針存放於第二個MemorySegment集中。對第二個MemorySegment集中的Key進行排序,如需交換Key位置,只需交換對應的Key+Pointer的位置,第一個MemorySegment集中的數據無需改變。 當比較兩個Key大小時,TypeComparator提供了直接基於二進制數據的對比方法,無需反序列化任何數據。排序完成後,訪問數據時,按照第二個MemorySegment集中Key的順序訪問,並經過Pointer值找到數據在第一個MemorySegment集中的位置,經過TypeSerializer反序列化成Java對象返回。

經過Key和Full data分離存儲的方式儘可能將被操做的數據最小化,提升Cache命中的機率,從而提升CPU的吞吐量。 移動數據時,只需移動Key+Pointer,而無須移動數據自己,大大減小了內存拷貝的數據量。 TypeComparator直接基於二進制數據進行操做,節省了反序列化的時間。

DataSet API級別的執行計劃優化器,原生的迭代操做符等,

相關文章
相關標籤/搜索