目錄html
分佈式系統須要解決:分配和管理在集羣的計算資源、處理配合、持久和可訪問的數據存儲、失敗恢復。Fink專一分佈式流處理。算法
Components of a Flink Setupapache
task是最基本的調度單位,由一個線程執行,裏面包含一個或多個operator。多個operators就成爲operation chain,須要上下游併發度一致,且傳遞模式(以前的Data exchange strategies)是forward。緩存
slot是TM的資源子集。結合下面Task Execution的圖,一個slot並不表明一個線程,它裏面並不必定只放一個task。多個task在一個slot就涉及slot sharing group。一個jobGraph的任務須要多少slot,取決於最大的併發度,這樣的話,併發1和併發2就不會放到一個slot中。Co-Location Group是在此基礎上,數據的forward形式,即一個slot中,若是它處理的是key1的數據,那麼接下來的task也是處理key1的數據,此時就達到Co-Location Group。安全
儘管有slot sharing group,但一個group裏串聯起來的task各自所需資源的大小並很差肯定。阿里平常用得最多的仍是一個task一個slot的方式。bash
Session模式(上圖):預先啓動好AM和TM,每提交一個job就啓動一個Job Manager並向Flink的RM申請資源,不夠的話,Flink的RM向YARN的RM申請資源。適合規模小,運行時間短的做業。./bin/flink run ./path/to/job.jar
網絡
Job模式:每個job都從新啓動一個Flink集羣,完成後結束Flink,且只有一個Job Manager。資源按需申請,適合大做業。./bin/flink run -m yarn-cluster ./path/to/job.jar
session
下面是簡單例子,詳細看官網。數據結構
# 啓動yarn-session,4個TM,每一個有4GB堆內存,4個slot cd flink-1.7.0/ ./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m -s 4 # 啓動做業 ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
細節取決於具體環境,如不一樣的RM多線程
Application Deployment
Framework模式:Flink做業爲JAR,並被提交到Dispatcher or JM or YARN。
Library模式:Flink做業爲application-specific container image,如Docker image,適合微服務。
Task Execution
做業調度:在流計算中預先啓動好節點,而在批計算中,每當某個階段完成計算才啓動下一個節點。
資源管理:slot做爲基本單位,有大小和位置屬性。JM有SlotPool,向Flink RM申請Slot,FlinkRM發現本身的SlotManager中沒有足夠的Slot,就會向集羣RM申請。後者返回可用TM的ip,讓FlinkRM去啓動,TM啓動後向FlinkRM註冊。後者向TM請求Slot,TM向JM提供相應Slot。JM用完後釋放Slot,TM會把釋放的Slot報告給FlinkRM。在Blink版本中,job模式會根據申請slot的大小分配相應的TM,而session模式則預先設置好TM大小,每有slot申請就從TM中劃分相應的資源。
任務能夠是相同operator (data parallelism),不一樣 operator (task parallelism),甚至不一樣application (job parallelism)。TM提供必定數量的slots來控制並行的任務數。
上圖A和C是source function,E是sink function,小數字表示並行度。
一個TM是一個JVM進程,它經過多線程完成任務。線程的隔離不太好,一個線程失敗有可能致使整個TM失敗。
Highly-Available Setup
從失敗中恢復須要重啓失敗進程、做業和恢復它的state。
當一個TM掛掉而RM又沒法找到空閒的資源時,就只能暫時下降並行度,直到有空閒的資源重啓TM。
當JM掛掉就靠ZK來從新選舉,和找到JM存儲到遠程storage的元數據、JobGraph。重啓JM並從最後一個完成的checkpoint開始。
JM在執行期間會獲得每一個task checkpoints的state存儲路徑(task將state寫到遠程storage)並寫到遠程storage,同時在ZK的存儲路徑留下pointer指明到哪裏找上面的存儲路徑。
背壓
數據涌入的速度大於處理速度。在source operator中,可經過Kafka解決。在任務間的operator有以下機制應對:
Local exchange:task1和2在同一個工做節點,那麼buffer pool能夠直接交給下一個任務,但下一個任務task2消費buffer pool中的信息速度減慢時,當前任務task1填充buffer pool的速度也會減慢。
Remote exchange:TM保證每一個task至少有一個incoming和一個outgoing緩衝區。當下遊receiver的處理速度低於上有的sender的發送速度,receiver的incoming緩衝區就會開始積累數據(須要空閒的buffer來放從TCP鏈接中接收的數據),當擠滿後就再也不接收數據。上游sender利用netty水位機制,當網絡中的緩衝數據過多時暫停發送。
TM負責數據在tasks間的轉移,轉移以前會存儲到buffer(這又變回micro-batches)。每一個TM有32KB的網絡buffer用於接收和發送數據。若是sender和receiver在不一樣進程,那麼會經過操做系統的網絡棧來通訊。每對TM保持permanent TCP鏈接來交換數據。每一個sender任務可以給全部receiving任務發送數據,反之,全部receiver任務可以接收全部sender任務的數據。TM保證每一個任務都至少有一個incoming和outgoing的buffer,並增長額外的緩衝區分配約束來避免死鎖。
若是sender和receiver任務在同一個TM進程,sender會序列化結果數據到buffer,若是滿了就放到隊列。receiver任務經過隊列獲得數據並進行反序列化。這樣的好處是解耦任務並容許在任務中使用可變對象,從而減小了對象實例化和垃圾收集。一旦數據被序列化,就能安全地修改。而缺點是計算消耗大,在一些條件下可以把task穿起來,避免序列化。(C10)
Flow Control with Back Pressure
receiver放到緩衝區的數據變爲隊列,sender將要發送的數據變爲隊列,最後sender減慢發送速度。
event time處理的數據必須有時間戳(Long unix timestamp)並定義了watermarks。watermark是一種特殊的records holding a timestamp long value。它必須是遞增的(防止倒退),有一個timestamp t(下圖的5),暗示全部接下來的數據都會大於這個值。後來的,小於這個值,就被視爲遲來數據,Flink有其餘機制處理。
Watermarks and Event Time
WM在Flink是一種特殊的record,它會被operator tasks接收和釋放。
tasks有時間服務來維持timers(timers註冊到時間服務上),在time-window task中,timers分別記錄了各個window的結束時間。當任務得到一個watermark時,task會根據這個watermark的timestamp更新內部的event-time clock。任務內部的時間服務肯定全部timers時間是否小於watermark的timestamp,若是大於則觸發call-back算子來釋放記錄並返回結果。最後task還會將更新的event-time clock的WM進行廣播。(結合下圖理解)
只有ProcessFunction能夠讀取和修改timestamp或者watermark(The ProcessFunction
can read the timestamp of a currently processed record, request the current event-time of the operator, and register timers)。下面是PF的行爲。
當收到WM大於全部目前擁有的WM,就會把event-time clock更新爲全部WM中最小的那個,並廣播這個最小的WM。即使是多個streams輸入,機制也同樣,只是增長Paritition WM數量。這種機制要求得到的WM必須是累加的,並且task必須有新的WM接收,不然clock就不會更新,task的timers就不會被觸發。另外,當多個streams輸入時,timers會被WM比較離散的stream主導,從而使更密集的stream的state不斷積累。
Timestamp Assignment and Watermark Generation
當streaming application消化流時產生。Flink有三種方式產生:
AssignerWithPeriodicWatermarks
提取每條記錄的timestamp,並週期性的查詢當前WM,即上圖的Partition WM。AssignerWithPunctuatedWatermarks
能夠從每條數據提取WM。上面兩個User-defined timestamp assignment functions一般用在source operator附近,由於stream一經處理就很難把握record的時間順序了。因此UDF能夠修改timestamp和WM,但在數據處理時使用不是一個好主意。
由任務維護並用於計算函數結果的全部數據都屬於任務的state。其實state能夠理解爲task業務邏輯的本地或實例變量。
在Flink,state老是和特定的operator關聯。operator須要註冊它的state,而state有兩種類型:
上面兩種state的存在方式有兩種:raw和managed,通常都是用後者,也推薦用後者(更好的內存管理、不需造輪子)。
State Backends
state backend決定了state如何被存儲、訪問和維持。它的主要職責是本地state管理和checkpoint state到遠程。在管理方面,可選擇將state存儲到內存仍是磁盤。checkpoint方面在C8詳細介紹。
MemoryStateBackend, FsStateBackend, RocksDBStateBackend適合愈來愈大的state。都支持異步checkpoint,其中RocksDB還支持incremental的checkpoint。
Scaling Stateful Operators
Flink會根據input rate調整併發度。對於stateful operators有如下4種方式:
keyed state:根據key group來調整,即分爲同一組的key-value會被分到相同的task
list state:全部list entries會被收集並從新均勻分佈,當增長併發度時,要新建list
union list state:增長併發時,廣播整個list,因此rescaling後,全部task都有全部的list state。
Flink’s Lightweight Checkpointing Algorithm
在分佈式開照算法Chandy-Lamport的基礎上實現。有一種特殊的record叫checkpoint barrier(由JM產生),它帶有checkpoint ID來把流進行劃分。在CB前面的records會被包含到checkpoint,以後的會被包含在以後的checkpoint。
當source task收到這種信息,就會中止發送recordes,觸發state backend對本地state的checkpoint,並廣播checkpoint ID到全部下游task。當checkpoint完成時,state backend喚醒source task,後者向JM肯定相應的checkpoint ID已經完成任務。
當下遊得到其中一個CB時,就會暫停處理這個CB對應的source的數據(完成checkpoint後發送的數據),並將這些數據存到緩衝區,直到其餘相同ID的CB都到齊,就會把state(下圖的十二、8)進行checkpoint,並廣播CB到下游。直到全部CB被廣播到下游,纔開始處理排隊在緩衝區的數據。固然,其餘沒有發送CB的source的數據會繼續處理。
最後,當全部sink會向JM發送BC肯定checkpoint已完成。
這種機制還有兩個優化:
Recovery from Consistent Checkpoints
上圖隊列中的7和6之因此能恢復,取決於數據源是否resettable,如Kafka,不會由於發送信息就把信息刪除。這才能實現處理過程的exactly-once state consistency(嚴格來說,數據仍是被重複處理,可是在讀檔後重復的)。可是下游系統有可能接收到多個結果。這方面,Flink提供sink算子實現output的exactly-once,例如給checkpoint提交records釋放記錄。另外一個方法是idempotent updates,詳細看C7。
Savepoints
checkpoints加上一些額外的元數據,功能也是在checkpoint的基礎上豐富。不一樣於checkpoints,savepoint不會被Flink自動創造(由用戶或者外部scheduler觸發創造)和銷燬。savepoint能夠重啓不一樣但兼容的做業,從而:
也能夠用於暫停做業,經過savepoint查看做業狀況。
參考
Stream Processing with Apache Flink by Vasiliki Kalavri; Fabian Hueske