本文來自9月1日在成都舉行的Apache Flink China Meetup,雲邪的分享。算法
做者:雲邪shell
整理:李澤聚(Flink China社區志願者)數據庫
校對:雲邪 / 韓非(Flink China社區志願者)編程
Flink是一款分佈式的計算引擎,它能夠用來作批處理,即處理靜態的數據集、歷史的數據集;也能夠用來作流處理,即實時地處理一些實時數據流,實時地產生數據的結果;也能夠用來作一些基於事件的應用,好比說滴滴經過Flink CEP實現實時監測用戶及司機的行爲流來判斷用戶或司機的行爲是否正當。網絡
總而言之,Flink是一個Stateful Computations Over Streams,即數據流上的有狀態的計算。這裏面有兩個關鍵字,一個是Streams,Flink認爲有界數據集是無界數據流的一種特例,因此說有界數據集也是一種數據流,事件流也是一種數據流。Everything is streams,即Flink能夠用來處理任何的數據,能夠支持批處理、流處理、AI、MachineLearning等等。另一個關鍵詞是Stateful,即有狀態計算。有狀態計算是最近幾年來愈來愈被用戶需求的一個功能。舉例說明狀態的含義,好比說一個網站一天內訪問UV數,那麼這個UV數便爲狀態。Flink提供了內置的對狀態的一致性的處理,即若是任務發生了Failover,其狀態不會丟失、不會被多算少算,同時提供了很是高的性能。併發
那Flink的受歡迎離不開它身上還有不少的標籤,其中包括性能優秀(尤爲在流計算領域)、高可擴展性、支持容錯,是一種純內存式的一個計算引擎,作了內存管理方面的大量優化,另外也支持eventime的處理、支持超大狀態的Job(在阿里巴巴中做業的state大小超過TB的是很是常見的)、支持exactly-once的處理。框架
Flink之因此能這麼流行,離不開它最重要的四個基石:Checkpoint、State、Time、Window。運維
首先是Checkpoint機制,這是Flink最重要的一個特性。Flink基於Chandy-Lamport算法實現了一個分佈式的一致性的快照,從而提供了一致性的語義。Chandy-Lamport算法實際上在1985年的時候已經被提出來,但並無被很普遍的應用,而Flink則把這個算法發揚光大了。Spark最近在實現Continue streaming,Continue streaming的目的是爲了下降它處理的延時,其也須要提供這種一致性的語義,最終採用Chandy-Lamport這個算法,說明Chandy-Lamport算法在業界獲得了必定的確定。機器學習
提供了一致性的語義以後,Flink爲了讓用戶在編程時可以更輕鬆、更容易地去管理狀態,還提供了一套很是簡單明瞭的State API,包括裏面的有ValueState、ListState、MapState,近期添加了BroadcastState,使用State API可以自動享受到這種一致性的語義。異步
除此以外,Flink還實現了Watermark的機制,可以支持基於事件的時間的處理,或者說基於系統時間的處理,可以容忍數據的延時、容忍數據的遲到、容忍亂序的數據
另外流計算中通常在對流數據進行操做以前都會先進行開窗,即基於一個什麼樣的窗口上作這個計算。Flink提供了開箱即用的各類窗口,好比滑動窗口、滾動窗口、會話窗口以及很是靈活的自定義的窗口。
Flink分層API主要有三層,以下圖:
之上是DataStream API,最上層是SQL/Table API的一種High-level API。
Flink能用來作什麼?回顧一下Flink up前幾站的分享,有很是多的嘉賓分享了他們在本身公司裏面基於Flink作的一些實踐,包括攜程、惟品會、餓了麼、滴滴、頭條等等。他們的應用場景包括實時的機器學習,實時的統計分析,實時的異常監測等等。這些實踐案例的共同點就是都用來作實時性的任務。
早期Flink是這樣介紹本身的:「我是一個開源的流批統一的計算引擎」,當時跟Spark有點相似。後來Spark改爲了一長串的文字,裏面有各類各樣的形容詞:「我是一個分佈式的、高性能的、高可用的、高精確的流計算系統」。最近Spark又進行了修改:「我是一個數據流上的有狀態的計算」。
經過觀察這個變化,能夠發現Flink社區重心的變遷,即社區如今主要精力是放在打造它的流計算引擎上。先在流計算領域紮根,領先其餘對手幾年,而後藉助社區的力量壯大社區,再借助社區的力量擴展它的生態。
阿里巴巴Flink是這樣介紹本身的:「Flink是一個大數據量處理的統一的引擎」。這個「統一的引擎」包括流處理、批處理、AI、MachineLearning、圖計算等等。
在Flink 1.0.0時期,Table API和CEP這兩個框架被首次加入到倉庫裏面,同時社區對於SQL的需求很大。SQL和Table API很是相近,都是一種處理結構化數據的一種High-Level語言,實現上能夠共用不少內容。因此在1.1.0裏面,社區基於Apache Calcite對整個非Table的Module作了重大的重構,使得Table API和SQL共用了大部分的代碼,同時進行了支持。
在Flink 1.2.0時期,在Table API和SQL上支持Tumbling Window、Sliding Window、Session Window這些窗口。
在Flink 1.3.0時期,首次引用了Dynamic Table這個概念,藉助Dynamic Table,流和批之間是能夠相互進行轉換的。流能夠是一張表,表也能夠是一張流,這是流批統一的基礎之一。Retraction機制是Dynamic Table最重要的一個功能,基於Retraction纔可以正確地實現多級Application、多級Join,纔可以保證語意與結果的一個正確性。同時該版本支持了CEP算子的可控性。
在Flink 1.5.0時期,支持了Join操做,包括window Join以及非window Join,還添加了SQL CLI支持。SQL CLI提供了一個相似shell命令的對話框,能夠交互式執行查詢。
在Flink 1.0.0時期,加入了State API,即ValueState、ReducingState、ListState等等。State API主要方便了DataStream用戶,使其可以更加容易地管理狀態。
在Flink 1.1.0時期,提供了對SessionWindow以及遲到數據處理的支持。
在Flink 1.2.0時期,提供了ProcessFunction,一個Low-level的API。基於ProcessFunction用戶能夠比較靈活地實現基於事件的一些應用。
在Flink 1.3.0時期,提供了Side outputs功能。通常算子的輸出只有一種輸出的類型,可是有些時候可能須要輸出另外的類型,好比把一些異常數據、遲到數據以側邊流的形式進行輸出,並交給異常節點進行下一步處理,這就是Side outputs。
在Flink 1.5.0時期,加入了BroadcastState。BroadcastState用來存儲上游被廣播過來的數據,這個節點上的不少N個併發上存在的BroadcastState裏面的數據都是如出一轍的,由於它是從上游廣播來的。基於這種State能夠比較好地去解決不等值Join這種場景。好比一個Query裏面寫的「SLECECT * FROM L JOIN R WHERE L.a > R.b」,也就是說咱們須要把左表和右表裏面全部A大於B的數據都關聯輸出出來。在之前的實現中,因爲沒有Join等值條件,就沒法按照等值條件來作KeyBy的Shuffle,只可以將全部的數據所有聚集到一個節點上,一個單併發的節點上進行處理,而這個單併發的節點就會成爲整個Job的瓶頸。而有了BroadcastState之後就能夠作一些優化:由於左表數據量比較大,右表數據量比較小,因此選擇把右表進行廣播,把左表按照它某一個進行均勻分佈的key,作keyby shuffle,shuffle到下游的N個Join的節點,Join的節點裏面會存兩份State,左邊state和右邊state,左邊state用來存左邊數據流的state,是一個keyedState,由於它是按它某一個key作keyby分發下來的。右邊State是一個BroadcastState,全部的Join節點裏面的BroadcastState裏面存的數據都是如出一轍的,由於均爲從上游廣播而來。全部keyedState進行併發處理,以後將keyedState集合進行合併便等於左邊數據流的全集處理結果。因而便實現了這個Join節點的可擴充,經過增長join節點的併發,能夠比較好地提高Job處理能力。除了不等值Join場景,BroadcastState還能夠比較有效地解決像CAP上的動態規則。
在Flink 1.6.0時期,提供了State TTL參數、DataStream Interval Join功能。State TTL實現了在申請某個State時候能夠在指定一個TTL參數,指定該state過了多久以後須要被系統自動清除。在這個版本以前,若是用戶想要實現這種狀態清理操做須要使用ProcessFunction註冊一個Timer,而後利用Timer的回調手動把這個State清除。從該版本開始,Flink框架能夠基於TTL原生地解決這件事情。DataStream Interval Join功能即含有區間間隔的Join,好比說左流Join右流先後幾分鐘以內的數據,這種叫作Interval Join。
Checkpoint機制在Flink很早期的時候就已經支持,是Flink一個很核心的功能,Flink社區也一直致力於努力把Checkpoint效率提高,以及換成FailOver以後它的Recallable效率的提高。
在Flink 1.0.0時期,提供了RocksDB的支持,這個版本以前全部的狀態都只能存在進程的內存裏面,這個內存總有存不下的一天,若是存不下則會發生OOM。若是想要存更多數據、更大量State就要用到RocksDB。RocksDB是一款基於文件的嵌入式數據庫,它會把數據存到磁盤,可是同時它又提供高效讀寫能力。因此使用RocksDB不會發生OOM這種事情。在Flink1.1.0裏面,提供了純異步化的RocksDB的snapshot。之前版本在作RocksDB的snapshot時它會同步阻塞主數據流的處理,很影響吞吐量,即每當checkpoint時主數據流就會卡住。純異步化處理以後不會卡住數據流,因而吞吐量也獲得了提高。
在Flink 1.2.0時期,引入了Rescalable keys和operate state的概念,它支持了一個Key State的可擴充以及operator state的可擴充。 在Flink 1.3.0時期,引入了增量的checkpoint這個比較重要的功能。只有基於增量的checkpoint才能更好地支持含有超大State的Job。在阿里內部,這種上TB的State是很是常見。若是每一次都把全量上TB的State都刷到遠程的HDFS上那麼這個效率是很低下的。而增量checkpoint只是把checkpoint間隔新增的那些狀態發到遠程作存儲,每一次checkpoint發的數據就少了不少,效率獲得提升。在這個版本里面還引入了一個細粒度的recovery,細粒度的recovery在作恢復的時候,有時不須要對整個Job作恢復,可能只須要恢復這個Job中的某一個子圖,這樣便可以提升恢復效率。
在Flink 1.5.0時期,引入了Task local 的State的recovery。由於基於checkpoint機制,會把State持久化地存儲到某一個遠程存儲,好比HDFS,當發生Failover的時候須要從新把這個數據從遠程HDFS再download下來,若是這個狀態特別大那麼該download操做的過程就會很漫長,致使Failover恢復所花的時間會很長。Task local state recovery提供的機制是當Job發生Failover以後,可以保證該Job狀態在本地不會丟失,進行恢復時只需在本地直接恢復,不需從遠程HDFS從新把狀態download下來,因而就提高了Failover recovery的效率。
Runtime的變遷歷史是很是重要的。
在Flink 1.2.0時期,提供了Async I/O功能。若是任務內部須要頻繁地跟外部存儲作查詢訪問,好比說查詢一個HBase表,在該版本以前每次查詢的操做都是阻塞的,會頻繁地被I/O的請求卡住。當加入異步I/O以後就能夠同時地發起N個異步查詢的請求,這樣便提高了整個job的吞吐量,同時Async I/O又可以保證該job的Async語義。
在Flink 1.3.0時期,引入了HistoryServer的模塊。HistoryServer主要功能是當job結束之後,它會把job的狀態以及信息都進行歸檔,方便後續開發人員作一些深刻排查。
在Flink 1.4.0時期,提供了端到端的exactly once的語義保證,Flink中所謂exactly once通常是指Flink引擎自己的exactly once。若是要作到從輸入處處理再到輸出,整個端到端總體的exactly once的話,它須要輸出組件具有commit功能。在kafka老版本中不存在commit功能,從最近的1.1開始有了這個功能,因而Flink很快便實現了端到端exactly once。
在Flink 1.5.0時期,Flink首次對外正式地提到新的部署模型和處理模型。新的模型開發工做已經持續了好久,在阿里巴巴內部這個新的處理模型也已經運行了有兩年以上,該模型的實現對Flink內部代碼改動量特別大,能夠說是自Flink項目創建以來,Runtime改動最大的一個改進。簡而言之,它的一個特性就是它可使得在使用YARN、Mesos這種調度系統時,能夠更加更好地動態分配資源、動態釋放資源、提升資源利用性,還有提供更好的jobs之間的隔離。最後是在這個版本中,Flink對其網絡站進行了一個基本重構。
在流計算中有兩個用來衡量性能的指標:延遲和吞吐。通常來說若是想要更高吞吐就要犧牲一些延遲,若是想要更低的延遲就要犧牲必定的吞吐。可是網絡棧的重構卻實現了延遲和吞吐的同時提高,這主要得益於它兩方面的工做:第一個是基於信用的流控,另外一個是基於事件的I/O。一個用來提升它的吞吐,另外一個用來下降它的延遲。
在介紹流控以前須要先介紹一下現有的網絡棧。Flink中TaskManager就是用來管理各個task的角色,它是以進程爲單位;task用來執行用戶代碼,以線程爲單位。當tasks之間有數據傳輸的交互的時候就要創建網絡的鏈接,若是2秒之間都創建一個TCP鏈接的話,那麼這個TCP鏈接會被嚴重浪費,因此Flink在兩個TaskManager之間創建一個TCP鏈接,即兩個進程之間只存在一個鏈接。各個task之間以TCP channel的方式來共享TCP的鏈接,這樣整個job中就不會有太多的TCP鏈接。
反壓的意思是當某一個task的處理性能跟不上輸入速率的時候,其輸入端的Buffer就會被填滿,當輸入端Buffer被填滿的時候就會致使TCP的讀取被暫停。TCP的讀取被暫停以後,就會致使上游輸出端的Buffer池越積越多,由於下游此時已經再也不進行消費。當上遊輸出端的Buffer池也堆滿的時候, TCP通道就會被關閉,其內部全部的TCP channel也會被關閉。從而上游task就會逐級的向上遊進行反壓,這是總體的反壓流程,因此說Flink之前的反壓機制是比較原生態、比較粗暴的,由於其控制力度很大,整個TCP中一旦某一個Task性能跟不上,就會把整個TCP鏈接關掉。以下圖所示:
基於信用的流控的核心思想就是基於信用額度的消費。好比銀行作貸款,爲了防止壞帳太多,它會對每個人評估其信用額度,當發放貸款時貸款不會超過這我的能承受的額度。基於這種方式,它可以一方面不會產生太多壞帳,另外一方面能夠充分地把銀行的資金利用起來。基於信用的流控就是基於這種思想,Flink中所謂的信用額度,就是指這個下游消費端的可用的Buffer數。以下圖:
該圖左邊是指發送端,有四個輸出的隊列,每一個隊列裏面的方塊表明輸出Buffer,即準備丟給下游處理的Buffer。右邊是消費端,消費端也有四個隊列,這四個隊列裏面也有一些Buffer塊,這些Buffer塊是空閒的Buffer,準備用來接收上游發給本身的數據。
上面提到基於數據的流控中所謂的信用就是指這個消費端它可用的Buffer數,表明當前還可以消費多少數據,消費端首先會向上遊反饋當前的信用是多少, producer端只會向信用額度大於0的下游進行發送,對於信用額度若是爲0的就再也不發送數據。這樣整個網絡的利用率便獲得了很大的提高,不會發生某些Buffer被長時間的停留在網絡的鏈路上的狀況。基於信用的流控主要有如下兩方面的優化提高:一個是當某一個task發生反壓處理跟不上的時候,不會發生全部的task都卡住,這種作法使吞吐量獲得了很大的提高,在阿里內部用雙11大屏做業進行測試,這種新的流控算法會獲得20%的提高;另外一個是基於事件的I/O,Flink在網絡端寫數據時會先往一個Buffer塊裏面寫數據,這個Buffer塊是一個32K的長度的單位,即32K的大小,當這個Buffer塊被填滿的時候就會輸出到網絡裏面,或者若是數據流比較慢,沒辦法很快填滿的話,那麼會等待一個超時,默認一個100毫秒,即若是100毫秒內還沒被填滿那麼這個Buffer也會被輸出到網絡裏面。此時如果在之前版本中Flink延遲多是在100毫秒之內,最差的狀況下是到100毫秒,由於須要到100毫秒等這個Buffer發出去。若是要獲得更低的延時,如今的作法就會將這個Buffer直接加入到輸出的隊列,可是仍是保持繼續往這個Buffer塊裏面寫數據,當網絡裏面有容量時這個Buffer塊便會馬上被髮出去,若是網絡如今也比較繁忙,那就繼續填充這個Buffer,這樣吞吐也會比較好一點。基於這種算法,Flink的延時幾乎是完美的,能夠看到它的曲線基本上是低於10毫秒的,這也充分利用了網絡的容量,幾乎對吞吐沒有影響。
11月4日,Apache Flink China社區第二季Meetup巡展開啓。
來自阿里、匯智、菜鳥、袋鼠雲、有讚的技術專家,將爲你展示:
- 如何擴展Flink SQL實現流與維表的join
- 如何經過平臺提升運維的效率,下降調優的成本
- Flink批處理與ML的嘗試
- Apache RocketMQ與Apache Flink的集成 …………