【4.分佈式計算】flink

flink有批處理和流處理的計算功能,其中批處理是用流計算來模擬,更多數據處理見:https://segmentfault.com/a/11...,分佈式部署;計算相關的並行模式,流處理時間窗口,容錯處理,增量計算等。
官方:https://flink.apache.orghtml

邏輯架構

clipboard.png

clipboard.png

部署架構

  • Standalone
    job manager(master)+wokers
  • 基於yarn的部署
    clipboard.png
  • HA:job manager單點
    Standalone : Zookeeper
    對於 Yarn Cluaster 模式來講,Flink 就要依靠 Yarn 自己來對 JobManager 作 HA 了。其實這裏徹底是 Yarn 的機制。對於 Yarn Cluster 模式來講,JobManager 和 TaskManager 都是被 Yarn 啓動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之爲 Flink Application Master。也就說它的故障恢復,就徹底依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 同樣)。

運行架構

clipboard.png

  • client
    當用戶提交一個Flink程序時,會首先建立一個Client,該Client首先會對用戶提交的Flink程序進行預處理,並提交到Flink集羣中處理,因此Client須要從用戶提交的Flink程序配置中獲取JobManager的地址,並創建到JobManager的鏈接,將Flink Job提交給JobManager。Flink程序=》JobGraph(Flink Dataflow:多個JobVertex組成的DAG,一個JobGraph包含了一個Flink程序的以下信息:JobID、Job名稱、配置信息、一組JobVertex等)。
  • jobmanager
    它負責接收Flink Job,調度組成Job的多個Task的執行。同時,JobManager還負責收集Job的狀態信息,並管理Flink集羣中從節點TaskManager。JobManager所負責的各項管理功能,它接收到並處理的事件主要包括:RegisterTaskManager,SubmitJob,CancelJob,UpdateTaskExecutionState,RequestNextInputSplit,JobStatusChanged
  • worker
    JVM進程多線程,task slot內存隔離資源單位,一個job的的多謳歌subtask能夠共享slot,

計算模式

在 Hadoop 中 Map 和 Reduce 是兩個獨立調度的 Task,而且都會去佔用計算資源。對 Flink 來講 MapReduce 是一個 Pipeline 的 Task,只佔用一個計算資源web

clipboard.png

https://ci.apache.org/project...
以上有6個源,6個map,6個reduce。在2個TM(每一個3個slots)的並行執行方式以下算法

clipboard.png

clipboard.png
其中每一個可並行的有一個JV和並行的EV.好比source會在一個JV中保含6個EV,ExecutionGraph還包含IntermediateResult和IntermediateResultPartition。前者跟蹤IntermediateDataSet的狀態,後者是每一個分區的狀態。apache

clipboard.png

窗口與時間

支持窗口

1)傾斜窗口(Tumbling Windows,記錄沒有重疊,固定窗口大小時間間隔)
2)滑動窗口(Slide Windows,記錄有重疊,固定窗口大小和窗口間隔)
3)會話窗口(Session Windows,在內部,會話窗口操做員爲每一個到達的記錄建立一個新窗口,若是它們彼此之間的距離比定義的間隙更接近,則將窗口合併在一塊兒。爲了可合併的,會話窗口操做者須要一個合併觸發器和一個合併 的窗函數)
4)全局窗口 全局窗口自動以觸發器,自定義聚合方式等,
能夠基於時間或數據計數(https://flink.apache.org/news...segmentfault

支持時間

事件時間,到達時間,處理時間
基於事件時間(事件建立時間)的水位線watermark算法(延後固定或推理出的關係式個時長,以便排除事件發生處處理的時長,來收集此刻建立的事件流):
clipboard.pngwindows

當一、watermark時間 >= window_end_time(對於out-of-order以及正常的數據而言)&& 二、在[window_start_time,window_end_time)中有數據存在 時窗口關閉開始計算
以下圖:設定的maxOutOfOrderness=10000L(10s),窗口3s
clipboard.png緩存

  • 按期水位線
    用戶定義maxOutOfOrderness,兩次水位線之間的數據能夠用來調用方法生成下一次的時間,再日後推遲maxOutOfOrderness的時間便可。好比多線程

    class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
        val maxOutOfOrderness = 3500L; // 3.5 seconds
        var currentMaxTimestamp: Long;
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
            val timestamp = element.getCreationTime()
            currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
            timestamp;
        }
        override def getCurrentWatermark(): Watermark = {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
  • 標點水位線
    數據流中有標記事件才調用extractTimestamp生成新的wartermark
  • 對於map等在graph中交叉的流事件時間,是取輸入流事件時間的最小時間
  • 遲到事件:
    從新激活已經關閉的窗口並從新計算以修正結果。要保存上次結果從新計算,可能每一個遲到事件都要觸發。
    將遲到事件收集起來另外處理。直接返回收集結果
    將遲到事件視爲錯誤消息並丟棄。

容錯

  • 流障礙注入
    快照:https://arxiv.org/pdf/1506.08...
    源於Chandy-Lamport算法https://lamport.azurewebsites...
    https://ci.apache.org/project...
    過程:
    將流障礙被注入流源的並行數據流中,檢查點協調器會將快照n的屏障發送到其全部輸出流中。
    一旦接收器操做員(流式DAG的末端)從其全部輸入流接收到障礙n,本身狀態持久化,向快照n確認檢查點協調器。
    在全部接收器確認快照後,它被視爲已完成。
    一旦完成了快照n,做業將永遠再也不向源請求來自Sn以前的記錄,由於此時這些記錄(及其後代記錄)將經過整個數據流拓撲。

    clipboard.png

  • 徹底一次調用保證
    對齊(google的millwheel用的每一個數據生成惟一編號,dedup去重實現exactly-once(milwheel)) 接收到一個流的n後,這個流的數據暫存,直到其餘流也到n,對其發出快照。

    clipboard.png

  • 存儲
    狀態也要存儲(轉換函數,系統窗口數據緩衝區等等),信息很大,單獨state backend存儲,可存儲在HDFS中(選項有內存,rocksdb等)

    clipboard.png

內部優化

避免特定狀況下Shuffle、排序等昂貴操做,中間結果有必要進行緩存架構

批處理

Flink執行批處理程序做爲流程序的特殊狀況,其中流是有界的(有限數量的元素)。所以,上述概念以相同的方式應用於批處理程序,而且它們適用於流程序,除了少數例外:
批處理程序的容錯不使用檢查點。經過徹底重放流來進行恢復。成本更低。
支持迭代計算。機器學習

相關文章
相關標籤/搜索