既然有了Apache Spark,爲何還要使用Apache Flink?css
由於Flink是一個純流式計算引擎,而相似於Spark這種微批的引擎,只是Flink流式引擎的一個特例。其餘的不一樣點以後會陸續談到。html
Flink起源於一個叫作Stratosphere的研究項目,目標是創建下一代大數據分析引擎,其在2014年4月16日成爲Apache的孵化項目,從Stratosphere 0.6開始,正式改名爲Flink。Flink 0.7中介紹了最重要的特性:Streaming API。最初只支持Java API,後來增長了Scala API。java
Flink 1.X版本的包含了各類各樣的組件,包括部署、flink core(runtime)以及API和各類庫。git
從部署上講,Flink支持local模式、集羣模式(standalone集羣或者Yarn集羣)、雲端部署。Runtime是主要的數據處理引擎,它以JobGraph形式的API接收程序,JobGraph是一個簡單的並行數據流,包含一系列的tasks,每一個task包含了輸入和輸出(source和sink例外)。github
DataStream API和DataSet API是流處理和批處理的應用程序接口,當程序在編譯時,生成JobGraph。編譯完成後,根據API的不一樣,優化器(批或流)會生成不一樣的執行計劃。根據部署方式的不一樣,優化後的JobGraph被提交給了executors去執行。算法
Flink分佈式程序包含2個主要的進程:JobManager和TaskManager.當程序運行時,不一樣的進程就會參與其中,包括Jobmanager、TaskManager和JobClient。apache
首先,Flink程序提交給JobClient,JobClient再提交到JobManager,JobManager負責資源的協調和Job的執行。一旦資源分配完成,task就會分配到不一樣的TaskManager,TaskManager會初始化線程去執行task,並根據程序的執行狀態向JobManager反饋,執行的狀態包括starting、in progress、finished以及canceled和failing等。當Job執行完成,結果會返回給客戶端。bootstrap
Master進程,負責Job的管理和資源的協調。包括任務調度,檢查點管理,失敗恢復等。windows
固然,對於集羣HA模式,能夠同時多個master進程,其中一個做爲leader,其餘做爲standby。當leader失敗時,會選出一個standby的master做爲新的leader(經過zookeeper實現leader選舉)。
JobManager包含了3個重要的組件:緩存
Flink內部使用Akka模型做爲JobManager和TaskManager之間的通訊機制。
Actor系統是個容器,包含許多不一樣的Actor,這些Actor扮演者不一樣的角色。Actor系統提供相似於調度、配置、日誌等服務,同時包含了全部actors初始化時的線程池。
全部的Actors存在着層級的關係。新加入的Actor會被分配一個父類的Actor。Actors之間的通訊採用一個消息系統,每一個Actor都有一個「郵箱」,用於讀取消息。若是Actors是本地的,則消息在共享內存中共享;若是Actors是遠程的,則消息經過RPC遠程調用。
每一個父類的Actor都負責監控其子類Actor,當子類Actor出現錯誤時,本身先嚐試重啓並修復錯誤;若是子類Actor不能修復,則將問題升級並由父類Actor處理。
在Flink中,actor是一個有狀態和行爲的容器。Actor的線程持續的處理從「郵箱」中接收到的消息。Actor中的狀態和行爲則由收到的消息決定。
Flink中的Executors被定義爲task slots(線程槽位)。每一個Task Manager須要管理一個或多個task slots。
Flink經過SlotSharingGroup和CoLocationGroup來決定哪些task須要被共享,哪些task須要被單獨的slot使用。
Flink的檢查點機制是保證其一致性容錯功能的骨架。它持續的爲分佈式的數據流和有狀態的operator生成一致性的快照。其改良自Chandy-Lamport算法,叫作ABS(輕量級異步Barrier快照),具體參見論文:
Lightweight Asynchronous Snapshots for Distributed Dataflows
Flink的容錯機制持續的構建輕量級的分佈式快照,所以負載很是低。一般這些有狀態的快照都被放在HDFS中存儲(state backend)。程序一旦失敗,Flink將中止executor並從最近的完成了的檢查點開始恢復(依賴可重發的數據源+快照)。
Barrier做爲一種Event,是Flink快照中最主要的元素。它會隨着data record一塊兒被注入到流數據中,並且不會超越data record。每一個barrier都有一個惟一的ID,將data record分到不一樣的檢查點的範圍中。下圖展現了barrier是如何被注入到data record中的:
每一個快照中的狀態都會報告給Job Manager的檢查點協調器;快照發生時,flink會在某些有狀態的operator上對data record進行對齊操做(alignment),目的是避免失敗恢復時重複消費數據。這個過程也是exactly once的保證。一般對齊操做的時間僅是毫秒級的。可是對於某些極端的應用,在每一個operator上產生的毫秒級延遲也不能容許的話,則能夠選擇降級到at least once,即跳過對齊操做,當失敗恢復時可能發生重複消費數據的狀況。Flink默認採用exactly once意義的處理。
Task Managers是具體執行tasks的worker節點,執行發生在一個JVM中的一個或多個線程中。Task的並行度是由運行在Task Manager中的task slots的數量決定。若是一個Task Manager有4個slots,那麼JVM的內存將分配給每一個task slot 25%的內存。一個Task slot中能夠運行1個或多個線程,同一個slot中的線程又能夠共享相同的JVM。在相同的JVM中的tasks,會共享TCP鏈接和心跳消息:
Job Client並非Flink程序執行中的內部組件,而是程序執行的入口。Job Client負責接收用戶提交的程序,並建立一個data flow,而後將生成的data flow提交給Job Manager。一旦執行完成,Job Client將返回給用戶結果。
Data flow就是執行計劃,好比下面一個簡單的word count的程序:
當用戶將這段程序提交時,Job Client負責接收此程序,並根據operator生成一個data flow,那麼這個程序生成的data flow也許看起來像是這個樣子:
默認狀況下,Flink的data flow都是分佈式並行處理的,對於數據的並行處理,flink將operators和數據流進行partition。Operator partitions叫作sub-tasks。數據流又能夠分爲一對一的傳輸與重分佈的狀況。
咱們看到,從source到map的data flow,是一個一對一的關係,不必產生shuffle操做;而從map到groupBy操做,flink會根據key將數據重分佈,即shuffle操做,目的是聚合數據,產生正確的結果。
Flink自己就被設計爲高性能和低延遲的引擎。不像Spark這種框架,你沒有必要作許多手動的配置,用以得到最佳性能,Flink管道式(pipeline)的數據處理方式已經給了你最佳的性能。
經過檢查點+可重發的數據源,使得Flink對於stateful的operator,支持exactly once的計算。固然你能夠選擇降級到at least once。
Flink支持數據驅動的窗口,這意味着咱們能夠基於時間(event time或processing time)、count和session來構建窗口;窗口同時能夠定製化,經過特定的pattern實現。
經過輕量級、分佈式快照實現。
Flink在JVM內部進行內存的自我管理,使得其獨立於java自己的垃圾回收機制。當處理hash、index、caching和sorting時,Flink自個人內存管理方式使得這些操做很高效。可是,目前自個人內存管理只在批處理中實現,流處理程序並未使用。
爲了不shuffle、sort等操做,Flink的批處理API進行了優化,它能夠確保避免過分的磁盤IO而儘量使用緩存。
Flink中批和流有各自的API,你既能夠開發批程序,也能夠開發流處理程序。事實上,Flink中的流處理優先原則,認爲批處理是流處理的一種特殊狀況。
Flink提供了用於機器學習、圖計算、Table API等庫,同時Flink也支持複雜的CEP處理和警告。
Flink支持Event Time語義的處理,這有助於處理流計算中的亂序問題,有些數據也許會遲到,咱們能夠經過基於event time、count、session的窗口來處於這樣的場景。
直接參見官方文檔:QuickStart
直接參見官方文檔:Standalone Cluster
略去,可參見官方文檔:Examples
Flink細節上的討論和處理模型。下一章將介紹Flink Streaming API。
許多領域須要數據的實時處理,物聯網驅動的應用程序在數據的存儲、處理和分析上須要實時或準實時的進行。
Flink提供流處理的API叫作DataStream API,每一個Flink程序均可以按照下面的步驟進行開發:
咱們首先要得到已經存在的運行環境或者建立它。有3種方法獲得運行環境:
Flink支持許多預約義的數據源,同時也支持自定義數據源。下面咱們看看有哪些預約義的數據源。
DataStream API支持從socket讀取數據,有以下3個方法:
你可使用readTextFile(String path)來消費文件中的數據做爲流數據的來源,默認狀況下的格式是TextInputFormat。固然你也能夠經過readFile(FileInputFormat inputFormat, String path)來指定FileInputFormat的格式。
Flink一樣支持讀取文件流:
關於基於文件的數據流,這裏再也不過多介紹。
Transformation容許將數據從一種形式轉換爲另外一種形式,輸入能夠是1個源也能夠是多個,輸出則能夠是0個、1個或者多個。下面咱們一一介紹這些Transformations。
輸入1個元素,輸出一個元素,Java API以下:
輸入1個元素,輸出0個、1個或多個元素,Java API以下:
條件過濾時使用,當結果爲true時,輸出記錄;
邏輯上按照key分組,內部使用hash函數進行分組,返回keyedDataStream:
inputStream.keyBy("someKey");
keyedStream流上,將上一次reduce的結果和本次的進行操做,例如sum reduce的例子:
在keyedStream流上的記錄進行鏈接操做,例如:
假如是一個(1,2,3,4,5)的流,那麼結果將是:Start=1=2=3=4=5
在keyedStream上應用相似min、max等聚合操做:
窗口功能容許在keyedStream上應用時間或者其餘條件(count或session),根據key分組作聚合操做。
流是無界的,爲了處理無界的流,咱們能夠將流切分到有界的窗口中去處理,根據指定的key,切分爲不一樣的窗口。咱們可使用Flink預約義的窗口分配器。固然你也能夠經過繼承WindowAssginer自定義分配器。
下面看看有哪些預約義的分配器。
Global window的範圍是無限的,你須要指定觸發器來觸發窗口。一般來說,每一個數據按照指定的key分配到不一樣的窗口中,若是不指定觸發器,則窗口永遠不會觸發。
Tumbling窗口是基於特定時間建立的,他們的大小固定,窗口間不會發生重合。例如你想基於event timen每隔10分鐘計算一次,這個窗口就很適合。
Sliding窗口的大小也是固定的,但窗口之間會發生重合,例如你想基於event time每隔1分鐘,統一過去10分鐘的數據時,這個窗口就很適合。
Session窗口容許咱們設置一個gap時間,來決定在關閉一個session以前,咱們要等待多長時間,是衡量用戶活躍與否的標誌。
WindowAll操做不是基於key的,是對全局數據進行的計算。因爲不基於key,所以是非並行的,即並行度是1.在使用時性能會受些影響。
inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));
Union功能就是在2個或多個DataStream上進行鏈接,成爲一個新的DataStream。
inputStream. union(inputStream1, inputStream2, ...)
Join容許在2個DataStream上基於相同的key進行鏈接操做,計算的範圍也是要基於一個window進行。
Split的功能是根據某些條件將一個流切分爲2個或多個流。例如你有一個混合數據的流,根據數據自身的某些特徵,將其劃分到多個不一樣的流單獨處理。
DataStream根據選擇的字段,將流轉換爲新的流。
Project功能容許你選擇流中的一部分元素做爲新的數據流中的字段,至關於作個映射。
Flink容許咱們在流上執行物理分片,固然你能夠選擇自定義partitioning。
根據某個具體的key,將DataStream中的元素按照key從新進行分片,將相同key的元素聚合到一個線程中執行。
不根據具體的key,而是隨機將數據打散。
inputStream.shuffle();
內部使用round robin方法將數據均勻打散。這對於數據傾斜時是很好的選擇。
inputStream.rebalance();
Rescaling是經過執行oepration算子來實現的。因爲這種方式僅發生在一個單一的節點,所以沒有跨網絡的數據傳輸。
inputStream.rescale();
廣播用於將dataStream全部數據發到每個partition。
inputStream.broadcast();
咱們最終須要將結果保存在某個地方,Flink提供了一些選項:
對於Flink中的connector以及自定義輸出,後續的章節會講到。
Flink Streaming API受到了Google DataFlow模型的啓發,支持3種不一樣類型的時間概念:
(1)Event Time
事件發生的時間,通常數據中自帶時間戳。這就可能致使亂序的發生。
(2)Processing Time
Processing Time是機器的時間,這種時間跟數據自己沒有關係,徹底依賴於機器的時間。
(3)Ingestion Time
是數據進入到Flink的時間。注入時間比processing time更加昂貴(多了一個assign timestamp的步驟),可是其準確性相比processing time的處理更好。因爲是進入Flink才分配時間戳,所以沒法處理亂序。
咱們經過在env中設置時間屬性來選擇不一樣的時間概念:
Flink提供了預約義的時間戳抽取器和水位線生成器。參考:
Pre-defined Timestamp Extractors / Watermark Emitters
kafka是一個基於發佈、訂閱的分佈式消息系統。Flink定義了kafka consumer做爲數據源。咱們只須要引入特定的依賴便可(這裏以kafka 0.9爲例):
在使用時,咱們須要指定topic name以及反序列化器:
Flink默認支持String和Json的反序列化。
Kafka consumer在實現時實現了檢查點功能,所以失敗恢復時能夠重發。
Kafka除了consumer外,咱們也能夠將結果輸出到kafka。即kafka producer。例如:
用twitter做爲數據源,首先你須要用於twitter帳號。以後你須要建立twitter應用並認證。
這裏有個幫助文檔:https://dev.twitter.com/oauth/overview/application-owner-access-tokens
Pom中添加依賴:
API:
這3個connetor略過,殼參考官方文檔:
https://flink.apache.org/ecosystem.html
這裏能夠參考OSCON的例子:
https://github.com/dataArtisans/oscon。
本章介紹了Flink的DataStream API,下一章將介紹DataSet API。