透過源碼看懂Flink核心框架的執行流程

前言

Flink是大數據處理領域最近很火的一個開源的分佈式、高性能的流式處理框架,其對數據的處理能夠達到毫秒級別。本文以一個來自官網的WordCount例子爲引,全面闡述flink的核心架構及執行流程,但願讀者能夠藉此更加深刻的理解Flink邏輯。html

本文跳過了一些基本概念,若是對相關概念感到迷惑,請參考官網文檔。另外在本文寫做過程當中,Flink正式發佈了其1.5 RELEASE版本,在其發佈以後完成的內容將按照1.5的實現來組織。java

 

1.從 Hello,World WordCount開始

首先,咱們把WordCount的例子再放一遍:mysql

複製代碼
    public class SocketTextStreamWordCount {
    
    public static void main(String[] args) throws Exception {
        if (args.length != 2){
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostName = args[0];
        Integer port = Integer.parseInt(args[1]);
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // get input data
        DataStream<String> text = env.socketTextStream(hostName, port);
        
        text.flatMap(new LineSplitter()).setParallelism(1)
        // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0)
                .sum(1).setParallelism(1)
                .print();

        // execute program
        env.execute("Java WordCount from SocketTextStream Example");
    }
    
        /**
         * Implements the string tokenizer that splits sentences into words as a user-defined
         * FlatMapFunction. The function takes a line (String) and splits it into
         * multiple pairs in the form of "(word,1)" (Tuple2&lt;String, Integer&gt;).
         */
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    }
複製代碼

首先從命令行中獲取socket對端的ip和端口,而後啓動一個執行環境,從socket中讀取數據,split成單個單詞的流,並按單詞進行總和的計數,最後打印出來。這個例子相信接觸過大數據計算或者函數式編程的人都能看懂,就不過多解釋了。程序員

 

1.1 flink執行環境

程序的啓動,從這句開始:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
這行代碼會返回一個可用的執行環境。執行環境是整個flink程序執行的上下文,記錄了相關配置(如並行度等),並提供了一系列方法,如讀取輸入流的方法,以及真正開始運行整個代碼的execute方法等。對於分佈式流處理程序來講,咱們在代碼中定義的flatMap,keyBy等等操做,事實上能夠理解爲一種聲明,告訴整個程序咱們採用了什麼樣的算子,而真正開啓計算的代碼不在此處。因爲咱們是在本地運行flink程序,所以這行代碼會返回一個LocalStreamEnvironment,最後咱們要調用它的execute方法來開啓真正的任務。咱們先接着往下看。web

 

1.2 算子(Operator)的註冊(聲明)

咱們以flatMap爲例,text.flatMap(new LineSplitter())這一句話跟蹤進去是這樣的:redis

複製代碼
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                getType(), Utils.getCallLocationName(), true);

        return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));

    }
複製代碼

 

裏面完成了兩件事,一是用反射拿到了flatMap算子的輸出類型,二是生成了一個Operator。flink流式計算的核心概念,就是將數據從輸入流一個個傳遞給Operator進行鏈式處理,最後交給輸出流的過程。對數據的每一次處理在邏輯上成爲一個operator,而且爲了本地化處理的效率起見,operator之間也能夠串成一個chain一塊兒處理(能夠參考責任鏈模式幫助理解)。下面這張圖代表了flink是如何看待用戶的處理流程的:抽象化爲一系列operator,以source開始,以sink結尾,中間的operator作的操做叫作transform,而且能夠把幾個操做串在一塊兒執行。
image_1cae39t06eoo3ml1be8o0412c69.png-43.5kB
咱們也能夠更改flink的設置,要求它不要對某個操做進行chain處理,或者從某個操做開啓一個新chain等。
上面代碼中的最後一行transform方法的做用是返回一個SingleOutputStreamOperator,它繼承了Datastream類而且定義了一些輔助方法,方便對流的操做。在返回以前,transform方法還把它註冊到了執行環境中(後面生成執行圖的時候還會用到它)。其餘的操做,包括keyBy,sum和print,都只是不一樣的算子,在這裏出現都是同樣的效果,即生成一個operator並註冊給執行環境用於生成DAG。spring

 

1.3 程序的執行

程序執行即env.execute("Java WordCount from SocketTextStream Example")這行代碼。sql

 

1.3.1 本地模式下的execute方法

這行代碼主要作了如下事情:數據庫

  • 生成StreamGraph。表明程序的拓撲結構,是從用戶代碼直接生成的圖。
  • 生成JobGraph。這個圖是要交給flink去生成task的圖。
  • 生成一系列配置
  • 將JobGraph和配置交給flink集羣去運行。若是不是本地運行的話,還會把jar文件經過網絡發給其餘節點。
  • 以本地模式運行的話,能夠看到啓動過程,如啓動性能度量、web模塊、JobManager、ResourceManager、taskManager等等
  • 啓動任務。值得一提的是在啓動任務以前,先啓動了一個用戶類加載器,這個類加載器能夠用來作一些在運行時動態加載類的工做。
 

1.3.2 遠程模式(RemoteEnvironment)的execute方法

遠程模式的程序執行更加有趣一點。第一步仍然是獲取StreamGraph,而後調用executeRemotely方法進行遠程執行。
該方法首先建立一個用戶代碼加載器apache

ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,   getClass().getClassLoader());

 

而後建立一系列配置,交給Client對象。Client這個詞有意思,看見它就知道這裏絕對是跟遠程集羣打交道的客戶端。

複製代碼
        ClusterClient client;

        try {
            client = new StandaloneClusterClient(configuration);
            client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
        }
        }
        try {
            return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
        }
複製代碼

 

client的run方法首先生成一個JobGraph,而後將其傳遞給JobClient。關於Client、JobClient、JobManager到底誰管誰,能夠看這張圖:
image_1cae7g15p6k94no1ves121c5pd9.png-19.7kB
確切的說,JobClient負責以異步的方式和JobManager通訊(Actor是scala的異步模塊),具體的通訊任務由JobClientActor完成。相對應的,JobManager的通訊任務也由一個Actor完成。

複製代碼
        JobListeningContext jobListeningContext = submitJob(
                actorSystem,config,highAvailabilityServices,jobGraph,timeout,sysoutLogUpdates,    classLoader);

        return awaitJobResult(jobListeningContext);
複製代碼

 

能夠看到,該方法阻塞在awaitJobResult方法上,並最終返回了一個JobListeningContext,透過這個Context能夠獲得程序運行的狀態和結果。

 

1.3.3 程序啓動過程

上面提到,整個程序真正意義上開始執行,是這裏:

 
  1. env.execute("Java WordCount from SocketTextStream Example");

遠程模式和本地模式有一點不一樣,咱們先按本地模式來調試。
咱們跟進源碼,(在本地調試模式下)會啓動一個miniCluster,而後開始執行代碼:

複製代碼
// LocalStreamEnvironment.java

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        
        //生成各類圖結構
        ......

        try {
            //啓動集羣,包括啓動JobMaster,進行leader選舉等等
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
            
            //提交任務到JobMaster
            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            transformations.clear();
            miniCluster.close();
        }
    }
複製代碼

 

這個方法裏有一部分邏輯是與生成圖結構相關的,咱們放在第二章裏講;如今咱們先接着往裏跟:

複製代碼
//MiniCluster.java
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        checkNotNull(job, "job is null");
        
        //在這裏,最終把job提交給了jobMaster
        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

        final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
            (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    ......
    
    }
複製代碼

 

正如我在註釋裏寫的,這一段代碼核心邏輯就是調用那個submitJob方法。那麼咱們再接着看這個方法:

複製代碼
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        final DispatcherGateway dispatcherGateway;
        try {
            dispatcherGateway = getDispatcherGateway();
        } catch (LeaderRetrievalException | InterruptedException e) {
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(e);
        }

        // we have to allow queued scheduling in Flip-6 mode because we need to request slots
        // from the ResourceManager
        jobGraph.setAllowQueuedScheduling(true);

        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);

        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
        
        //在這裏執行了真正的submit操做
            (Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));

        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }
複製代碼

 

這裏的Dispatcher是一個接收job,而後指派JobMaster去啓動任務的類,咱們能夠看看它的類結構,有兩個實現。在本地環境下啓動的是MiniDispatcher,在集羣上提交任務時,集羣上啓動的是StandaloneDispatcher
image_1cenfj3p9fp110p0a8unn1mrh9.png-27.4kB

那麼這個Dispatcher又作了什麼呢?它啓動了一個JobManagerRunner(這裏我要吐槽Flink的命名,這個東西應該叫作JobMasterRunner纔對,flink裏的JobMaster和JobManager不是一個東西),委託JobManagerRunner去啓動該Job的JobMaster。咱們看一下對應的代碼:

複製代碼
//jobManagerRunner.java
    private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {

        ......

        final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);

        ......
    }
複製代碼

 

而後,JobMaster通過了一堆方法嵌套以後,執行到了這裏:

複製代碼
    private void scheduleExecutionGraph() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        executionGraph.registerJobStatusListener(jobStatusListener);

        try {
            //這裏調用了ExecutionGraph的啓動方法
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }
複製代碼

 

咱們知道,flink的框架裏有三層圖結構,其中ExecutionGraph就是真正被執行的那一層,因此到這裏爲止,一個任務從提交到真正執行的流程就走完了,咱們再回顧一下(順便提一下遠程提交時的流程區別):

  • 客戶端代碼的execute方法執行;
  • 本地環境下,MiniCluster完成了大部分任務,直接把任務委派給了MiniDispatcher;
  • 遠程環境下,啓動了一個RestClusterClient,這個類會以HTTP Rest的方式把用戶代碼提交到集羣上;
  • 遠程環境下,請求發到集羣上以後,必然有個handler去處理,在這裏是JobSubmitHandler。這個類接手了請求後,委派StandaloneDispatcher啓動job,到這裏以後,本地提交和遠程提交的邏輯日後又統一了;
  • Dispatcher接手job以後,會實例化一個JobManagerRunner,而後用這個runner啓動job;
  • JobManagerRunner接下來把job交給了JobMaster去處理;
  • JobMaster使用ExecutionGraph的方法啓動了整個執行圖;整個任務就啓動起來了。

至此,第一部分就講完了。

 

2.理解flink的圖結構

第一部分講到,咱們的主函數最後一項任務就是生成StreamGraph,而後生成JobGraph,而後以此開始調度任務運行,因此接下來咱們從這裏入手,繼續探索flink。

 

2.1 flink的三層圖結構

事實上,flink總共提供了三種圖的抽象,咱們前面已經提到了StreamGraph和JobGraph,還有一種是ExecutionGraph,是用於調度的基本數據結構。
image_1caf1oll019fp1odv1bh9idosr79.png-486.3kB
上面這張圖清晰的給出了flink各個圖的工做原理和轉換過程。其中最後一個物理執行圖並不是flink的數據結構,而是程序開始執行後,各個task分佈在不一樣的節點上,所造成的物理上的關係表示。

  • 從JobGraph的圖裏能夠看到,數據從上一個operator流到下一個operator的過程當中,上游做爲生產者提供了IntermediateDataSet,而下游做爲消費者須要JobEdge。事實上,JobEdge是一個通訊管道,鏈接了上游生產的dataset和下游的JobVertex節點。
  • 在JobGraph轉換到ExecutionGraph的過程當中,主要發生瞭如下轉變:
    • 加入了並行度的概念,成爲真正可調度的圖結構
    • 生成了與JobVertex對應的ExecutionJobVertex,ExecutionVertex,與IntermediateDataSet對應的IntermediateResult和IntermediateResultPartition等,並行將經過這些類實現
  • ExecutionGraph已經能夠用於調度任務。咱們能夠看到,flink根據該圖生成了一一對應的Task,每一個task對應一個ExecutionGraph的一個Execution。Task用InputGate、InputChannel和ResultPartition對應了上面圖中的IntermediateResult和ExecutionEdge。

那麼,flink抽象出這三層圖結構,四層執行邏輯的意義是什麼呢?
StreamGraph是對用戶邏輯的映射。JobGraph在此基礎上進行了一些優化,好比把一部分操做串成chain以提升效率。ExecutionGraph是爲了調度存在的,加入了並行處理的概念。而在此基礎上真正執行的是Task及其相關結構。

 

2.2 StreamGraph的生成

在第一節的算子註冊部分,咱們能夠看到,flink把每個算子transform成一個對流的轉換(好比上文中返回的SingleOutputStreamOperator是一個DataStream的子類),而且註冊到執行環境中,用於生成StreamGraph。實際生成StreamGraph的入口是StreamGraphGenerator.generate(env, transformations) 其中的transformations是一個list,裏面記錄的就是咱們在transform方法中放進來的算子。

 

2.2.1 StreamTransformation類表明了流的轉換

StreamTransformation表明了從一個或多個DataStream生成新DataStream的操做。順便,DataStream類在內部組合了一個StreamTransformation類,實際的轉換操做均經過該類完成。
image_1caf64b7c1gjnv2eebi1v9e1cvum.png-129.4kB
咱們能夠看到,從source到各類map,union再到sink操做所有被映射成了StreamTransformation。
其映射過程以下所示:
image_1caf6ak4rkqsc1u1hci93fe0d13.png-36.6kB

以MapFunction爲例:

  • 首先,用戶代碼裏定義的UDF會被看成其基類對待,而後交給StreamMap這個operator作進一步包裝。事實上,每個Transformation都對應了一個StreamOperator。
  • 因爲map這個操做只接受一個輸入,因此再被進一步包裝爲OneInputTransformation。
  • 最後,將該transformation註冊到執行環境中,當執行上文提到的generate方法時,生成StreamGraph圖結構。

    另外,並非每個 StreamTransformation 都會轉換成runtime層中的物理操做。有一些只是邏輯概念,好比union、split/select、partition等。以下圖所示的轉換樹,在運行時會優化成下方的操做圖。
    image_1caf71h79s0s3fodem1aeb1j3m1g.png-83.8kB

 

2.2.2 StreamGraph生成函數分析

咱們從StreamGraphGenerator.generate()方法往下看:

複製代碼
    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }
    
    //注意,StreamGraph的生成是從sink開始的
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }
    
    //這個方法的核心邏輯就是判斷傳入的steamOperator是哪一種類型,並執行相應的操做,詳情見下面那一大堆if-else
    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from theExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        //這裏對操做符的類型進行判斷,並以此調用相應的處理邏輯.簡而言之,處理的核心無非是遞歸的將該節點和節點的上游節點加入圖
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        //注意這裏和函數開始時的方法相對應,在有向圖中要注意避免循環的產生
        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() > 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }
複製代碼

 

由於map,filter等經常使用操做都是OneInputStreamOperator,咱們就來看看transformOneInputTransform((OneInputTransformation<?, ?>) transform)方法。

複製代碼
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        Collection<Integer> inputIds = transform(transform.getInput());

        // 在遞歸處理節點過程當中,某個節點可能已經被其餘子節點先處理過了,須要跳過
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        //這裏是獲取slotSharingGroup。這個group用來定義當前咱們在處理的這個操做符能夠跟什麼操做符chain到一個slot裏進行操做
        //由於有時候咱們可能不滿意flink替咱們作的chain聚合
        //一個slot就是一個執行task的基本容器
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        //把該operator加入圖
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getOperator(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());
        
        //對於keyedStream,咱們還要記錄它的keySelector方法
        //flink並不真正爲每一個keyedStream保存一個key,而是每次須要用到key的時候都使用keySelector方法進行計算
        //所以,咱們自定義的keySelector方法須要保證冪等性
        //到後面介紹keyGroup的時候咱們還會再次提到這一點
        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
        
        //爲當前節點和它的依賴節點創建邊
        //這裏能夠看到以前提到的select union partition等邏輯節點被合併入edge的過程
        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }
    
    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
        addEdgeInternal(upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                null,
                new ArrayList<String>(),
                null);

    }
    //addEdge的實現,會合並一些邏輯節點
    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag) {
        //若是輸入邊是側輸出節點,則把side的輸入邊做爲本節點的輸入邊,並遞歸調用
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
            //若是輸入邊是select,則把select的輸入邊做爲本節點的輸入邊
        } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
            if (outputNames.isEmpty()) {
                // selections that happen downstream override earlier selections
                outputNames = virtualSelectNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
            //若是是partition節點
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
        } else {
        //正常的edge處理邏輯
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }

            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);

            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }
複製代碼

 

 

2.2.3 WordCount函數的StreamGraph

flink提供了一個StreamGraph可視化顯示工具,在這裏
咱們能夠把咱們的程序的執行計劃打印出來System.out.println(env.getExecutionPlan()); 複製到這個網站上,點擊生成,如圖所示:
image_1cafgsliu1n2n1uj21p971b0h6m71t.png-25.7kB
能夠看到,咱們源程序被轉化成了4個operator。
另外,在operator之間的連線上也顯示出了flink添加的一些邏輯流程。因爲我設定了每一個操做符的並行度都是1,因此在每一個操做符之間都是直接FORWARD,不存在shuffle的過程。

 

2.3 JobGraph的生成

flink會根據上一步生成的StreamGraph生成JobGraph,而後將JobGraph發送到server端進行ExecutionGraph的解析。

 

2.3.1 JobGraph生成源碼

與StreamGraph相似,JobGraph的入口方法是StreamingJobGraphGenerator.createJobGraph()。咱們直接來看源碼

複製代碼
private JobGraph createJobGraph() {

        // 設置啓動模式爲全部節點均在一開始就啓動
        jobGraph.setScheduleMode(ScheduleMode.EAGER);

        // 爲每一個節點生成hash id
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // 爲了保持兼容性建立的hash
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
        //生成jobvertex,串成chain等
        //這裏的邏輯大體能夠理解爲,挨個遍歷節點,若是該節點是一個chain的頭節點,就生成一個JobVertex,若是不是頭節點,就要把自身配置併入頭節點,而後把頭節點和本身的出邊相連;對於不能chain的節點,看成只有頭節點處理便可
        setChaining(hashes, legacyHashes, chainedOperatorHashes);
        //設置輸入邊edge
        setPhysicalEdges();
        //設置slot共享group
        setSlotSharing();
        //配置檢查點
        configureCheckpointing();

        // 若是有以前的緩存文件的配置的話,從新讀入
        for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
            DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
        }

        // 傳遞執行環境配置
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }
複製代碼

 

2.3.2 operator chain的邏輯

爲了更高效地分佈式執行,Flink會盡量地將operator的subtask連接(chain)在一塊兒造成task。每一個task在一個線程中執行。將operators連接成task是很是有效的優化:它能減小線程之間的切換,減小消息的序列化/反序列化,減小數據在緩衝區的交換,減小了延遲的同時提升總體的吞吐量。

image_1cafj7s6bittk5tt0bequlig2a.png-158.7kB
上圖中將KeyAggregation和Sink兩個operator進行了合併,由於這兩個合併後並不會改變總體的拓撲結構。可是,並非任意兩個 operator 就能 chain 一塊兒的,其條件仍是很苛刻的:

  • 上下游的並行度一致
  • 下游節點的入度爲1 (也就是說下游節點沒有來自其餘節點的輸入)
  • 上下游節點都在同一個 slot group 中(下面會解釋 slot group)
  • 下游節點的 chain 策略爲 ALWAYS(能夠與上下游連接,map、flatmap、filter等默認是ALWAYS)
  • 上游節點的 chain 策略爲 ALWAYS 或 HEAD(只能與下游連接,不能與上游連接,Source默認是HEAD)
  • 兩個節點間數據分區方式是 forward(參考理解數據流的分區)
  • 用戶沒有禁用 chain

flink的chain邏輯是一種很常見的設計,好比spring的interceptor也是相似的實現方式。經過把操做符串成一個大操做符,flink避免了把數據序列化後經過網絡發送給其餘節點的開銷,可以大大加強效率。

 

2.3.3 JobGraph的提交

前面已經提到,JobGraph的提交依賴於JobClient和JobManager之間的異步通訊,如圖所示:
image_1cafn516r1p68kt31g7r196rcsv2n.png-40.1kB
在submitJobAndWait方法中,其首先會建立一個JobClientActor的ActorRef,而後向其發起一個SubmitJobAndWait消息,該消息將JobGraph的實例提交給JobClientActor。發起模式是ask,它表示須要一個應答消息。

Future<Object> future = Patterns.ask(jobClientActor, new JobClientMessages.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

 

該SubmitJobAndWait消息被JobClientActor接收後,最終經過調用tryToSubmitJob方法觸發真正的提交動做。當JobManager的actor接收到來自client端的請求後,會執行一個submitJob方法,主要作如下事情:

  • 向BlobLibraryCacheManager註冊該Job;
  • 構建ExecutionGraph對象;
  • 對JobGraph中的每一個頂點進行初始化;
  • 將DAG拓撲中從source開始排序,排序後的頂點集合附加到Exec> - utionGraph對象;
  • 獲取檢查點相關的配置,並將其設置到ExecutionGraph對象;
  • 向ExecutionGraph註冊相關的listener;
  • 執行恢復操做或者將JobGraph信息寫入SubmittedJobGraphStore以在後續用於恢復目的;
  • 響應給客戶端JobSubmitSuccess消息;
  • 對ExecutionGraph對象進行調度執行;

最後,JobManger會返回消息給JobClient,通知該任務是否提交成功。

 

2.4 ExecutionGraph的生成

與StreamGraph和JobGraph不一樣,ExecutionGraph並非在咱們的客戶端程序生成,而是在服務端(JobManager處)生成的,順便flink只維護一個JobManager。其入口代碼是ExecutionGraphBuilder.buildGraph(...)
該方法長200多行,其中一大半是checkpoiont的相關邏輯,咱們暫且略過,直接看核心方法executionGraph.attachJobGraph(sortedTopology)
由於ExecutionGraph事實上只是改動了JobGraph的每一個節點,而沒有對整個拓撲結構進行變更,因此代碼裏只是挨個遍歷jobVertex並進行處理:

複製代碼
for (JobVertex jobVertex : topologiallySorted) {

            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            //在這裏生成ExecutionGraph的每一個節點
            //首先是進行了一堆賦值,將任務信息交給要生成的圖節點,以及設定並行度等等
            //而後是建立本節點的IntermediateResult,根據本節點的下游節點的個數肯定建立幾份
            //最後是根據設定好的並行度建立用於執行task的ExecutionVertex
            //若是job有設定inputsplit的話,這裏還要指定inputsplits
            ExecutionJobVertex ejv = new ExecutionJobVertex(
                this,
                jobVertex,
                1,
                rpcCallTimeout,
                globalModVersion,
                createTimestamp);
            
            //這裏要處理全部的JobEdge
            //對每一個edge,獲取對應的intermediateResult,並記錄到本節點的輸入上
            //最後,把每一個ExecutorVertex和對應的IntermediateResult關聯起來
            ejv.connectToPredecessors(this.intermediateResults);

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                        jobVertex.getID(), ejv, previousTask));
            }

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                            res.getId(), res, previousDataSet));
                }
            }

            this.verticesInCreationOrder.add(ejv);
            this.numVerticesTotal += ejv.getParallelism();
            newExecJobVertices.add(ejv);
        }
複製代碼

 

至此,ExecutorGraph就建立完成了。

 

3. 任務的調度與執行

關於flink的任務執行架構,官網的這兩張圖就是最好的說明:
image_1cafnu1pl1d8c15m219b8vkb2334.png-112.9kB
Flink 集羣啓動後,首先會啓動一個 JobManger 和多個的 TaskManager。用戶的代碼會由JobClient 提交給 JobManager,JobManager 再把來自不一樣用戶的任務發給 不一樣的TaskManager 去執行,每一個TaskManager管理着多個task,task是執行計算的最小結構, TaskManager 將心跳和統計信息彙報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述除了task外的三者均爲獨立的 JVM 進程。
要注意的是,TaskManager和job並不是一一對應的關係。flink調度的最小單元是task而非TaskManager,也就是說,來自不一樣job的不一樣task可能運行於同一個TaskManager的不一樣線程上。
image_1cclle7ui2j41nf611gs1is18m19.png-127.5kB
一個flink任務全部可能的狀態如上圖所示。圖上畫的很明白,就再也不贅述了。

 

3.1 計算資源的調度

Task slot是一個TaskManager內資源分配的最小載體,表明了一個固定大小的資源子集,每一個TaskManager會將其所佔有的資源平分給它的slot。
經過調整 task slot 的數量,用戶能夠定義task之間是如何相互隔離的。每一個 TaskManager 有一個slot,也就意味着每一個task運行在獨立的 JVM 中。每一個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。
而在同一個JVM進程中的task,能夠共享TCP鏈接(基於多路複用)和心跳消息,能夠減小數據的網絡傳輸,也能共享一些數據結構,必定程度上減小了每一個task的消耗。
每一個slot能夠接受單個task,也能夠接受多個連續task組成的pipeline,以下圖所示,FlatMap函數佔用一個taskslot,而key Agg函數和sink函數共用一個taskslot:
image_1cafpf21c1jh3s5ap1fisu4v23h.png-44.7kB
爲了達到共用slot的目的,除了能夠以chain的方式pipeline算子,咱們還能夠容許SlotSharingGroup,以下圖所示:
image_1cafpko68b3r1lk0dpsnmbj3c3u.png-61.2kB
咱們能夠把不能被chain成一條的兩個操做如flatmap和key&sink放在一個TaskSlot裏執行,這樣作能夠得到如下好處:

  • 共用slot使得咱們再也不須要計算每一個任務須要的總task數目,直接取最高算子的並行度便可
  • 對計算資源的利用率更高。例如,一般的輕量級操做map和重量級操做Aggregate再也不分別須要一個線程,而是能夠在同一個線程內執行,並且對於slot有限的場景,咱們能夠增大每一個task的並行度了。
    接下來咱們仍是用官網的圖來講明flink是如何重用slot的:
    image_1cafqroarkjkuje1hfi18gor654b.png-137kB
    1. TaskManager1分配一個SharedSlot0
    2. 把source task放入一個SimpleSlot0,再把該slot放入SharedSlot0
    3. 把flatmap task放入一個SimpleSlot1,再把該slot放入SharedSlot0
    4. 由於咱們的flatmap task並行度是2,所以不能再放入SharedSlot0,因此向TaskMange21申請了一個新的SharedSlot0
    5. 把第二個flatmap task放進一個新的SimpleSlot,並放進TaskManager2的SharedSlot0
    6. 開始處理key&sink task,由於其並行度也是2,因此先把第一個task放進TaskManager1的SharedSlot
    7. 把第二個key&sink放進TaskManager2的SharedSlot
 

3.2 JobManager執行job

JobManager負責接收 flink 的做業,調度 task,收集 job 的狀態、管理 TaskManagers。被實現爲一個 akka actor。

 

3.2.1 JobManager的組件

  • BlobServer 是一個用來管理二進制大文件的服務,好比保存用戶上傳的jar文件,該服務會將其寫到磁盤上。還有一些相關的類,如BlobCache,用於TaskManager向JobManager下載用戶的jar文件
  • InstanceManager 用來管理當前存活的TaskManager的組件,記錄了TaskManager的心跳信息等
  • CompletedCheckpointStore 用於保存已完成的checkpoint相關信息,持久化到內存中或者zookeeper上
  • MemoryArchivist 保存了已經提交到flink的做業的相關信息,如JobGraph等
 

3.2.2 JobManager的啓動過程

先列出JobManager啓動的核心代碼

複製代碼
def runJobManager(
      configuration: Configuration,
      executionMode: JobManagerMode,
      listeningAddress: String,
      listeningPort: Int)
    : Unit = {

    val numberProcessors = Hardware.getNumberCPUCores()

    val futureExecutor = Executors.newScheduledThreadPool(
      numberProcessors,
      new ExecutorThreadFactory("jobmanager-future"))

    val ioExecutor = Executors.newFixedThreadPool(
      numberProcessors,
      new ExecutorThreadFactory("jobmanager-io"))

    val timeout = AkkaUtils.getTimeout(configuration)

    // we have to first start the JobManager ActorSystem because this determines the port if 0
    // was chosen before. The method startActorSystem will update the configuration correspondingly.
    val jobManagerSystem = startActorSystem(
      configuration,
      listeningAddress,
      listeningPort)

    val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
      configuration,
      ioExecutor,
      AddressResolution.NO_ADDRESS_RESOLUTION)

    val metricRegistry = new MetricRegistryImpl(
      MetricRegistryConfiguration.fromConfiguration(configuration))

    metricRegistry.startQueryService(jobManagerSystem, null)

    val (_, _, webMonitorOption, _) = try {
      startJobManagerActors(
        jobManagerSystem,
        configuration,
        executionMode,
        listeningAddress,
        futureExecutor,
        ioExecutor,
        highAvailabilityServices,
        metricRegistry,
        classOf[JobManager],
        classOf[MemoryArchivist],
        Option(classOf[StandaloneResourceManager])
      )
    } catch {
      case t: Throwable =>
        futureExecutor.shutdownNow()
        ioExecutor.shutdownNow()

        throw t
    }

    // block until everything is shut down
    jobManagerSystem.awaitTermination()
    
    .......
}
複製代碼

 

  • 配置Akka並生成ActorSystem,啓動JobManager
  • 啓動HA和metric相關服務
  • startJobManagerActors()方法中啓動JobManagerActors,以及webserver,TaskManagerActor,ResourceManager等等
  • 阻塞等待終止
  • 集羣經過LeaderService等選出JobManager的leader
 

3.2.3 JobManager啓動Task

JobManager 是一個Actor,經過各類消息來完成核心邏輯:

複製代碼
override def handleMessage: Receive = {
  case GrantLeadership(newLeaderSessionID) =>
    log.info(s"JobManager $getAddress was granted leadership with leader session ID " +
      s"$newLeaderSessionID.")
    leaderSessionID = newLeaderSessionID
    
    .......
複製代碼

 

有幾個比較重要的消息:

  • GrantLeadership 得到leader受權,將自身被分發到的 session id 寫到 zookeeper,並恢復全部的 jobs
  • RevokeLeadership 剝奪leader受權,打斷清空全部的 job 信息,可是保留做業緩存,註銷全部的 TaskManagers
  • RegisterTaskManagers 註冊 TaskManager,若是以前已經註冊過,則只給對應的 Instance 發送消息,不然啓動註冊邏輯:在 InstanceManager 中註冊該 Instance 的信息,並中止 Instance BlobLibraryCacheManager 的端口【供下載 lib 包用】,同時使用 watch 監聽 task manager 的存活
  • SubmitJob 提交 jobGraph
    最後一項SubmintJob就是咱們要關注的,從客戶端收到JobGraph,轉換爲ExecutionGraph並執行的過程。
複製代碼
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
    
    ......
    
    executionGraph = ExecutionGraphBuilder.buildGraph(
          executionGraph,
          jobGraph,
          flinkConfiguration,
          futureExecutor,
          ioExecutor,
          scheduler,
          userCodeLoader,
          checkpointRecoveryFactory,
          Time.of(timeout.length, timeout.unit),
          restartStrategy,
          jobMetrics,
          numSlots,
          blobServer,
          log.logger)
          
    ......
    
    if (leaderElectionService.hasLeadership) {
            log.info(s"Scheduling job $jobId ($jobName).")
            
            executionGraph.scheduleForExecution()
            
          } else {
            self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))

            log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
              "this. I am not scheduling the job for execution.")
    
    ......
}
複製代碼

 

首先作一些準備工做,而後獲取一個ExecutionGraph,判斷是不是恢復的job,而後將job保存下來,而且通知客戶端本地已經提交成功了,最後若是確認本JobManager是leader,則執行executionGraph.scheduleForExecution()方法,這個方法通過一系列調用,把每一個ExecutionVertex傳遞給了Excution類的deploy方法:

複製代碼
public void deploy() throws JobException {

        ......

        try {
            // good, we are allowed to deploy
            if (!slot.setExecutedVertex(this)) {
                throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
            }

            // race double check, did we fail/cancel and do we need to release the slot?
            if (this.state != DEPLOYING) {
                slot.releaseSlot();
                return;
            }

            if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
                        attemptNumber, getAssignedResourceLocation().getHostname()));
            }

            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskState,
                attemptNumber);

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

            ......
        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }
複製代碼

 

咱們首先生成了一個TaskDeploymentDescriptor,而後交給了taskManagerGateway.submitTask()方法執行。接下來的部分,就屬於TaskManager的範疇了。

 

3.3 TaskManager執行task

 

3.3.1 TaskManager的基本組件

TaskManager是flink中資源管理的基本組件,是全部執行任務的基本容器,提供了內存管理、IO管理、通訊管理等一系列功能,本節對各個模塊進行簡要介紹。
1. MemoryManager flink並無把全部內存的管理都委託給JVM,由於JVM廣泛存在着存儲對象密度低、大內存時GC對系統影響大等問題。因此flink本身抽象了一套內存管理機制,將全部對象序列化後放在本身的MemorySegment上進行管理。MemoryManger涉及內容較多,將在後續章節進行繼續剖析。
2. IOManager flink經過IOManager管理磁盤IO的過程,提供了同步和異步兩種寫模式,又進一步區分了block、buffer和bulk三種讀寫方式。
IOManager提供了兩種方式枚舉磁盤文件,一種是直接遍歷文件夾下全部文件,另外一種是計數器方式,對每一個文件名以遞增順序訪問。
在底層,flink將文件IO抽象爲FileIOChannle,封裝了底層實現。
image_1cag7idg4vfj1l871n0l1k0e1f7u4o.png-194.1kB
能夠看到,flink在底層實際上都是以異步的方式進行讀寫。
3. NetworkEnvironment 是TaskManager的網絡 IO 組件,包含了追蹤中間結果和數據交換的數據結構。它的構造器會統一將配置的內存先分配出來,抽象成 NetworkBufferPool 統一管理內存的申請和釋放。意思是說,在輸入和輸出數據時,無論是保留在本地內存,等待chain在一塊兒的下個操做符進行處理,仍是經過網絡把本操做符的計算結果發送出去,都被抽象成了NetworkBufferPool。後續咱們還將對這個組件進行詳細分析。

 

3.3.2 TaskManager執行Task

對於TM來講,執行task就是把收到的TaskDeploymentDescriptor對象轉換成一個task並執行的過程。TaskDeploymentDescriptor這個類保存了task執行所必須的全部內容,例如序列化的算子,輸入的InputGate和輸出的ResultPartition的定義,該task要做爲幾個subtask執行等等。
按照正常邏輯思惟,很容易想到TM的submitTask方法的行爲:首先是確認資源,如尋找JobManager和Blob,然後創建鏈接,解序列化算子,收集task相關信息,接下來就是建立一個新的Task對象,這個task對象就是真正執行任務的關鍵所在。

複製代碼
val task = new Task(
        jobInformation,
        taskInformation,
        tdd.getExecutionAttemptId,
        tdd.getAllocationId,
        tdd.getSubtaskIndex,
        tdd.getAttemptNumber,
        tdd.getProducedPartitions,
        tdd.getInputGates,
        tdd.getTargetSlotNumber,
        tdd.getTaskStateHandles,
        memoryManager,
        ioManager,
        network,
        bcVarManager,
        taskManagerConnection,
        inputSplitProvider,
        checkpointResponder,
        blobCache,
        libCache,
        fileCache,
        config,
        taskMetricGroup,
        resultPartitionConsumableNotifier,
        partitionStateChecker,
        context.dispatcher)
複製代碼

 

若是讀者是從頭開始看這篇blog,裏面有不少對象應該已經比較明確其做用了(除了那個brVarManager,這個是管理廣播變量的,廣播變量是一類會被分發到每一個任務中的共享變量)。接下來的主要任務,就是把這個task啓動起來,而後報告說已經啓動task了:

// all good, we kick off the task, which performs its own initialization
task.startTaskThread()

sender ! decorateMessage(Acknowledge.get())

 

3.3.2.1 生成Task對象

在執行new Task()方法時,第一步是把構造函數裏的這些變量賦值給當前task的fields。
接下來是初始化ResultPartition和InputGate。這兩個類描述了task的輸出數據和輸入數據。

複製代碼
for (ResultPartitionDeploymentDescriptor desc: resultPartitionDeploymentDescriptors) {
    ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);

    this.producedPartitions[counter] = new ResultPartition(
        taskNameWithSubtaskAndId,
        this,
        jobId,
        partitionId,
        desc.getPartitionType(),
        desc.getNumberOfSubpartitions(),
        desc.getMaxParallelism(),
        networkEnvironment.getResultPartitionManager(),
        resultPartitionConsumableNotifier,
        ioManager,
        desc.sendScheduleOrUpdateConsumersMessage());        
    //爲每一個partition初始化對應的writer 
    writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);

    ++counter;
}

// Consumed intermediate result partitions
this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
this.inputGatesById = new HashMap<>();

counter = 0;

for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor: inputGateDeploymentDescriptors) {
    SingleInputGate gate = SingleInputGate.create(
        taskNameWithSubtaskAndId,
        jobId,
        executionId,
        inputGateDeploymentDescriptor,
        networkEnvironment,
        this,
        metricGroup.getIOMetricGroup());

    inputGates[counter] = gate;
    inputGatesById.put(gate.getConsumedResultId(), gate);

    ++counter;
}
複製代碼

 

最後,建立一個Thread對象,並把本身放進該對象,這樣在執行時,本身就有了自身的線程的引用。

 

3.3.2.2 運行Task對象

Task對象自己就是一個Runable,所以在其run方法裏定義了運行邏輯。
第一步是切換Task的狀態:

複製代碼
        while (true) {
            ExecutionState current = this.executionState;
            ////若是當前的執行狀態爲CREATED,則將其設置爲DEPLOYING狀態
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            //若是當前執行狀態爲FAILED,則發出通知並退出run方法
            else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
            //若是當前執行狀態爲CANCELING,則將其修改成CANCELED狀態,並退出run
            else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            }
            //不然說明發生了異常
            else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }
複製代碼

 

接下來,就是導入用戶類加載器並加載用戶代碼。
而後,是向網絡管理器註冊當前任務(flink的各個算子在運行時進行數據交換須要依賴網絡管理器),分配一些緩存以保存數據
而後,讀入指定的緩存文件。
而後,再把task建立時傳入的那一大堆變量用於建立一個執行環境Envrionment。
再而後,對於那些並非第一次執行的task(好比失敗後重啓的)要恢復其狀態。
接下來最重要的是

  1. invokable.invoke();

方法。爲何這麼說呢,由於這個方法就是用戶代碼所真正被執行的入口。好比咱們寫的什麼new MapFunction()的邏輯,最終就是在這裏被執行的。這裏說一下這個invokable,這是一個抽象類,提供了能夠被TaskManager執行的對象的基本抽象。
這個invokable是在解析JobGraph的時候生成相關信息的,並在此處造成真正可執行的對象

// now load the task's invokable code
//經過反射生成對象
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

 

image_1cbkaa8r9182i18ct1kfu8g829m9.png-29.9kB
上圖顯示了flink提供的可被執行的Task類型。從名字上就能夠看出各個task的做用,在此再也不贅述。
接下來就是invoke方法了,由於咱們的wordcount例子用了流式api,在此咱們以StreamTask的invoke方法爲例進行說明。

 

3.3.2.3 StreamTask的執行邏輯

先上部分核心代碼:

複製代碼
public final void invoke() throws Exception {

    boolean disposed = false;
    try {
            // -------- Initialize ---------
            //先作一些賦值操做
            ......

    // if the clock is not already set, then assign a default TimeServiceProvider
    //處理timer
    if (timerService == null) {
        ThreadFactory timerThreadFactory =
            new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());

        timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
    }

    //把以前JobGraph串起來的chain的信息造成實現
    operatorChain = new OperatorChain<>(this);
    headOperator = operatorChain.getHeadOperator();

    // task specific initialization
    //這個init操做的起名很是詭異,由於這裏主要是處理算子採用了自定義的checkpoint檢查機制的狀況,可是起了一個很是大衆臉的名字
    init();

    // save the work of reloading state, etc, if the task is already canceled
    if (canceled) {
        throw new CancelTaskException();
    }

    // -------- Invoke --------
    LOG.debug("Invoking {}", getName());
            
    // we need to make sure that any triggers scheduled in open() cannot be
    // executed before all operators are opened
    synchronized (lock) {

        // both the following operations are protected by the lock
        // so that we avoid race conditions in the case that initializeState()
        // registers a timer, that fires before the open() is called.

        //初始化操做符狀態,主要是一些state啥的
        initializeState();
        //對於富操做符,執行其open操做
        openAllOperators();
    }

    // final check to exit early before starting to run
    f (canceled) {
        throw new CancelTaskException();
    }

    // let the task do its work
    //真正開始執行的代碼
    isRunning = true;
    run();
複製代碼

 

StreamTask.invoke()方法裏,第一個值得一說的是TimerService。Flink在2015年決定向StreamTask類加入timer service的時候解釋到:

This integrates the timer as a service in StreamTask that StreamOperators can use by calling a method on the StreamingRuntimeContext. This also ensures that the timer callbacks can not be called concurrently with other methods on the StreamOperator. This behaviour is ensured by an ITCase.

第二個要注意的是chain操做。前面提到了,flink會出於優化的角度,把一些算子chain成一個總體的算子做爲一個task來執行。好比wordcount例子中,Source和FlatMap算子就被chain在了一塊兒。在進行chain操做的時候,會設定頭節點,而且指定輸出的RecordWriter。

接下來不出所料仍然是初始化,只不過初始化的對象變成了各個operator。若是是有checkpoint的,那就從state信息裏恢復,否則就做爲全新的算子處理。從源碼中能夠看到,flink針對keyed算子和普通算子作了不一樣的處理。keyed算子在初始化時須要計算出一個group區間,這個區間的值在整個生命週期裏都不會再變化,後面key就會根據hash的不一樣結果,分配到特定的group中去計算。順便提一句,flink的keyed算子保存的是對每一個數據的key的計算方法,而非真實的key,用戶須要本身保證對每一行數據提供的keySelector的冪等性。至於爲何要用KeyGroup的設計,這就牽扯到擴容的範疇了,將在後面的章節進行講述。
對於openAllOperators()方法,就是對各類RichOperator執行其open方法,一般可用於在執行計算以前加載資源。
最後,run方法千呼萬喚始出來,該方法通過一系列跳轉,最終調用chain上的第一個算子的run方法。在wordcount的例子中,它最終調用了SocketTextStreamFunction的run,創建socket鏈接並讀入文本。

 

3.4 StreamTask與StreamOperator

前面提到,Task對象在執行過程當中,把執行的任務交給了StreamTask這個類去執行。在咱們的wordcount例子中,實際初始化的是OneInputStreamTask的對象(參考上面的類圖)。那麼這個對象是如何執行用戶的代碼的呢?

複製代碼
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
複製代碼

 

它作的,就是把任務直接交給了InputProcessor去執行processInput方法。這是一個StreamInputProcessor的實例,該processor的任務就是處理輸入的數據,包括用戶數據、watermark和checkpoint數據等。咱們先來看看這個processor是如何產生的:

複製代碼
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();

        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        int numberOfInputs = configuration.getNumberOfInputs();

        if (numberOfInputs > 0) {
            InputGate[] inputGates = getEnvironment().getAllInputGates();

            inputProcessor = new StreamInputProcessor<>(
                    inputGates,
                    inSerializer,
                    this,
                    configuration.getCheckpointMode(),
                    getCheckpointLock(),
                    getEnvironment().getIOManager(),
                    getEnvironment().getTaskManagerInfo().getConfiguration(),
                    getStreamStatusMaintainer(),
                    this.headOperator);

            // make sure that stream tasks report their I/O statistics
            inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
        }
    }
複製代碼

 

這是OneInputStreamTask的init方法,從configs裏面獲取StreamOperator信息,生成本身的inputProcessor。那麼inputProcessor是如何處理數據的呢?咱們接着跟進源碼:

複製代碼
public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        }

        //這個while是用來處理單個元素的(不要想固然覺得是循環處理元素的)
        while (true) {
            //注意 1在下面
            //2.接下來,會利用這個反序列化器獲得下一個數據記錄,並進行解析(是用戶數據仍是watermark等等),而後進行對應的操做
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycle();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    //若是元素是watermark,就準備更新當前channel的watermark值(並非簡單賦值,由於有亂序存在),
                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                    //若是元素是status,就進行相應處理。能夠看做是一個flag,標誌着當前stream接下來即將沒有元素輸入(idle),或者當前即將由空閒狀態轉爲有元素狀態(active)。同時,StreamStatus還對如何處理watermark有影響。經過發送status,上游的operator能夠很方便的通知下游當前的數據流的狀態。
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                    //LatencyMarker是用來衡量代碼執行時間的。在Source處建立,攜帶建立時的時間戳,流到Sink時就能夠知道通過了多長時間
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                    //這裏就是真正的,用戶的代碼即將被執行的地方。從章節1到這裏足足用了三萬字,有點萬里長征的感受
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            //1.程序首先獲取下一個buffer
            //這一段代碼是服務於flink的FaultTorrent機制的,後面我會講到,這裏只需理解到它會嘗試獲取buffer,而後賦值給當前的反序列化器
            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
複製代碼

 

到此爲止,以上部分就是一個flink程序啓動後,到執行用戶代碼以前,flink框架所作的準備工做。回顧一下:

  • 啓動一個環境
  • 生成StreamGraph
  • 註冊和選舉JobManager
  • 在各節點生成TaskManager,並根據JobGraph生成對應的Task
  • 啓動各個task,準備執行代碼

接下來,咱們挑幾個Operator看看flink是如何抽象這些算子的。

 

4. StreamOperator的抽象與實現

 

4.1 數據源的邏輯——StreamSource與時間模型

StreamSource抽象了一個數據源,而且指定了一些如何處理數據的模式。

複製代碼
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    ......

    public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        LatencyMarksEmitter latencyEmitter = null;
        if (getExecutionConfig().isLatencyTrackingEnabled()) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                getExecutionConfig().getLatencyTrackingInterval(),
                getOperatorConfig().getVertexID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }

    ......

    private static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(
                final ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final int vertexID,
                final int subtaskIndex) {

            latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        try {
                            // ProcessingTimeService callbacks are executed under the checkpointing lock
                            output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));
                        } catch (Throwable t) {
                            // we catch the Throwables here so that we don't trigger the processing
                            // timer services async exception handler
                            LOG.warn("Error while emitting latency marker.", t);
                        }
                    }
                },
                0L,
                latencyTrackingInterval);
        }

        public void close() {
            latencyMarkTimer.cancel(true);
        }
    }
}
複製代碼

 

在StreamSource生成上下文以後,接下來就是把上下文交給SourceFunction去執行:

  1. userFunction.run(ctx);

SourceFunction是對Function的一個抽象,就好像MapFunction,KeyByFunction同樣,用戶選擇實現這些函數,而後flink框架就能利用這些函數進行計算,完成用戶邏輯。
咱們的wordcount程序使用了flink提供的一個SocketTextStreamFunction。咱們能夠看一下它的實現邏輯,對source如何運行有一個基本的認識:

複製代碼
public void run(SourceContext<String> ctx) throws Exception {
        final StringBuilder buffer = new StringBuilder();
        long attempt = 0;

        while (isRunning) {

            try (Socket socket = new Socket()) {
                currentSocket = socket;

                LOG.info("Connecting to server socket " + hostname + ':' + port);
                socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                char[] cbuf = new char[8192];
                int bytesRead;
                //核心邏輯就是一直讀inputSocket,而後交給collect方法
                while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
                    buffer.append(cbuf, 0, bytesRead);
                    int delimPos;
                    while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
                        String record = buffer.substring(0, delimPos);
                        // truncate trailing carriage return
                        if (delimiter.equals("\n") && record.endsWith("\r")) {
                            record = record.substring(0, record.length() - 1);
                        }
                        //讀到數據後,把數據交給collect方法,collect方法負責把數據交到合適的位置(如發佈爲br變量,或者交給下個operator,或者經過網絡發出去)
                        ctx.collect(record);
                        buffer.delete(0, delimPos + delimiter.length());
                    }
                }
            }

            // if we dropped out of this loop due to an EOF, sleep and retry
            if (isRunning) {
                attempt++;
                if (maxNumRetries == -1 || attempt < maxNumRetries) {
                    LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
                    Thread.sleep(delayBetweenRetries);
                }
                else {
                    // this should probably be here, but some examples expect simple exists of the stream source
                    // throw new EOFException("Reached end of stream and reconnects are not enabled.");
                    break;
                }
            }
        }

        // collect trailing data
        if (buffer.length() > 0) {
            ctx.collect(buffer.toString());
        }
    }
複製代碼

 

整段代碼裏,只有collect方法有些複雜度,後面咱們在講到flink的對象機制時會結合來說,此處知道collect方法會收集結果,而後發送給接收者便可。在咱們的wordcount裏,這個算子的接收者就是被chain在一塊兒的flatmap算子,不記得這個示例程序的話,能夠返回第一章去看一下。

 

4.2 從數據輸入到數據處理——OneInputStreamOperator & AbstractUdfStreamOperator

StreamSource是用來開啓整個流的算子,而承接輸入數據並進行處理的算子就是OneInputStreamOperator、TwoInputStreamOperator等。
image_1cdc1tbgs136k1ppf17at14fumjf2d.png-126.7kB
整個StreamOperator的繼承關係如上圖所示(圖很大,建議點開放大看)。
OneInputStreamOperator這個接口的邏輯很簡單:

複製代碼
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

    /**
     * Processes one element that arrived at this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement(StreamRecord<IN> element) throws Exception;

    /**
     * Processes a {@link Watermark}.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     */
    void processWatermark(Watermark mark) throws Exception;

    void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}
複製代碼

 

而實現了這個接口的StreamFlatMap算子也很簡單,沒什麼可說的:

複製代碼
public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}
複製代碼

 

從類圖裏能夠看到,flink爲咱們封裝了一個算子的基類AbstractUdfStreamOperator,提供了一些通用功能,好比把context賦給算子,保存快照等等,其中最爲你們瞭解的應該是這兩個:

複製代碼
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

    @Override
    public void close() throws Exception {
        super.close();
        functionsClosed = true;
        FunctionUtils.closeFunction(userFunction);
    }
複製代碼

 

這兩個就是flink提供的Rich***Function系列算子的open和close方法被執行的地方。

 

4.3 StreamSink

StreamSink着實沒什麼可說的,邏輯很簡單,值得一提的只有兩個方法:

複製代碼
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        sinkContext.element = element;
        userFunction.invoke(element.getValue(), sinkContext);
    }

    @Override
    protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
        // all operators are tracking latencies
        this.latencyGauge.reportLatency(maker, true);

        // sinks don't forward latency markers
    }
複製代碼

 

其中,processElement 是繼承自StreamOperator的方法。reportOrForwardLatencyMarker是用來計算延遲的,前面提到StreamSource會產生LateMarker,用於記錄數據計算時間,就是在這裏完成了計算。

算子這部分邏輯相對簡單清晰,就講這麼多吧。

 

5. 爲執行保駕護航——Fault Tolerant與保證Exactly-Once語義

 

5.1 Fault Tolerant演進之路

對於7×24小時不間斷運行的流程序來講,要保證fault tolerant是很難的,這不像是離線任務,若是失敗了只須要清空已有結果,從新跑一次就能夠了。對於流任務,若是要保證可以從新處理已處理過的數據,就要把數據保存下來;而這就面臨着幾個問題:好比一是保存多久的數據?二是重複計算的數據應該怎麼處理,怎麼保證冪等性?
對於一個流系統,咱們有如下但願:

  1. 最好能作到exactly-once
  2. 處理延遲越低越好
  3. 吞吐量越高越好
  4. 計算模型應當足夠簡單易用,又具備足夠的表達力
  5. 從錯誤恢復的開銷越低越好
  6. 足夠的流控制能力(背壓能力)
 

5.1.1 Storm的Record acknowledgement模式

storm的fault tolerant是這樣工做的:每個被storm的operator處理的數據都會向其上一個operator發送一份應答消息,通知其已被下游處理。storm的源operator保存了全部已發送的消息的每個下游算子的應答消息,當它收到來自sink的應答時,它就知道該消息已經被完整處理,能夠移除了。
若是沒有收到應答,storm就會重發該消息。顯而易見,這是一種at least once的邏輯。另外,這種方式面臨着嚴重的冪等性問題,例如對一個count算子,若是count的下游算子出錯,source重發該消息,那麼防止該消息被count兩遍的邏輯須要程序員本身去實現。最後,這樣一種處理方式很是低效,吞吐量很低。

 

5.1.2 Spark streaming的micro batch模式

前面提到,storm的實現方式就註定了與高吞吐量無緣。那麼,爲了提升吞吐量,把一批數據彙集在一塊兒處理就是很天然的選擇。Spark Streaming的實現就是基於這樣的思路:
咱們能夠在徹底的連續計算與徹底的分批計算中間取折中,經過控制每批計算數據的大小來控制延遲與吞吐量的制約,若是想要低延遲,就用小一點的batch,若是想要大吞吐量,就不得不忍受更高的延遲(更久的等待數據到來的時間和更多的計算),以下圖所示。
image_1ceop58ha180p1h3ren58jk15gb9.png-105.7kB
以這樣的方式,能夠在每一個batch中作到exactly-once,可是這種方式也有其弊端:
首先,batch的方式使得一些須要跨batch的操做變得很是困難,例如session window;用戶不得不本身想辦法去實現相關邏輯。
其次,batch模式很難作好背壓。當一個batch由於種種緣由處理慢了,那麼下一個batch要麼不得不容納更多的新來數據,要麼不得不堆積更多的batch,整個任務可能會被拖垮,這是一個很是致命的問題。
最後,batch的方式基本意味着其延遲是有比較高的下限的,實時性上很差。

 

5.1.3 Google Cloud Dataflow的事務式模型

咱們在傳統數據庫,如mysql中使用binlog來完成事務,這樣的思路也能夠被用在實現exactly-once模型中。例如,咱們能夠log下每一個數據元素每一次被處理時的結果和當時所處的操做符的狀態。這樣,當咱們須要fault tolerant時,咱們只須要讀一下log就能夠了。這種模式規避了storm和spark所面臨的問題,而且可以很好的實現exactly-once,惟一的弊端是:如何儘量的減小log的成本?Flink給了咱們答案。

 

5.1.4 Flink的分佈式快照機制

實現exactly-once的關鍵是什麼?是可以準確的知道和快速記錄下來當前的operator的狀態、當前正在處理的元素(以及正處在不一樣算子之間傳遞的元素)。若是上面這些能夠作到,那麼fault tolerant無非就是從持久化存儲中讀取上次記錄的這些元信息,而且恢復到程序中。那麼Flink是如何實現的呢?

Flink的分佈式快照的核心是其輕量級異步分佈式快照機制。爲了實現這一機制,flink引入了一個概念,叫作Barrier。Barrier是一種標記,它被source產生而且插入到流數據中,被髮送到下游節點。當下遊節點處理到該barrier標誌時,這就意味着在該barrier插入到流數據時,已經進入系統的數據在當前節點已經被處理完畢。
image_1ceos05badva20hb5glen1voqm.png-15.3kB

如圖所示,每當一個barrier流過一個算子節點時,就說明了在該算子上,能夠觸發一次檢查點,用以保存當前節點的狀態和已經處理過的數據,這就是一份快照。(在這裏能夠聯想一下micro-batch,把barrier想象成分割每一個batch的邏輯,會好理解一點)這樣的方式下,記錄快照就像和前面提到的micro-batch同樣容易。

與此同時,該算子會向下遊發送該barrier。由於數據在算子之間是按順序發送的,因此當下遊節點收到該barrier時,也就意味着一樣的一批數據在下游節點上也處理完畢,能夠進行一次checkpoint,保存基於該節點的一份快照,快照完成後,會通知JobMananger本身完成了這個快照。這就是分佈式快照的基本含義。

再看這張圖:
image_1ceot7q13apu1a04170af7j1jao34.png-66.6kB
有時,有的算子的上游節點和下游節點都不止一個,應該怎麼處理呢?若是有不止一個下游節點,就向每一個下游發送barrier。同理,若是有不止一個上游節點,那麼就要等到全部上游節點的同一批次的barrier到達以後,才能觸發checkpoint。由於每一個節點運算速度不一樣,因此有的上游節點可能已經在發下個barrier週期的數據了,有的上游節點還沒發送本次的barrier,這時候,當前算子就要緩存一下提早到來的數據,等比較慢的上游節點發送barrier以後,才能處理下一批數據。

當整個程序的最後一個算子sink都收到了這個barrier,也就意味着這個barrier和上個barrier之間所夾雜的這批元素已經所有落袋爲安。這時,最後一個算子通知JobManager整個流程已經完成,而JobManager隨後發出通知,要求全部算子刪除本次快照內容,以完成清理。這整個部分,就是Flink的兩階段提交的checkpoint過程,以下面四幅圖所示:
image_1ceot517e14g31u2u1mnt12o91dkb1g.png-175.5kB

image_1ceot5kqbnik1f2i1dss1q5c1a1t.png-221.3kB

image_1ceot64dppjtojkq3n1jl5j0h2a.png-297.8kB

image_1ceot6kes56sidn1f2u1voo19kf2n.png-255.5kB

總之,經過這種方式,flink實現了咱們前面提到的六項對流處理框架的要求:exactly-once、低延遲、高吞吐、易用的模型、方便的恢復機制。

最後,貼一個美團作的flink與storm的性能對比:flink與storm的性能對比

 

5.2 checkpoint的生命週期

接下來,咱們結合源碼來看看flink的checkpoint究竟是如何實現其生命週期的:

因爲flink提供的SocketSource並不支持checkpoint,因此這裏我以FlinkKafkaConsumer010做爲sourceFunction。

 

5.2.1 觸發checkpoint

要完成一次checkpoint,第一步必然是發起checkpoint請求。那麼,這個請求是哪裏發出的,怎麼發出的,又由誰控制呢?
還記得若是咱們要設置checkpoint的話,須要指定checkpoint間隔吧?既然是一個指定間隔觸發的功能,那應該會有相似於Scheduler的東西存在,flink裏,這個負責觸發checkpoint的類是CheckpointCoordinator

flink在提交job時,會啓動這個類的startCheckpointScheduler方法,以下所示

複製代碼
    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(), 
                    baseInterval, baseInterval, TimeUnit.MILLISECONDS);
        }
    }
    
    private final class ScheduledTrigger implements Runnable {

        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint.", e);
            }
        }
    }
複製代碼

 

啓動以後,就會以設定好的頻率調用triggerCheckPoint()方法。這個方法太長,我大概說一下都作了什麼:

  • 檢查符合觸發checkpoint的條件,例如若是禁止了週期性的checkpoint,還沒有達到觸發checkpoint的最小間隔等等,就直接return
  • 檢查是否全部須要checkpoint和須要響應checkpoint的ACK(ack涉及到checkpoint的兩階段提交,後面會講)的task都處於running狀態,不然return
  • 若是都符合,那麼執行checkpointID = checkpointIdCounter.getAndIncrement();以生成一個新的id,而後生成一個PendingCheckpoint。PendingCheckpoint是一個啓動了的checkpoint,可是尚未被確認。等到全部的task都確認了本次checkpoint,那麼這個checkpoint對象將轉化爲一個CompletedCheckpoint
  • 定義一個超時callback,若是checkpoint執行了好久還沒完成,就把它取消
  • 觸發MasterHooks,用戶能夠定義一些額外的操做,用以加強checkpoint的功能(如準備和清理外部資源)
  • 接下來是核心邏輯:
   // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

 

這裏是調用了Execution的triggerCheckpoint方法,一個execution就是一個executionVertex的實際執行者。咱們看一下這個方法:

複製代碼
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
        //TaskManagerGateway是用來跟taskManager進行通訊的組件
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }
複製代碼

 

再往下跟就進入了Task類的範疇,咱們將在下一小節進行解讀。本小節主要講了CheckpointCoordinator類是如何觸發一次checkpoint,從其名字也能夠看出來其功能:檢查點協調器。

 

5.2.2 Task層面checkpoint的準備工做

先說Task類中的部分,該類建立了一個CheckpointMetaData的對象,而且生成了一個Runable匿名類用於執行checkpoint,而後以異步的方式觸發了該Runable:

複製代碼
    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

            ......

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    }
                    
                    ......
                }
            };
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
    }
複製代碼

 

上面代碼裏的invokable事實上就是咱們的StreamTask了。Task類其實是將checkpoint委託給了更具體的類去執行,而StreamTask也將委託給更具體的類,直到業務代碼。
StreamTask是這樣實現的:

  • 若是task還在運行,那就能夠進行checkpoint。方法是先向下游全部出口廣播一個Barrier,而後觸發本task的State保存。
  • 若是task結束了,那咱們就要通知下游取消本次checkpoint,方法是發送一個CancelCheckpointMarker,這是相似於Barrier的另外一種消息。
  • 注意,從這裏開始,整個執行鏈路上開始出現Barrier,能夠和前面講Fault Tolerant原理的地方結合看一下。
複製代碼
    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {

        synchronized (lock) {
            if (isRunning) {
            
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            else {

                ......
                
            }
        }
    }
複製代碼

 

完成broadcastCheckpointBarrier方法後,在checkpointState()方法中,StreamTask還作了不少別的工做:

複製代碼
        public void executeCheckpointing() throws Exception {
            
            ......

            try {
                //這裏,就是調用StreamOperator進行snapshotState的入口方法
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                //這裏註冊了一個Runnable,在執行完checkpoint以後向JobManager發出CompletedCheckPoint消息,這也是fault tolerant兩階段提交的一部分
                owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
                
                ......
            
            } 
        }
複製代碼

 

說到checkpoint,咱們印象裏最直觀的感覺確定是咱們的一些作聚合的操做符的狀態保存,好比sum的和以及count的值等等。這些內容就是StreamOperator部分將要觸發保存的內容。能夠看到,除了咱們直觀的這些操做符的狀態保存外,flink的checkpoint作了大量的其餘工做。

接下來,咱們就把目光轉向操做符的checkpoint機制。

 

5.2.3 操做符的狀態保存及barrier傳遞

第四章時,咱們已經瞭解了StreamOperator的類關係,這裏,咱們就直接接着上一節的checkpointStreamOperator(op)方法往下講。
順便,前面也提到了,在進行checkpoint以前,operator初始化時,會執行一個initializeState方法,在該方法中,若是task是從失敗中恢復的話,其保存的state也會被restore進來。

傳遞barrier是在進行本operator的statesnapshot以前完成的,咱們先來看看其邏輯,其實和傳遞一條數據是相似的,就是生成一個CheckpointBarrier對象,而後向每一個streamOutput寫進去:

複製代碼
    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while broadcasting checkpoint barrier");
        }
    }
複製代碼

 

下游的operator接收到本barrier,就會觸發其自身的checkpoint。

StreamTask在執行完broadcastCheckpointBarrier以後,
咱們當前的wordcount程序裏有兩個operator chain,分別是:

  • kafka source -> flatmap
  • keyed aggregation -> sink

咱們就按這個順序來捋一下checkpoint的過程。

1.kafka source的checkpoint過程

複製代碼
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }
複製代碼

 

kafka的snapshot邏輯就是記錄一下當前消費的offsets,而後作成tuple(partitiion,offset)放進一個StateBackend裏。StateBackend是flink抽象出來的一個用於保存狀態的接口。

2.FlatMap算子的checkpoint過程
沒什麼可說的,就是調用了snapshotState()方法而已。

3.本operator chain的state保存過程
細心的同窗應該注意到了,各個算子的snapshot方法只把本身的狀態保存到了StateBackend裏,沒有寫入的持久化操做。這部分操做被放到了AbstractStreamOperator中,由flink統一負責持久化。其實不須要看源碼咱們也能想出來,持久化無非就是把這些數據用一個流寫到磁盤或者別的地方,接下來咱們來看看是否是這樣:

複製代碼
            //仍是AbstractStreamOperator.java的snapshotState方法
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
複製代碼

 

那麼這個operatorStateBackend是怎麼保存狀態的呢?
  • 首先把各個算子的state作了一份深拷貝;
  • 而後以異步的方式執行了一個內部類的runnable,該內部類的run方法實現了一個模版方法,首先打開stream,而後寫入數據,而後再關閉stream。

咱們來看看這個寫入數據的方法:

複製代碼
                public SnapshotResult<OperatorStateHandle> performOperation() throws Exception {
                    long asyncStartTime = System.currentTimeMillis();

                    CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;

                    // get the registered operator state infos ...
                    List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots =
                        new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                    for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) {
                        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }

                    // ... write them all in the checkpoint stream ...
                    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                    OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                    backendSerializationProxy.write(dov);

                    ......
                    
                }
複製代碼

 

註釋寫的很清楚,我就很少說了。

4.後繼operatorChain的checkpoint過程
前面說到,在flink的流中,barrier流過期會觸發checkpoint。在上面第1步中,上游節點已經發出了Barrier,因此在咱們的keyed aggregation -> sink 這個operatorchain中,咱們將首先捕獲這個barrier。

捕獲barrier的過程其實就是處理input數據的過程,對應着StreamInputProcessor.processInput()方法,該方法咱們在第四章已經講過,這裏咱們簡單回顧一下:

複製代碼
            //每一個元素都會觸發這一段邏輯,若是下一個數據是buffer,則從外圍的while循環裏進入處理用戶數據的邏輯;這個方法裏默默的處理了barrier的邏輯
            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
複製代碼

 

處理barrier的過程在這段代碼裏沒有體現,由於被包含在了getNextNonBlocked()方法中,咱們看下這個方法的核心邏輯:

複製代碼
            //BarrierBuffer.getNextNonBlocked方法
            else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                if (!endOfStream) {
                    // process barriers only if there is a chance of the checkpoint completing
                    processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                }
            }
            else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
            }
複製代碼

 

先提一嘴,你們還記得以前的部分也提到過CheckpointMarker吧,這裏正好也對上了。

處理barrier也是個麻煩事,你們回想一下5.1節提到的屏障的原理圖,一個opertor必須收到從每一個inputchannel發過來的同一序號的barrier以後才能發起本節點的checkpoint,若是有的channel的數據處理的快了,那該barrier後的數據還須要緩存起來,若是有的inputchannel被關閉了,那它就不會再發送barrier過來了:

複製代碼
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
        final long barrierId = receivedBarrier.getId();

        // fast path for single channel cases
        if (totalNumberOfInputChannels == 1) {
            if (barrierId > currentCheckpointId) {
                // new checkpoint
                currentCheckpointId = barrierId;
                notifyCheckpoint(receivedBarrier);
            }
            return;
        }

        // -- general code path for multiple input channels --

        if (numBarriersReceived > 0) {
            // this is only true if some alignment is already progress and was not canceled

            if (barrierId == currentCheckpointId) {
                // regular case
                onBarrier(channelIndex);
            }
            else if (barrierId > currentCheckpointId) {
                // we did not complete the current checkpoint, another started before
                LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                        "Skipping current checkpoint.", barrierId, currentCheckpointId);

                // let the task know we are not completing this
                notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

                // abort the current checkpoint
                releaseBlocksAndResetBarriers();

                // begin a the new checkpoint
                beginNewAlignment(barrierId, channelIndex);
            }
            else {
                // ignore trailing barrier from an earlier checkpoint (obsolete now)
                return;
            }
        }
        else if (barrierId > currentCheckpointId) {
            // first barrier of a new checkpoint
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // either the current checkpoint was canceled (numBarriers == 0) or
            // this barrier is from an old subsumed checkpoint
            return;
        }

        // check if we have all barriers - since canceled checkpoints always have zero barriers
        // this can only happen on a non canceled checkpoint
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
            // actually trigger checkpoint
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received all barriers, triggering checkpoint {} at {}",
                        receivedBarrier.getId(), receivedBarrier.getTimestamp());
            }

            releaseBlocksAndResetBarriers();
            notifyCheckpoint(receivedBarrier);
        }
    }
複製代碼

 

總之,當收到所有的barrier以後,就會觸發notifyCheckpoint(),該方法又會調用StreamTask的triggerCheckpoint,和以前的operator是同樣的。

若是還有後續的operator的話,就是徹底相同的循環,再也不贅述。

5.報告完成checkpoint事件
當一個operator保存完checkpoint數據後,就會啓動一個異步對象AsyncCheckpointRunnable,用以報告該檢查點已完成,其具體邏輯在reportCompletedSnapshotStates中。這個方法把任務又最終委託給了RpcCheckpointResponder這個類:

複製代碼
checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
複製代碼

 

從這個類也能夠看出來,它的邏輯是經過rpc的方式遠程調JobManager的相關方法完成報告事件,底層也是經過akka實現的。
那麼,誰響應了這個rpc調用呢?是該任務的JobMaster。

複製代碼
    //JobMaster.java
    public void acknowledgeCheckpoint(
            final JobID jobID,
            final ExecutionAttemptID executionAttemptID,
            final long checkpointId,
            final CheckpointMetrics checkpointMetrics,
            final TaskStateSnapshot checkpointState) {

        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            checkpointState);

        if (checkpointCoordinator != null) {
            getRpcService().execute(() -> {
                try {
                    checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
                } catch (Throwable t) {
                    log.warn("Error while processing checkpoint acknowledgement message");
                }
            });
        } else {
            log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
                    jobGraph.getJobID());
        }
    }
複製代碼

 

JobMaster反手就是一巴掌就把任務又rpc給了CheckpointCoordinator.receiveAcknowledgeMessage()方法。

以前提到,coordinator在觸發checkpoint時,生成了一個PendingCheckpoint,保存了全部operator的id。

當PendingCheckpoint收到一個operator的完成checkpoint的消息時,它就把這個operator從未完成checkpoint的節點集合移動到已完成的集合。當全部的operator都報告完成了checkpoint時,CheckpointCoordinator會觸發completePendingCheckpoint()方法,該方法作了如下事情:

  • 把pendinCgCheckpoint轉換爲CompletedCheckpoint
  • 把CompletedCheckpoint加入已完成的檢查點集合,並從未完成檢查點集合刪除該檢查點
  • 再度向各個operator發出rpc,通知該檢查點已完成

本文裏,收到這個遠程調用的就是那兩個operator chain,咱們來看看其邏輯:

複製代碼
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        synchronized (lock) {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());

                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
            }
            else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
            }
        }
    }
複製代碼

 

再接下來無非就是層層通知對應的算子作出響應罷了。

至此,flink的兩階段提交的checkpoint邏輯所有完成。

 

5.3 承載checkpoint數據的抽象:State & StateBackend

State是快照數據的載體,StateBackend是快照如何被保存的抽象。

State分爲 KeyedState和OperatorState,從名字就能夠看出來分別對應着keyedStream和其餘的oeprator。從State由誰管理上,也能夠區分爲raw state和Managed state。Flink管理的就是Managed state,用戶本身管理的就是raw state。Managed State又分爲ValueState、ListState、ReducingState、AggregatingState、FoldingState、MapState這麼幾種,看名字知用途。

StateBackend目前提供了三個backend,MemoryStateBackend,FsStateBackend,RocksDBStateBackend,都是看名字知用途系列。

State接口、StateBackend接口及其實現都比較簡單,代碼就不貼了, 尤爲State本質上就是一層容器封裝。

貼個別人寫的狀態管理的文章吧:詳解Flink中的狀態管理

 

6.數據流轉——Flink的數據抽象及數據交換過程

本章打算講一下flink底層是如何定義和在操做符之間傳遞數據的。

 

6.1 flink的數據抽象

 

6.1.1 MemorySegment

Flink做爲一個高效的流框架,爲了不JVM的固有缺陷(java對象存儲密度低,FGC影響吞吐和響應等),必然走上自主管理內存的道路。

這個MemorySegment就是Flink的內存抽象。默認狀況下,一個MemorySegment能夠被看作是一個32kb大的內存塊的抽象。這塊內存既能夠是JVM裏的一個byte[],也能夠是堆外內存(DirectByteBuffer)。

若是說byte[]數組和direct memory是最底層的存儲,那麼memorysegment就是在其上覆蓋的一層統一抽象。它定義了一系列抽象方法,用於控制和底層內存的交互,如:

複製代碼
public abstract class MemorySegment {

    public abstract byte get(int index);
    
    public abstract void put(int index, byte b);
    
    public int size() ;
    
    public abstract ByteBuffer wrap(int offset, int length);
    
    ......
}
複製代碼

 

咱們能夠看到,它在提供了諸多直接操做內存的方法外,還提供了一個wrap()方法,將本身包裝成一個ByteBuffer,咱們待會兒講這個ByteBuffer。

Flink爲MemorySegment提供了兩個實現類:HeapMemorySegmentHybridMemorySegment。他們的區別在於前者只能分配堆內存,然後者能用來分配堆內和堆外內存。事實上,Flink框架裏,只使用了後者。這是爲何呢?

若是HybridMemorySegment只能用於分配堆外內存的話,彷佛更合常理。可是在JVM的世界中,若是一個方法是一個虛方法,那麼每次調用時,JVM都要花時間去肯定調用的究竟是哪一個子類實現的該虛方法(方法重寫機制,不明白的去看JVM的invokeVirtual指令),也就意味着每次都要去翻方法表;而若是該方法雖然是個虛方法,但實際上整個JVM裏只有一個實現(就是說只加載了一個子類進來),那麼JVM會很聰明的把它去虛化處理,這樣就不用每次調用方法時去找方法表了,可以大大提高性能。可是隻分配堆內或者堆外內存不能知足咱們的須要,因此就出現了HybridMemorySegment同時能夠分配兩種內存的設計。

咱們能夠看看HybridMemorySegment的構造代碼:

複製代碼
    HybridMemorySegment(ByteBuffer buffer, Object owner) {
        super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
        this.offHeapBuffer = buffer;
    }
    
        HybridMemorySegment(byte[] buffer, Object owner) {
        super(buffer, owner);
        this.offHeapBuffer = null;
    }
複製代碼

 

其中,第一個構造函數的checkBufferAndGetAddress()方法可以獲得direct buffer的內存地址,所以能夠操做堆外內存。

 

6.1.2 ByteBuffer與NetworkBufferPool

MemorySegment這個抽象之上,Flink在數據從operator內的數據對象在向TaskManager上轉移,預備被髮給下個節點的過程當中,使用的抽象或者說內存對象是Buffer

注意,這個Buffer是個flink接口,不是java.nio提供的那個Buffer抽象類。Flink在這一層面同時使用了這兩個同名概念,用來存儲對象,直接看代碼時處處都是各類xxxBuffer很容易混淆:

  • java提供的那個Buffer抽象類在這一層主要用於構建HeapByteBuffer,這個主要是當數據從jvm裏的一個對象被序列化成字節數組時用的;
  • Flink的這個Buffer接口主要是一種flink層面用於傳輸數據和事件的統一抽象,其實現類是NetworkBuffer,是對MemorySegment的包裝。Flink在各個TaskManager之間傳遞數據時,使用的是這一層的抽象。

由於Buffer的底層是MemorySegment,這可能不是JVM所管理的,因此爲了知道何時一個Buffer用完了能夠回收,Flink引入了引用計數的概念,當確認這個buffer沒有人引用,就能夠回收這一片MemorySegment用於別的地方了(JVM的垃圾回收爲啥不用引用計數?讀者思考一下):

複製代碼
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private volatile int refCnt = 1;
    
    ......
}
複製代碼

 

爲了方便管理NetworkBuffer,Flink提供了BufferPoolFactory,而且提供了惟一實現NetworkBufferPool,這是個工廠模式的應用。

NetworkBufferPool在每一個TaskManager上只有一個,負責全部子task的內存管理。其實例化時就會嘗試獲取全部可由它管理的內存(對於堆內存來講,直接獲取全部內存並放入老年代,並令用戶對象只在新生代存活,能夠極大程度的減小Full GC),咱們看看其構造方法:

複製代碼
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {

        ......
        
        try {
            this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
        }
        catch (OutOfMemoryError err) {
            throw new OutOfMemoryError("Could not allocate buffer queue of length "
                    + numberOfSegmentsToAllocate + " - " + err.getMessage());
        }

        try {
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
                availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
            }
        }

        ......
        
        long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

        LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
                allocatedMb, availableMemorySegments.size(), segmentSize);
    }
複製代碼

 

因爲NetworkBufferPool只是個工廠,實際的內存池是LocalBufferPool。每一個TaskManager都只有一個NetworkBufferPool工廠,可是上面運行的每一個task都要有一個和其餘task隔離的LocalBufferPool池,這從邏輯上很好理解。另外,NetworkBufferPool會計算本身所擁有的全部內存分片數,在分配新的內存池時對每一個內存池應該佔有的內存分片數重分配,步驟是:

  • 首先,從整個工廠管理的內存片中拿出全部的內存池所須要的最少Buffer數目總和
  • 若是正好分配完,就結束
  • 其次,把全部的剩下的沒分配的內存片,按照每一個LocalBufferPool內存池的剩餘想要容量大小進行按比例分配
  • 剩餘想要容量大小是這麼個東西:若是該內存池至少須要3個buffer,最大須要10個buffer,那麼它的剩餘想要容量就是7

實現代碼以下:

複製代碼
    private void redistributeBuffers() throws IOException {
        assert Thread.holdsLock(factoryLock);

        // All buffers, which are not among the required ones
        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;

        if (numAvailableMemorySegment == 0) {
            // in this case, we need to redistribute buffers so that every pool gets its minimum
            for (LocalBufferPool bufferPool : allBufferPools) {
                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
            }
            return;
        }

        long totalCapacity = 0; // long to avoid int overflow

        for (LocalBufferPool bufferPool : allBufferPools) {
            int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
                bufferPool.getNumberOfRequiredMemorySegments();
            totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
        }

        // no capacity to receive additional buffers?
        if (totalCapacity == 0) {
            return; // necessary to avoid div by zero when nothing to re-distribute
        }

        final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
                Math.min(numAvailableMemorySegment, totalCapacity));

        long totalPartsUsed = 0; // of totalCapacity
        int numDistributedMemorySegment = 0;
        for (LocalBufferPool bufferPool : allBufferPools) {
            int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
                bufferPool.getNumberOfRequiredMemorySegments();

            // shortcut
            if (excessMax == 0) {
                continue;
            }

            totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);


            final int mySize = MathUtils.checkedDownCast(
                    memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);

            numDistributedMemorySegment += mySize;
            bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
        }

        assert (totalPartsUsed == totalCapacity);
        assert (numDistributedMemorySegment == memorySegmentsToDistribute);
    }
複製代碼

 

接下來講說這個LocalBufferPool內存池。
LocalBufferPool的邏輯想一想無非是增刪改查,值得說的是其fields:

複製代碼
    /** 該內存池須要的最少內存片數目*/
    private final int numberOfRequiredMemorySegments;

    /**
     * 當前已經得到的內存片中,尚未寫入數據的空白內存片
     */
    private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

    /**
     * 註冊的全部監控buffer可用性的監聽器
     */
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

    /** 能給內存池分配的最大分片數*/
    private final int maxNumberOfMemorySegments;

    /** 當前內存池大小 */
    private int currentPoolSize;

    /**
     * 全部經由NetworkBufferPool分配的,被本內存池引用到的(非直接得到的)分片數
     */
    private int numberOfRequestedMemorySegments;
複製代碼

 

承接NetworkBufferPool的重分配方法,咱們來看看LocalBufferPool的setNumBuffers()方法,代碼很短,邏輯也至關簡單,就不展開說了:

複製代碼
    public void setNumBuffers(int numBuffers) throws IOException {
        synchronized (availableMemorySegments) {
            checkArgument(numBuffers >= numberOfRequiredMemorySegments,
                    "Buffer pool needs at least %s buffers, but tried to set to %s",
                    numberOfRequiredMemorySegments, numBuffers);

            if (numBuffers > maxNumberOfMemorySegments) {
                currentPoolSize = maxNumberOfMemorySegments;
            } else {
                currentPoolSize = numBuffers;
            }

            returnExcessMemorySegments();

            // If there is a registered owner and we have still requested more buffers than our
            // size, trigger a recycle via the owner.
            if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
                owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
            }
        }
    }
複製代碼

 

 

6.1.3 RecordWriter與Record

咱們接着往高層抽象走,剛剛提到了最底層內存抽象是MemorySegment,用於數據傳輸的是Buffer,那麼,承上啓下對接從Java對象轉爲Buffer的中間對象是什麼呢?是StreamRecord

StreamRecord<T>這個類名字就能夠看出來,這個類就是個wrap,裏面保存了原始的Java對象。另外,StreamRecord還保存了一個timestamp。

那麼這個對象是怎麼變成LocalBufferPool內存池裏的一個大號字節數組的呢?藉助了StreamWriter這個類。

咱們直接來看把數據序列化交出去的方法:

複製代碼
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");
        
        
        
        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
複製代碼

 

先說最後一行,若是配置爲flushAlways,那麼會馬上把元素髮送出去,可是這樣吞吐量會降低;Flink的默認設置其實也不是一個元素一個元素的發送,是單獨起了一個線程,每隔固定時間flush一次全部channel,較真起來也算是mini batch了。

再說序列化那一句:SerializationResult result = serializer.addRecord(record);。在這行代碼中,Flink把對象調用該對象所屬的序列化器序列化爲字節數組。

 

6.2 數據流轉過程

上一節講了各層數據的抽象,這一節講講數據在各個task之間exchange的過程。

 

6.2.1 總體過程

看這張圖:
image_1cetavukjja42ce1261v5k57i9.png-821.8kB

  1. 第一步必然是準備一個ResultPartition;
  2. 通知JobMaster;
  3. JobMaster通知下游節點;若是下游節點還沒有部署,則部署之;
  4. 下游節點向上遊請求數據
  5. 開始傳輸數據
 

6.2.2 數據跨task傳遞

本節講一下算子之間具體的數據傳輸過程。也先上一張圖:
image_1cfmpba9v15anggtvsba2o1277m.png-357.5kB
數據在task之間傳遞有以下幾步:

  1. 數據在本operator處理完後,交給RecordWriter。每條記錄都要選擇一個下游節點,因此要通過ChannelSelector
  2. 每一個channel都有一個serializer(我認爲這應該是爲了不多線程寫的麻煩),把這條Record序列化爲ByteBuffer
  3. 接下來數據被寫入ResultPartition下的各個subPartition裏,此時該數據已經存入DirectBuffer(MemorySegment)
  4. 單獨的線程控制數據的flush速度,一旦觸發flush,則經過Netty的nio通道向對端寫入
  5. 對端的netty client接收到數據,decode出來,把數據拷貝到buffer裏,而後通知InputChannel
  6. 有可用的數據時,下游算子從阻塞醒來,從InputChannel取出buffer,再解序列化成record,交給算子執行用戶代碼

數據在不一樣機器的算子之間傳遞的步驟就是以上這些。

瞭解了步驟以後,再來看一下部分關鍵代碼:
首先是把數據交給recordwriter。

複製代碼
    //RecordWriterOutput.java
    @Override
    public void collect(StreamRecord<OUT> record) {
        if (this.outputTag != null) {
            // we are only responsible for emitting to the main input
            return;
        }
        //這裏能夠看到把記錄交給了recordwriter
        pushToRecordWriter(record);
    }
複製代碼

 

而後recordwriter把數據發送到對應的通道。

複製代碼
    //RecordWriter.java
    public void emit(T record) throws IOException, InterruptedException {
        //channelselector登場了
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }
    
        private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        
        //選擇序列化器並序列化數據
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            //寫入channel
            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
複製代碼

 

接下來是把數據推給底層設施(netty)的過程:

複製代碼
    //ResultPartition.java
    @Override
    public void flushAll() {
        for (ResultSubpartition subpartition : subpartitions) {
            subpartition.flush();
        }
    }
    
        //PartitionRequestQueue.java
        void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
        //這裏交給了netty server線程去推
        ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                ctx.pipeline().fireUserEventTriggered(reader);
            }
        });
    }
複製代碼

 

netty相關的部分:

複製代碼
    //AbstractChannelHandlerContext.java
    public ChannelHandlerContext fireUserEventTriggered(final Object event) {
        if (event == null) {
            throw new NullPointerException("event");
        } else {
            final AbstractChannelHandlerContext next = this.findContextInbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeUserEventTriggered(event);
            } else {
                executor.execute(new OneTimeTask() {
                    public void run() {
                        next.invokeUserEventTriggered(event);
                    }
                });
            }

            return this;
        }
    }
複製代碼

 

最後真實的寫入:

複製代碼
    //PartittionRequesetQueue.java
    private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
        if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
            return;
        }
        // Queue an available reader for consumption. If the queue is empty,
        // we try trigger the actual write. Otherwise this will be handled by
        // the writeAndFlushNextMessageIfPossible calls.
        boolean triggerWrite = availableReaders.isEmpty();
        registerAvailableReader(reader);

        if (triggerWrite) {
            writeAndFlushNextMessageIfPossible(ctx.channel());
        }
    }
    
    private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
        
        ......

                next = reader.getNextBuffer();
                if (next == null) {
                    if (!reader.isReleased()) {
                        continue;
                    }
                    markAsReleased(reader.getReceiverId());

                    Throwable cause = reader.getFailureCause();
                    if (cause != null) {
                        ErrorResponse msg = new ErrorResponse(
                            new ProducerFailedException(cause),
                            reader.getReceiverId());

                        ctx.writeAndFlush(msg);
                    }
                } else {
                    // This channel was now removed from the available reader queue.
                    // We re-add it into the queue if it is still available
                    if (next.moreAvailable()) {
                        registerAvailableReader(reader);
                    }

                    BufferResponse msg = new BufferResponse(
                        next.buffer(),
                        reader.getSequenceNumber(),
                        reader.getReceiverId(),
                        next.buffersInBacklog());

                    if (isEndOfPartitionEvent(next.buffer())) {
                        reader.notifySubpartitionConsumed();
                        reader.releaseAllResources();

                        markAsReleased(reader.getReceiverId());
                    }

                    // Write and flush and wait until this is done before
                    // trying to continue with the next buffer.
                    channel.writeAndFlush(msg).addListener(writeListener);

                    return;
                }
        
        ......
        
    }
複製代碼

 

上面這段代碼裏第二個方法中調用的writeAndFlush(msg)就是真正往netty的nio通道里寫入的地方了。在這裏,寫入的是一個RemoteInputChannel,對應的就是下游節點的InputGate的channels。

有寫就有讀,nio通道的另外一端須要讀入buffer,代碼以下:

複製代碼
    //CreditBasedPartitionRequestClientHandler.java
    private void decodeMsg(Object msg) throws Throwable {
        final Class<?> msgClazz = msg.getClass();

        // ---- Buffer --------------------------------------------------------
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

            RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
            if (inputChannel == null) {
                bufferOrEvent.releaseBuffer();

                cancelRequestFor(bufferOrEvent.receiverId);

                return;
            }

            decodeBufferOrEvent(inputChannel, bufferOrEvent);

        } 
        
        ......
        
    }
複製代碼

 

插一句,Flink其實作阻塞和獲取數據的方式很是天然,利用了生產者和消費者模型,當獲取不到數據時,消費者天然阻塞;當數據被加入隊列,消費者被notify。Flink的背壓機制也是藉此實現。

而後在這裏又反序列化成StreamRecord

複製代碼
    //StreamElementSerializer.java
    public StreamElement deserialize(DataInputView source) throws IOException {
        int tag = source.readByte();
        if (tag == TAG_REC_WITH_TIMESTAMP) {
            long timestamp = source.readLong();
            return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
        }
        else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
            return new StreamRecord<T>(typeSerializer.deserialize(source));
        }
        else if (tag == TAG_WATERMARK) {
            return new Watermark(source.readLong());
        }
        else if (tag == TAG_STREAM_STATUS) {
            return new StreamStatus(source.readInt());
        }
        else if (tag == TAG_LATENCY_MARKER) {
            return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
        }
        else {
            throw new IOException("Corrupt stream, found tag: " + tag);
        }
    }
複製代碼

 

而後再次在StreamInputProcessor.processInput()循環中獲得處理。

至此,數據在跨jvm的節點之間的流轉過程就講完了。

 

6.3 Credit漫談

在看上一部分的代碼時,有一個小細節不知道讀者有沒有注意到,咱們的數據發送端的代碼叫作PartittionRequesetQueue.java,而咱們的接收端卻起了一個徹底不相干的名字:CreditBasedPartitionRequestClientHandler.java。爲何前面加了CreditBased的前綴呢?

 

6.3.1 背壓問題

在流模型中,咱們期待數據是像水流同樣平滑的流過咱們的引擎,但現實生活不會這麼美好。數據的上游可能由於各類緣由數據量暴增,遠遠超出了下游的瞬時處理能力(回憶一下98年大洪水),致使系統崩潰。
那麼框架應該怎麼應對呢?和人類處理天然災害的方式相似,咱們修建了三峽大壩,當洪水來臨時把大量的水囤積在大壩裏;對於Flink來講,就是在數據的接收端和發送端放置了緩存池,用以緩衝數據,而且設置閘門阻止數據向下流。

那麼Flink又是如何處理背壓的呢?答案也是靠這些緩衝池。
image_1cfksrl5cd4m1lbqqqgvc811349.png-43.1kB
這張圖說明了Flink在生產和消費數據時的大體狀況。ResultPartitionInputGate在輸出和輸入數據時,都要向NetworkBufferPool申請一塊MemorySegment做爲緩存池。
接下來的狀況和生產者消費者很相似。當數據發送太多,下游處理不過來了,那麼首先InputChannel會被填滿,而後是InputChannel能申請到的內存達到最大,因而下游中止讀取數據,上游負責發送數據的nettyServer會獲得響應,中止從ResultSubPartition讀取緩存,那麼ResultPartition很快也將存滿數據不能被消費,從而生產數據的邏輯被阻塞在獲取新buffer上,很是天然地造成背壓的效果。

Flink本身作了個試驗用以說明這個機制的效果:
image_1cfkta54rkdd1od4aau1e3n7nhm.png-240.6kB
咱們首先設置生產者的發送速度爲60%,而後下游的算子以一樣的速度處理數據。而後咱們將下游算子的處理速度下降到30%,能夠看到上游的生產者的數據產生曲線幾乎與消費者同步下滑。然後當咱們解除限速,整個流的速度馬上提升到了100%。

 

6.3.2 使用Credit實現ATM網絡流控

上文已經提到,對於流量控制,一個樸素的思路就是在長江上建三峽鏈路上創建一個攔截的dam,以下圖所示:
image_1cfku114lf7hpqf3lmcl0116c13.png-22.7kB
基於Credit的流控就是這樣一種創建在信用(消費數據的能力)上的,面向每一個虛鏈路(而非端到端的)流模型,以下圖所示:
image_1cfku4g4g174d7gb5ecbfcib71g.png-22.5kB
首先,下游會向上遊發送一條credit message,用以通知其目前的信用(可聯想信用卡的可用額度),而後上游會根據這個信用消息來決定向下游發送多少數據。當上遊把數據發送給下游時,它就從下游的信用卡上划走相應的額度(credit balance):
image_1cfkug5sm1v4l15pbgj4jntc7q1t.png-12.9kB
下游總共得到的credit數目是Buf_Alloc,已經消費的數據是Fwd_Cnt,上游發送出來的數據是Tx_Cnt,那麼剩下的那部分就是Crd_Bal:
Crd_Bal = Buf_Alloc - ( Tx_Cnt - Fwd_Cnt )
上面這個式子應該很好理解。

能夠看到,Credit Based Flow Control的關鍵是buffer分配。這種分配能夠在數據的發送端完成,也能夠在接收端完成。對於下游可能有多個上游節點的狀況(好比Flink),使用接收端的credit分配更加合理:
image_1cfkvpmlh1gl31ef41cvh1c903a19.png-13.1kB
上圖中,接收者能夠觀察到每一個上游鏈接的帶寬狀況,而上游的節點Snd1卻不可能輕易知道發往同一個下游節點的其餘Snd2的帶寬狀況,從而若是在上游控制流量將會很困難,而在下游控制流量將會很方便。

所以,這就是爲什麼Flink在接收端有一個基於Credit的Client,而不是在發送端有一個CreditServer的緣由。

最後,再講一下Credit的面向虛鏈路的流設計和端到端的流設計的區別:
image_1cfl05d2f1ub879c1lc5qsq14n9m.png-13.4kB
如上圖所示,a是面向鏈接的流設計,b是端到端的流設計。其中,a的設計使得當下游節點3因某些狀況必須緩存數據暫緩處理時,每一個上游節點(1和2)均可以利用其緩存保存數據;而端到端的設計b裏,只有節點3的緩存才能夠用於保存數據(讀者能夠從如何實現上想一想爲何)。

對流控制感興趣的讀者,能夠看這篇文章:Traffic Management For High-Speed Networks

 

7.其餘核心概念

截至第六章,和執行過程相關的部分就所有講完,告一段落了。第七章主要講一點雜七雜八的內容,有時間就不按期更新。

 

7.1 EventTime時間模型

flink有三種時間模型:ProcessingTime,EventTime和IngestionTime。
關於時間模型看這張圖:
image_1cdbotdcmoe11q961st5lbn1j4n9.png-38.4kB
從這張圖裏能夠很清楚的看到三種Time模型的區別。

  • EventTime是數據被生產出來的時間,能夠是好比傳感器發出信號的時間等(此時數據尚未被傳輸給flink)。
  • IngestionTime是數據進入flink的時間,也就是從Source進入flink流的時間(此時數據剛剛被傳給flink)
  • ProcessingTime是針對當前算子的系統時間,是指該數據已經進入某個operator時,operator所在系統的當前時間

例如,我在寫這段話的時間是2018年5月13日03點47分,可是我引用的這張EventTime的圖片,是2015年畫出來的,那麼這張圖的EventTime是2015年,而ProcessingTime是如今。
Flink官網對於時間戳的解釋很是詳細:點我
Flink對於EventTime模型的實現,依賴的是一種叫作watermark的對象。watermark是攜帶有時間戳的一個對象,會按照程序的要求被插入到數據流中,用以標誌某個事件在該時間發生了。
我再作一點簡短的說明,仍是以官網的圖爲例:
image_1cdbt8v5jl2ujn91uu1joh1p4gm.png-11.3kB
對於有序到來的數據,假設咱們在timestamp爲11的元素後加入一個watermark,時間記錄爲11,則下個元素收到該watermark時,認爲全部早於11的元素均已到達。這是很是理想的狀況。
image_1cdbtcc5c1a6i1tuaadb1rd5136913.png-11.6kB
而在現實生活中,常常會遇到亂序的數據。這時,咱們雖然在timestamp爲7的元素後就收到了11,可是咱們一直等到了收到元素12以後,才插入了watermark爲11的元素。與上面的圖相比,若是咱們仍然在11後就插入11的watermark,那麼元素9就會被丟棄,形成數據丟失。而咱們在12以後插入watermark11,就保證了9仍然會被下一個operator處理。固然,咱們不可能無限制的永遠等待遲到元素,因此要在哪一個元素後插入11須要根據實際場景權衡。

對於來自多個數據源的watermark,能夠看這張圖:
image_1cdbufp4a1opmsit5n61mial4520.png-72kB
能夠看到,當一個operator收到多個watermark時,它遵循最小原則(或者說最先),即算子的當前watermark是流經該算子的最小watermark,以允許來自不一樣的source的亂序數據到來。
關於事件時間模型,更多內容能夠參考Stream 101 和谷歌的這篇論文:Dataflow Model paper

 

7.2 FLIP-6 部署及處理模型演進

就在老白寫這篇blog的時候,Flink發佈了其1.5 RELEASE版本,號稱實現了其部署及處理模型(也就是FLIP-6),因此打算簡略地說一下FLIP-6的主要內容。

 

7.2.1 現有模型不足

1.5以前的Flink模型有不少不足,包括:

  • 只能靜態分配計算資源
  • 在YARN上全部的資源分配都是一碗水端平的
  • 與Docker/k8s的集成很是之蠢,很有脫褲子放屁的神韻
  • JobManager沒有任務調度邏輯
  • 任務在YARN上執行結束後web dashboard就不可用
  • 集羣的session模式和per job模式混淆難以理解

就我我的而言,我以爲Flink有一個這裏徹底沒提到的不足纔是最應該修改的:針對任務的徹底的資源隔離。尤爲是若是用Standalone集羣,一個用戶的task跑掛了TaskManager,而後拖垮了整個集羣的狀況簡直不要太多。

 

7.2.2 核心變動

Single Job JobManager
最重要的變動是一個JobManager只處理一個job。當咱們生成JobGraph時就順便起一個JobManager,這顯然更加天然。

ResourceManager
其職責包括獲取新的TM和slot,通知失敗,釋放資源以及緩存TM以用於重用等。重要的是,這個組件要能作到掛掉時不要搞垮正在運行的好好的任務。其職責和與JobManager、TaskManager的交互圖以下:
image_1cfl9453k1gld4acr1m13j3195sg.png-23.9kB

TaskManager
TM要與上面的兩個組件交互。與JobManager交互時,要能提供slot,要能與全部給出slot的JM交互。丟失與JM的鏈接時要能試圖把本TM上的slot的狀況通告給新JM,若是這一步失敗,就要能從新分配slot。
與ResourceManager交互時,要通知RM本身的資源和當前的Job分配狀況,能按照RM的要求分配資源或者關閉自身。

JobManager Slot Pool
這個pool要持有全部分配給當前job的slot資源,而且能在RM掛掉的狀況下管理當前已經持有的slot。

Dispatcher
須要一個Job的分發器的主要緣由是在有的集羣環境下咱們可能須要一個統一的提交和監控點,以及替代以前的Standalone模式下的JobManager。未來對分發器的指望可能包括權限控制等。
image_1cfl9ju2617bh1s191mar1jsp12vot.png-31.4kB

 

7.2.3 Cluster Manager的架構

YARN
新的基於YARN的架構主要包括再也不須要先在容器裏啓動集羣,而後提交任務;用戶代碼再也不使用動態ClassLoader加載;不用的資源能夠釋放;能夠按需分配不一樣大小的容器等。其執行過程以下:
無Dispatcher時
image_1cfla0n7u1lg21n3o36uu0c1o5h1a.png-46.2kB
有Dispatcher時
image_1cfla15os15i3qcsu6c4p4clk1n.png-50.7kB

Mesos
與基於YARN的模式很像,可是隻有帶Dispatcher模式,由於只有這樣才能在Mesos集羣裏跑其RM。
image_1cfla4tka101n18bf1mno4npu9s24.png-49.2kB
Mesos的Fault Tolerance是相似這樣的:
image_1cfla6eka1ph71mu1pll1q0mgqq2h.png-12.1kB
必須用相似Marathon之類的技術保證Dispatcher的HA。

Standalone
其實沒啥可說的,把之前的JobManager的職責換成如今的Dispatcher就好了。
image_1cflaaim2ih2v54umsmq01lqc2u.png-36.8kB
未來可能會實現一個相似於輕量級Yarn的模式。

Docker/k8s
用戶定義好容器,至少有一個是job specific的(否則怎麼啓動任務);還有用於啓動TM的,能夠不是job specific的。啓動過程以下
image_1cflafs2o1trgicjmdbndn1bdq3b.png-24.2kB

 

7.2.4 組件設計及細節

分配slot相關細節
重新的TM取slot過程:
image_1cflakoadvjm8pf6nt1k331qj33o.png-77.2kB

從Cached TM取slot過程:
image_1cflambu91ufi5fl1cg9gimdff45.png-63.4kB

失敗處理

    1. TM失敗
      TM失敗時,RM要能檢測到失敗,更新本身的狀態,發送消息給JM,重啓一份TM;JM要能檢測到失敗,從狀態移除失效slot,標記該TM的task爲失敗,並在沒有足夠slot繼續任務時調整規模;TM自身則要能從Checkpoint恢復

    2. RM失敗
      此時TM要能檢測到失敗,並準備向新的RM註冊自身,而且向新的RM傳遞自身的資源狀況;JM要能檢測到失敗而且等待新的RM可用,從新請求須要的資源;丟失的數據要能從Container、TM等處恢復。

    3. JM失敗
      TM釋放全部task,向新JM註冊資源,而且若是不成功,就向RM報告這些資源可用於重分配;RM坐等;JM丟失的數據從持久化存儲中得到,已完成的checkpoints從HA恢復,從最近的checkpoint重啓task,並申請資源。

    4. JM & RM 失敗
      TM將在一段時間內試圖把資源交給新上任的JM,若是失敗,則把資源交給新的RM

    5. TM & RM失敗JM若是正在申請資源,則要等到新的RM啓動後才能得到;JM可能須要調整其規模,由於損失了TM的slot。

相關文章
相關標籤/搜索