花了四小時,看完Flink的內容,基本瞭解了原理。 挖個坑,待總結後填一下。html
2019-06-02 01:22:57等歐冠決賽中,填坑。redis
storm最大的特色是快,它的實時性很是好(毫秒級延遲)。爲了低延遲它犧牲了高吞吐,而且不能保證exactly once語義。sql
在低延遲和高吞吐的流處理中,維持良好的容錯是很是困難的,但爲了獲得有保障的準確狀態,人們想到一種替代方法:將連續時間中的流數據分割成一系列微小的批量做業(微批次處理)。若是分割得足夠小,計算幾乎能夠實現真正的流處理。由於存在延遲,因此不可能作到徹底實時,可是每一個簡單的應用程序均可以實現僅有幾秒甚至幾亞秒的延遲。這就是Spark Streaming所使用的方法。apache
爲了實現高吞吐和exactly once語義,storm推出了storm trident,也是使用了微批次處理的方法。編程
微批次處理缺點:架構
數據只能按固定時間分割,沒有辦法根據實際數據狀況,進行不一樣批次或每一個批次不一樣大小的分割app
知足不了對數據實時性要求很是高的數據框架
flink與storm,spark streaming的比較分佈式
Apache flink主頁在其頂部展現了該項目的理念:「Apache Flink是爲分佈式,高性能,隨時可用以及準確的流處理應用程序打造的開源流處理框架」ide
流處理與批處理:
批處理的特色是有界、持久(數據已經落地)、大量,批處理很是適合須要訪問全套記錄才能完成的計算工做,通常用於離線統計。典型的是Hadoop,它只能用於批處理。
流處理的特色是無界、實時,流處理方式無需針對整個數據集執行操做,而是對經過系統傳輸的每一個數據項執行操做,通常用於實時統計。典型的是storm,它只能進行流處理。
有沒有即能實現批處理,也能實現流處理?
spark即能進行流處理,也能實現批處理,但它並非在同一架構體系下,spark的批處理是經過spark core和spark sql實現,流處理是經過spark streaming實現。
flink什麼特色呢?不論是批處理仍是流處理,它可以同時進行處理,由於它底層是不區分流批的。flink將批處理(即處理有限的靜態數據)視做一種特殊的流處理。
JobManager與TaskManager
若是粗化一點看的話,flink就是由2部分組成,即JobManager和TaskManager,是兩個JVM進程。
JobManager:也稱爲master(對應spark裏的driver),用於協調分佈式執行,它們用來調度task,協調檢查點,協調失敗回覆等。flink運行時至少存在一個master處理器,若是配置高可用模式則會存在多個master,它們其中一個是leader,而其它都是standby.
TaskManager:也稱爲worker(對應spark裏面的executor),用於執行一個dataflow的task,數據緩衝和datastream的交換,flink運行時至少會存在一個worker處理器。
Stateful Stream Processing,是數據接入,計算,輸出都是本身來實現,是最靈活也是最麻煩的編程接口。
DataStream/DataSet API是針對流和批處理的封裝API,絕大多數的編程是在這一層。
Table API,是將數據抽像成一張表,提供select, group_by等API接口供調用。
SQL是最高級的接口,支持直接寫SQL查詢數據。
當啓動新的Flink YARN會話時,客戶端首先檢查請求的資源(容器和內存)是否可用。以後,它將包含flink的jar和配置上傳到HDFS(步驟1)。
客戶端的下一步是請求(步驟2)YARN容器以啓動ApplicationMaster(步驟3)。因爲客戶端將配置和jar文件註冊爲容器的資源,所以在該特定機器上運行的YARN的NodeManager將負責準備容器(例如,下載文件)。一旦完成,ApplicationMaster(AM)就會啓動。
該JobManager和AM在同一容器中運行。成功啓動後,AM就很容易知道JobManager的地址(它本身的主機)。它爲TaskManagers生成一個新的Flink配置文件(以便它們能夠鏈接到JobManager)。該文件也被上傳到HDFS。此外,AM容器還提供Flink的Web界面。YARN代碼分配的全部端口都是臨時端口。這容許用戶並行執行多個Flink YARN會話。
以後,AM開始爲Flink的TaskManagers分配容器,它將從HDFS下載jar文件和修改後的配置。完成這些步驟後,Flink即會設置並準備接受做業。
這個提交Yarn Session的整個過程,Yarn Session提交完成後,JobManager和TaskManager就啓動完畢,等待用戶任務的提交(jar包)
Task Slot只平均分配內存,不分CPU!Slot指的是TaskManager可以並行執行的task最大數。
客戶端將程序抽像成Datafow Graph,並能過Actor System通訊,將程序提交到JobManager,JobManager根據Dataflow Graph(相似Spark中的DAG),兩個相臨任務間的並行度變化,來劃分任務,並將任務提交到TaskManager的Task Slot去執行。
TaskManager是一個獨立的JVM進程,TaskManager和Slot能夠看做worker pool模型,Slot是一個Worker,若是TaskManager裏有Slot,才能被分配任務。
一個Slot裏的Task,可能包含多個算子。Task按distributed進行劃分,也就算子是否產生shuffle(spark裏shuffle==flink的distributed).
若是有3個TaskManager,第個TaskManager中有3個Slot,那麼最高支持的並行度是9,parallelism.default=9.
flink程序的基礎構架模塊是流(streams)和轉換(transformation)
fink經過source將流接進來,經過transformation算子對流進行轉換,再經過sink將數據輸出,這是一個flink程序的完整過程。
flink程序的執行具備並行、分佈式的特性。在執行過程當中,一個stream包含一個或多個stream partition,而每一個operator包含一個或多個operator subtask,這些operator subtask在不一樣的線程、不一樣的物理機或不一樣的容器中彼此互不依賴的執行。
一個特定的operator的subtask的個數被稱之爲其parallelism(並行度)。一個程序中,不一樣的operator可能具備不一樣的並行度。
stream在operator之間的傳輸數據的形式能夠是one-to-one(forwarding)的模式,也能夠是redistributing的模式,具體哪一種形式取決於operator的種類。
如上圖map是one-to-one模式,而keyBy,window,apply是redistributing模式
one-to-one的算子,會被組合在一塊兒成爲operator-chain,一個operator-chain被分紅一個task去執行。
每一個flink程序都包含如下的若干個流程:
獲取一個執行環境:execution enviroment
加載/建立初始數據:source
指定轉換這些數據:transformation
指定放置計算結果的位置:sink
觸發程序執行
Map操做
遍歷一個集合的全部元素,並對每一個元素作轉換
輸入一個參數,產生一個輸出。
steam.map(item => item * 2)
FlatMap操做
輸入一個參數,產生0個、1個或多個輸出。
stream.flatMap(item => item.split(「 」))
Filter操做
結算每一個元素的布爾值,並返回布爾值爲true的元素。
stream.filter(item => item == 1)
Connect操做
DataStream1, DataStream2 -> ConnectedStreams
在ConnectedStream的內部,stream仍是分開的,也就是說,想對ConnectedStream執行一個Map/Filter等操做,要傳入2個函數。第1個對DataStream1操做,第2個對DataStream2操做。
streamConnect = stream1.connect(stream2)
streamConnect.map(item => item * 2, item => (item, 1L))
coMap, coFlatMap操做
ConnectedStreams -> DataStream
stream = streamConnect.map(item => item * 2, item => (item, 1L))
輸入是一個ConnectedStream,輸出是一個普通的DataStream
Split + Select操做
DataStream -> SplitStream
val streamSplit = stream.split(word => (「haddoop」.equals(word)) match {
case true => List(「hadoop」)
case false => List(「other」)
}
)
上面split將流劃分紅2個流
val streamSelect001 = streamSplit.select(「hadoop」)
select 將指定的一個流取出來。
union操做
對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新的DataStream。
KeyBy
DataStream -> KeyedStream,輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同Key的元素,在內部以Hash的形式實現。
val env = StreamExecutionEnvironment.getExecutionEnvironment
var stream = env.readTextFile(「test.txt」)
val streamMap = stream.flaMap(item => item.split(「 「)).map(item => (item, 1L))
val streamKeyBy = streamMap.keyBy(0) //keyBy能夠根據Tuple中的第一個元素,也能夠根據第二個元素,進行partition。0表明第一個元素。
Reduce操做
KeyedStream -> DataStream:一個分組數據流的聚合操做,合併當前元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。
val streamReduce = streamKeyBy.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
Fold操做
KeyedStream -> DataStream:一個有初始值的分組數據流的滾動摺疊操做。給窗口賦一個fold功能的函數,並返回一個fold後結果
Aggregation操做
flink中有3個時間,
EventTime:事件生成的時間
IngestionTime:事件進入flink的時間
WindowProcessingTime:事件被處理的系統時間(默認使用)
不能直接對無界的流進行聚合,要先將流劃分爲window,再對window進行聚合。
window分爲兩類:
CountWindow:按照指定的數據條數,生成一個window,與時間無關
TimeWindow:按照時間生成window
對於TimeWindow,能夠根據窗口實現原理的不一樣分紅三類:
滾動窗口(Tumbing Window)—— 沒有重疊
滑動窗口(Sliding Window)—— 重疊,有窗口長度和滑動步長兩個屬性
會話窗口(Sessionn Window)—— 若是相臨的兩條數據,間隔時間超過會話窗口時間大小,則前面的數據生成一個窗口。
每知足滑動步長,會針對window執行一次計算
val streanWindow = streamKeyBy.timeWindow(Time.Seconds(10), Time.Seconds(2)).reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
若是用EventTime來決定窗口的運行,一旦出現亂序,咱們不能明確數據是否已經所有到位,但又不能無限期的等下去,此時必須有個機制來保證一個特定的時間後,必須觸發window的計了,這個特別的機制就是watermark.
WaterMark是一種衡量EventTime進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的WaterMark。
WaterMark是用於處理亂序事件的,而正確的處理亂序事件,一般用WaterMark機制結合window來實現。
數據流中的WaterMark用於表示eventTime小於watermark的數據,都已經到達了,所以,window的執行也是由WaterMark觸發的。
WaterMark能夠理解成一個延遲觸發機制,咱們能夠設置WaterMark的延時時長爲t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime小於maxEventTime-t的全部數據都已經到達,若是有窗口的中止時間等於maxEventTime-t,那麼這個窗口被觸發。
當Flink接收到每一條數據時,都會計算產生一條watermark,watermark = 當前全部到達數據中的maxEventTime - 延遲時長,也就是說,watermark是由數據攜帶的,一量數據攜帶的watermark比當前未觸發的窗口的中止時間要晚,那麼就會觸發相應的窗口的執行。因爲watermark是由數據攜帶的,所以,若是運行過程當中沒法獲取新的數據,那麼沒有被觸發的窗口將永遠不被觸發。
EventTime的窗口與Time裏的窗口區別:
窗口大小設置爲5s,Time窗口每5秒執行一次,無論有沒有數據。
EventTime每5s生成一個窗口,但不執行。當觸發條件知足後,纔會執行窗口。
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html