Apache Flink 進階(六):Flink 做業執行深度解析

做者:嶽猛
整理:毛鶴數組

本文根據 Apache Flink 系列直播課程整理而成,由 Apache Flink Contributor、網易雲音樂實時計算平臺研發工程師嶽猛分享。主要分享內容爲 Flink Job 執行做業的流程,文章將從兩個方面進行分享:一是如何從 Program 到物理執行計劃,二是生成物理執行計劃後該如何調度和執行。緩存

Flink 四層轉化流程

Flink 有四層轉換流程,第一層爲 Program 到 StreamGraph;第二層爲 StreamGraph 到 JobGraph;第三層爲 JobGraph 到 ExecutionGraph;第四層爲 ExecutionGraph 到物理執行計劃。經過對 Program 的執行,可以生成一個 DAG 執行圖,即邏輯執行圖。以下:session

img1

第一部分將先講解四層轉化的流程,而後將以詳細案例講解四層的具體轉化。架構

  • 第一層 StreamGraph 從 Source 節點開始,每一次 transform 生成一個 StreamNode,兩個 StreamNode 經過 StreamEdge 鏈接在一塊兒,造成 StreamNode 和 StreamEdge 構成的DAG。
  • 第二層 JobGraph,依舊從 Source 節點開始,而後去遍歷尋找可以嵌到一塊兒的 operator,若是可以嵌到一塊兒則嵌到一塊兒,不能嵌到一塊兒的單獨生成 jobVertex,經過 JobEdge 連接上下游 JobVertex,最終造成 JobVertex 層面的 DAG。
  • JobVertex DAG 提交到任務之後,從 Source 節點開始排序,根據 JobVertex 生成ExecutionJobVertex,根據 jobVertex的IntermediateDataSet 構建IntermediateResult,而後 IntermediateResult 構建上下游的依賴關係,造成 ExecutionJobVertex 層面的 DAG 即 ExecutionGraph。
  • 最後經過 ExecutionGraph 層到物理執行層。

Program 到 StreamGraph 的轉化

Program 轉換成 StreamGraph 具體分爲三步:併發

  • 從 StreamExecutionEnvironment.execute 開始執行程序,將 transform 添加到 StreamExecutionEnvironment 的 transformations。
  • 調用 StreamGraphGenerator 的 generateInternal 方法,遍歷 transformations 構建 StreamNode 及 StreamEage。
  • 經過 StreamEdge 鏈接 StreamNode。

img2

經過 WindowWordCount 來看代碼到 StreamGraph 的轉化,在 flatMap transform 設置 slot 共享組爲 flatMap_sg,併發設置爲 4,在聚合的操做中設置 slot 共享組爲 sum_sg, sum() 和 counts() 併發設置爲 3,這樣設置主要是爲了演示後面如何嵌到一塊兒的,跟上下游節點的併發以及上游的共享組有關。app

WindowWordCount 代碼中能夠看到,在 readTextFile() 中會生成一個 transform,且 transform 的 ID 是 1;而後到 flatMap() 會生成一個 transform, transform 的 ID 是 2;接着到 keyBy() 生成一個 transform 的 ID 是 3;再到 sum() 生成一個 transform 的 ID 是 4;最後到 counts()生成 transform 的 ID 是 5。框架

img3

transform 的結構如圖所示,第一個是 flatMap 的 transform,第二個是 window 的 transform,第三個是 SinkTransform 的 transform。除此以外,還能在 transform 的結構中看到每一個 transform 的 input 是什麼。frontend

接下來介紹一下 StreamNode 和 StreamEdge。分佈式

  • StreamNode 是用來描述 operator 的邏輯節點,其關鍵成員變量有 slotSharingGroup、jobVertexClass、inEdges、outEdges以及transformationUID;
  • StreamEdge 是用來描述兩個 operator 邏輯的連接邊,其關鍵變量有 sourceVertex、targetVertex。

img4

WindowWordCount transform 到 StreamGraph 轉化如圖所示,StreamExecutionEnvironment 的 transformations 存在 3 個 transform,分別是 Flat Map(Id 2)、Window(Id 4)、Sink(Id 5)。ide

transform 的時候首先遞歸處理 transform 的 input,生成 StreamNode,而後經過 StreamEdge 連接上下游 StreamNode。須要注意的是,有些 transform 操做並不會生成StreamNode 如 PartitionTransformtion,而是生成個虛擬節點。

img5

在轉換完成後能夠看到,streamNodes 有四種 transform 形式,分別爲 Source、Flat Map、Window、Sink。

img6

每一個 streamNode 對象都攜帶併發個數、slotSharingGroup、執行類等運行信息。

StreamGraph 到 JobGraph 的轉化

img7

StreamGraph 到 JobGraph 的轉化步驟:

  • 設置調度模式,Eager 全部節點當即啓動。
  • 廣度優先遍歷 StreamGraph,爲每一個 streamNode 生成 byte 數組類型的 hash 值。
  • 從 source 節點開始遞歸尋找嵌到一塊兒的 operator,不能嵌到一塊兒的節點單獨生成 jobVertex,可以嵌到一塊兒的開始節點生成 jobVertex,其餘節點以序列化的形式寫入到 StreamConfig,而後 merge 到 CHAINED_TASK_CONFIG,再經過 JobEdge 連接上下游 JobVertex。
  • 將每一個 JobVertex 的入邊(StreamEdge)序列化到該 StreamConfig。
  • 根據 group name 爲每一個 JobVertext 指定 SlotSharingGroup。
  • 配置 checkpoint。
  • 將緩存文件存文件的配置添加到 configuration 中。
  • 設置 ExecutionConfig。

從 source 節點遞歸尋找嵌到一塊兒的 operator 中,嵌到一塊兒須要知足必定的條件,具體條件介紹以下:

  • 下游節點只有一個輸入。
  • 下游節點的操做符不爲 null。
  • 上游節點的操做符不爲 null。
  • 上下游節點在一個槽位共享組內。
  • 下游節點的鏈接策略是 ALWAYS。
  • 上游節點的鏈接策略是 HEAD 或者 ALWAYS。
  • edge 的分區函數是 ForwardPartitioner 的實例。
  • 上下游節點的並行度相等。
  • 能夠進行節點鏈接操做。

img8

JobGraph 對象結構如上圖所示,taskVertices 中只存在 Window、Flat Map、Source 三個 TaskVertex,Sink operator 被嵌到 window operator 中去了。

爲何要爲每一個 operator 生成 hash 值?

Flink 任務失敗的時候,各個 operator 是可以從 checkpoint 中恢復到失敗以前的狀態的,恢復的時候是依據 JobVertexID(hash 值)進行狀態恢復的。相同的任務在恢復的時候要求 operator 的 hash 值不變,所以可以獲取對應的狀態。

每一個 operator 是怎樣生成 hash 值的?

若是用戶對節點指定了一個散列值,則基於用戶指定的值可以產生一個長度爲 16 的字節數組。若是用戶沒有指定,則根據當前節點所處的位置,產生一個散列值。

考慮的因素主要有三點:

  • 一是在當前 StreamNode 以前已經處理過的節點的個數,做爲當前 StreamNode 的 id,添加到 hasher 中;
  • 二是遍歷當前 StreamNode 輸出的每一個 StreamEdge,並判斷當前 StreamNode 與這個 StreamEdge 的目標 StreamNode 是否能夠進行連接,若是能夠,則將目標 StreamNode 的 id 也放入 hasher 中,且這個目標 StreamNode 的 id 與當前 StreamNode 的 id 取相同的值;
  • 三是將上述步驟後產生的字節數據,與當前 StreamNode 的全部輸入 StreamNode 對應的字節數據,進行相應的位操做,最終獲得的字節數據,就是當前 StreamNode 對應的長度爲 16 的字節數組。

JobGraph 到 ExexcutionGraph 以及物理執行計劃

img9

JobGraph 到 ExexcutionGraph 以及物理執行計劃的流程:

  • 將 JobGraph 裏面的 jobVertex 從 Source 節點開始排序。
  • 在 executionGraph.attachJobGraph(sortedTopology)方法裏面,根據 JobVertex 生成 ExecutionJobVertex,在 ExecutionJobVertex 構造方法裏面,根據 jobVertex 的 IntermediateDataSet 構建 IntermediateResult,根據 jobVertex 併發構建 ExecutionVertex,ExecutionVertex 構建的時候,構建 IntermediateResultPartition(每個 Execution 構建 IntermediateResult 數個IntermediateResultPartition );將建立的 ExecutionJobVertex 與前置的 IntermediateResult 鏈接起來。
  • 構建 ExecutionEdge ,鏈接到前面的 IntermediateResultPartition,最終從 ExecutionGraph 到物理執行計劃。

Flink Job 執行流程

Flink On Yarn 模式

img10

基於 Yarn 層面的架構相似 Spark on Yarn 模式,都是由 Client 提交 App 到 RM 上面去運行,而後 RM 分配第一個 container 去運行 AM,而後由 AM 去負責資源的監督和管理。須要說明的是,Flink 的 Yarn 模式更加相似 Spark on Yarn 的 cluster 模式,在 cluster 模式中,dirver 將做爲 AM 中的一個線程去運行。Flink on Yarn 模式也是會將 JobManager 啓動在 container 裏面,去作個 driver 相似的任務調度和分配,Yarn AM 與 Flink JobManager 在同一個 Container 中,這樣 AM 能夠知道 Flink JobManager 的地址,從而 AM 能夠申請 Container 去啓動 Flink TaskManager。待 Flink 成功運行在 Yarn 集羣上,Flink Yarn Client 就能夠提交 Flink Job 到 Flink JobManager,並進行後續的映射、調度和計算處理。

Fink on Yarn 的缺陷

  • 資源分配是靜態的,一個做業須要在啓動時獲取所需的資源而且在它的生命週期裏一直持有這些資源。這致使了做業不能隨負載變化而動態調整,在負載降低時沒法歸還空閒的資源,在負載上升時也沒法動態擴展。
  • On-Yarn 模式下,全部的 container 都是固定大小的,致使沒法根據做業需求來調整 container 的結構。譬如 CPU 密集的做業或許須要更多的核,但不須要太多內存,固定結構的 container 會致使內存被浪費。
  • 與容器管理基礎設施的交互比較笨拙,須要兩個步驟來啓動 Flink 做業: 1.啓動 Flink 守護進程;2.提交做業。若是做業被容器化而且將做業部署做爲容器部署的一部分,那麼將再也不須要步驟2。
  • On-Yarn 模式下,做業管理頁面會在做業完成後消失不可訪問。
  • Flink 推薦 per job clusters 的部署方式,可是又支持能夠在一個集羣上運行多個做業的 session 模式,使人疑惑。

在 Flink 版本 1.5 中引入了 Dispatcher,Dispatcher 是在新設計裏引入的一個新概念。Dispatcher 會從 Client 端接受做業提交請求並表明它在集羣管理器上啓動做業。

引入 Dispatcher 的緣由主要有兩點:

  • 第一,一些集羣管理器須要一箇中心化的做業生成和監控實例;
  • 第二,可以實現 Standalone 模式下 JobManager 的角色,且等待做業提交。在一些案例中,Dispatcher 是可選的(Yarn)或者不兼容的(kubernetes)。

資源調度模型重構下的 Flink On Yarn 模式

img11

沒有 Dispatcher job 運行過程

客戶端提交 JobGraph 以及依賴 jar 包到 YarnResourceManager,接着 Yarn ResourceManager 分配第一個 container 以此來啓動 AppMaster,Application Master 中會啓動一個 FlinkResourceManager 以及 JobManager,JobManager 會根據 JobGraph 生成的 ExecutionGraph 以及物理執行計劃向 FlinkResourceManager 申請 slot,FlinkResoourceManager 會管理這些 slot 以及請求,若是沒有可用 slot 就向 Yarn 的 ResourceManager 申請 container,container 啓動之後會註冊到 FlinkResourceManager,最後 JobManager 會將 subTask deploy 到對應 container 的 slot 中去。

img12

在有 Dispatcher 的模式下

會增長一個過程,就是 Client 會直接經過 HTTP Server 的方式,而後用 Dispatcher 將這個任務提交到 Yarn ResourceManager 中。

新框架具備四大優點,詳情以下:

  • client 直接在 Yarn 上啓動做業,而不須要先啓動一個集羣而後再提交做業到集羣。所以 client 再提交做業後能夠立刻返回。
  • 全部的用戶依賴庫和配置文件都被直接放在應用的 classpath,而不是用動態的用戶代碼 classloader 去加載。
  • container 在須要時才請求,再也不使用時會被釋放。
  • 「須要時申請」的 container 分配方式容許不一樣算子使用不一樣 profile (CPU 和內存結構)的 container。

新的資源調度框架下 single cluster job on Yarn 流程介紹

img13

single cluster job on Yarn 模式涉及三個實例對象:

  • clifrontend

    • Invoke App code;
    • 生成 StreamGraph,而後轉化爲 JobGraph;
  • YarnJobClusterEntrypoint(Master)

    • 依次啓動 YarnResourceManager、MinDispatcher、JobManagerRunner 三者都服從分佈式協同一致的策略;
    • JobManagerRunner 將 JobGraph 轉化爲 ExecutionGraph ,而後轉化爲物理執行任務Execution,而後進行 deploy,deploy 過程會向 YarnResourceManager 請求 slot,若是有直接 deploy 到對應的 YarnTaskExecutiontor 的 slot 裏面,沒有則向 Yarn 的 ResourceManager 申請,帶 container 啓動之後 deploy。
  • YarnTaskExecutorRunner (slave)

    • 負責接收 subTask,並運行。

整個任務運行代碼調用流程以下圖:

img14

subTask 在執行時是怎麼運行的?

調用 StreamTask 的 invoke 方法,執行步驟以下:
* initializeState()即operator的initializeState()
* openAllOperators() 即operator的open()方法
* 最後調用 run 方法來進行真正的任務處理

咱們來看下 flatMap 對應的 OneInputStreamTask 的 run 方法具體是怎麼處理的。

@Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }

最終是調用 StreamInputProcessor 的 processInput() 作數據的處理,這裏麪包含用戶的處理邏輯。

public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();
                    //處理watermark
                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        //watermark處理邏輯,這裏可能引發timer的trigger
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                        //處理latency watermark
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        //用戶的真正的代碼邏輯
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            //處理數據
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }
            
            //這裏會進行checkpoint barrier的判斷和對齊,以及不一樣partition 裏面checkpoint barrier不一致時候的,數據buffer,

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }

streamOperator.processElement(record) 最終會調用用戶的代碼處理邏輯,假如 operator 是 StreamFlatMap 的話,

@Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);//用戶代碼
    }

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索