Apache Flink流式處理

花了四小時,看完Flink的內容,基本瞭解了原理。 挖個坑,待總結後填一下。html

2019-06-02 01:22:57等歐冠決賽中,填坑。redis

1、概述

storm最大的特色是快,它的實時性很是好(毫秒級延遲)。爲了低延遲它犧牲了高吞吐,而且不能保證exactly once語義。sql

在低延遲和高吞吐的流處理中,維持良好的容錯是很是困難的,但爲了獲得有保障的準確狀態,人們想到一種替代方法:將連續時間中的流數據分割成一系列微小的批量做業(微批次處理)。若是分割得足夠小,計算幾乎能夠實現真正的流處理。由於存在延遲,因此不可能作到徹底實時,可是每一個簡單的應用程序均可以實現僅有幾秒甚至幾亞秒的延遲。這就是Spark Streaming所使用的方法。apache

爲了實現高吞吐和exactly once語義,storm推出了storm trident,也是使用了微批次處理的方法。編程

微批次處理缺點:架構

  1. 數據只能按固定時間分割,沒有辦法根據實際數據狀況,進行不一樣批次或每一個批次不一樣大小的分割app

  2. 知足不了對數據實時性要求很是高的數據框架

初識flink

flink與storm,spark streaming的比較分佈式

 

Apache flink主頁在其頂部展現了該項目的理念:「Apache Flink是爲分佈式,高性能,隨時可用以及準確的流處理應用程序打造的開源流處理框架」ide

流處理與批處理:

批處理的特色是有界、持久(數據已經落地)、大量,批處理很是適合須要訪問全套記錄才能完成的計算工做,通常用於離線統計。典型的是Hadoop,它只能用於批處理。

流處理的特色是無界、實時,流處理方式無需針對整個數據集執行操做,而是對經過系統傳輸的每一個數據項執行操做,通常用於實時統計。典型的是storm,它只能進行流處理。

 

有沒有即能實現批處理,也能實現流處理?

spark即能進行流處理,也能實現批處理,但它並非在同一架構體系下,spark的批處理是經過spark core和spark sql實現,流處理是經過spark streaming實現。

flink什麼特色呢?不論是批處理仍是流處理,它可以同時進行處理,由於它底層是不區分流批的。flink將批處理(即處理有限的靜態數據)視做一種特殊的流處理。

2、flink基本架構

JobManager與TaskManager

若是粗化一點看的話,flink就是由2部分組成,即JobManager和TaskManager,是兩個JVM進程。

JobManager:也稱爲master(對應spark裏的driver),用於協調分佈式執行,它們用來調度task,協調檢查點,協調失敗回覆等。flink運行時至少存在一個master處理器,若是配置高可用模式則會存在多個master,它們其中一個是leader,而其它都是standby.

TaskManager:也稱爲worker(對應spark裏面的executor),用於執行一個dataflow的task,數據緩衝和datastream的交換,flink運行時至少會存在一個worker處理器。

flink的編程模型

 

Stateful Stream Processing,是數據接入,計算,輸出都是本身來實現,是最靈活也是最麻煩的編程接口。

DataStream/DataSet API是針對流和批處理的封裝API,絕大多數的編程是在這一層。

Table API,是將數據抽像成一張表,提供select, group_by等API接口供調用。

SQL是最高級的接口,支持直接寫SQL查詢數據。

3、flink運行架構

任務提交流程

當啓動新的Flink YARN會話時,客戶端首先檢查請求的資源(容器和內存)是否可用。以後,它將包含flink的jar和配置上傳到HDFS(步驟1)。

客戶端的下一步是請求(步驟2)YARN容器以啓動ApplicationMaster(步驟3)。因爲客戶端將配置和jar文件註冊爲容器的資源,所以在該特定機器上運行的YARN的NodeManager將負責準備容器(例如,下載文件)。一旦完成,ApplicationMaster(AM)就會啓動。

該JobManager和AM在同一容器中運行。成功啓動後,AM就很容易知道JobManager的地址(它本身的主機)。它爲TaskManagers生成一個新的Flink配置文件(以便它們能夠鏈接到JobManager)。該文件也被上傳到HDFS。此外,AM容器還提供Flink的Web界面。YARN代碼分配的全部端口都是臨時端口。這容許用戶並行執行多個Flink YARN會話。

以後,AM開始爲Flink的TaskManagers分配容器,它將從HDFS下載jar文件和修改後的配置。完成這些步驟後,Flink即會設置並準備接受做業。

這個提交Yarn Session的整個過程,Yarn Session提交完成後,JobManager和TaskManager就啓動完畢,等待用戶任務的提交(jar包)

任務調度原理

Task Slot只平均分配內存,不分CPU!Slot指的是TaskManager可以並行執行的task最大數。

客戶端將程序抽像成Datafow Graph,並能過Actor System通訊,將程序提交到JobManager,JobManager根據Dataflow Graph(相似Spark中的DAG),兩個相臨任務間的並行度變化,來劃分任務,並將任務提交到TaskManager的Task Slot去執行。

TaskManager是一個獨立的JVM進程,TaskManager和Slot能夠看做worker pool模型,Slot是一個Worker,若是TaskManager裏有Slot,才能被分配任務。

一個Slot裏的Task,可能包含多個算子。Task按distributed進行劃分,也就算子是否產生shuffle(spark裏shuffle==flink的distributed).

若是有3個TaskManager,第個TaskManager中有3個Slot,那麼最高支持的並行度是9,parallelism.default=9.

程序與數據流

flink程序的基礎構架模塊是流(streams)和轉換(transformation)

fink經過source將流接進來,經過transformation算子對流進行轉換,再經過sink將數據輸出,這是一個flink程序的完整過程。

 

並行數據流

 

flink程序的執行具備並行、分佈式的特性。在執行過程當中,一個stream包含一個或多個stream partition,而每一個operator包含一個或多個operator subtask,這些operator subtask在不一樣的線程、不一樣的物理機或不一樣的容器中彼此互不依賴的執行。

一個特定的operator的subtask的個數被稱之爲其parallelism(並行度)。一個程序中,不一樣的operator可能具備不一樣的並行度。

stream在operator之間的傳輸數據的形式能夠是one-to-one(forwarding)的模式,也能夠是redistributing的模式,具體哪一種形式取決於operator的種類。

如上圖map是one-to-one模式,而keyBy,window,apply是redistributing模式

one-to-one的算子,會被組合在一塊兒成爲operator-chain,一個operator-chain被分紅一個task去執行。

 

4、DataStream API

flink程序結構

每一個flink程序都包含如下的若干個流程:

  1. 獲取一個執行環境:execution enviroment

  2. 加載/建立初始數據:source

  3. 指定轉換這些數據:transformation

  4. 指定放置計算結果的位置:sink

  5. 觸發程序執行

Transformation

  • Map操做

遍歷一個集合的全部元素,並對每一個元素作轉換

輸入一個參數,產生一個輸出。

steam.map(item => item * 2)

  • FlatMap操做

輸入一個參數,產生0個、1個或多個輸出。

stream.flatMap(item => item.split(「 」))

  • Filter操做

結算每一個元素的布爾值,並返回布爾值爲true的元素。

stream.filter(item => item == 1)

  • Connect操做

DataStream1, DataStream2 -> ConnectedStreams

在ConnectedStream的內部,stream仍是分開的,也就是說,想對ConnectedStream執行一個Map/Filter等操做,要傳入2個函數。第1個對DataStream1操做,第2個對DataStream2操做。

streamConnect  = stream1.connect(stream2)

streamConnect.map(item => item * 2, item => (item, 1L))

  • coMap, coFlatMap操做

ConnectedStreams -> DataStream

stream  = streamConnect.map(item => item * 2, item => (item, 1L))

輸入是一個ConnectedStream,輸出是一個普通的DataStream

  • Split + Select操做

DataStream -> SplitStream

val streamSplit = stream.split(word => (「haddoop」.equals(word)) match {

case true => List(「hadoop」)

case false => List(「other」)

}

)

上面split將流劃分紅2個流

val streamSelect001 = streamSplit.select(「hadoop」)

select 將指定的一個流取出來。

  • union操做

對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新的DataStream。

  • KeyBy

DataStream -> KeyedStream,輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同Key的元素,在內部以Hash的形式實現。

val env  = StreamExecutionEnvironment.getExecutionEnvironment

var stream = env.readTextFile(「test.txt」)

val streamMap = stream.flaMap(item => item.split(「 「)).map(item => (item, 1L))

val streamKeyBy = streamMap.keyBy(0) //keyBy能夠根據Tuple中的第一個元素,也能夠根據第二個元素,進行partition。0表明第一個元素。

  • Reduce操做

KeyedStream -> DataStream:一個分組數據流的聚合操做,合併當前元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

val streamReduce = streamKeyBy.reduce(

(item1, item2) => (item1._1, item1._2 + item2._2)

)

  • Fold操做

KeyedStream -> DataStream:一個有初始值的分組數據流的滾動摺疊操做。給窗口賦一個fold功能的函數,並返回一個fold後結果

  • Aggregation操做

5、Time與Window

時間

flink中有3個時間,

EventTime:事件生成的時間

IngestionTime:事件進入flink的時間

WindowProcessingTime:事件被處理的系統時間(默認使用)

窗口

不能直接對無界的流進行聚合,要先將流劃分爲window,再對window進行聚合。

window分爲兩類:

CountWindow:按照指定的數據條數,生成一個window,與時間無關

TimeWindow:按照時間生成window

對於TimeWindow,能夠根據窗口實現原理的不一樣分紅三類:

滾動窗口(Tumbing Window)—— 沒有重疊

滑動窗口(Sliding Window)—— 重疊,有窗口長度和滑動步長兩個屬性

會話窗口(Sessionn Window)—— 若是相臨的兩條數據,間隔時間超過會話窗口時間大小,則前面的數據生成一個窗口。

每知足滑動步長,會針對window執行一次計算

val streanWindow = streamKeyBy.timeWindow(Time.Seconds(10), Time.Seconds(2)).reduce(

(item1, item2) => (item1._1, item1._2 + item2._2)

)

6、EventTime和Window

引入EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

WaterMark

若是用EventTime來決定窗口的運行,一旦出現亂序,咱們不能明確數據是否已經所有到位,但又不能無限期的等下去,此時必須有個機制來保證一個特定的時間後,必須觸發window的計了,這個特別的機制就是watermark.


WaterMark是一種衡量EventTime進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的WaterMark。

WaterMark是用於處理亂序事件的,而正確的處理亂序事件,一般用WaterMark機制結合window來實現。


數據流中的WaterMark用於表示eventTime小於watermark的數據,都已經到達了,所以,window的執行也是由WaterMark觸發的。


WaterMark能夠理解成一個延遲觸發機制,咱們能夠設置WaterMark的延時時長爲t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime小於maxEventTime-t的全部數據都已經到達,若是有窗口的中止時間等於maxEventTime-t,那麼這個窗口被觸發。


當Flink接收到每一條數據時,都會計算產生一條watermark,watermark = 當前全部到達數據中的maxEventTime - 延遲時長,也就是說,watermark是由數據攜帶的,一量數據攜帶的watermark比當前未觸發的窗口的中止時間要晚,那麼就會觸發相應的窗口的執行。因爲watermark是由數據攜帶的,所以,若是運行過程當中沒法獲取新的數據,那麼沒有被觸發的窗口將永遠不被觸發。


EventTime的窗口與Time裏的窗口區別:

窗口大小設置爲5s,Time窗口每5秒執行一次,無論有沒有數據。

EventTime每5s生成一個窗口,但不執行。當觸發條件知足後,纔會執行窗口。

附錄

https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html

相關文章
相關標籤/搜索