Flink Source Code Analysis: Dissecting a Simple Flink Program


Earlier articles covered how to set up a Flink environment locally, how to create a Flink application and how to build the Flink source code. This article uses the official SocketWindowWordCount example to walk through each basic step of a typical Flink program.

The Example Program

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
    // ------------------------------------------------------------------------
    /**
     * Data type for words with count.
     */
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

The above is the official SocketWindowWordCount example. It first reads the host and port of the socket connection from the command line, then obtains the execution environment, reads data from the socket connection, parses and transforms the data, and finally prints the result data.
Every Flink program consists of the same basic parts:

  1. Obtain an execution environment,
  2. Load/create the initial data,
  3. Specify transformations on this data,
  4. Specify where to put the results of the computation,
  5. Trigger the program execution.

The Flink Execution Environment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Every Flink program starts with this line. It returns an execution environment, which represents the context in which the current program is executed. If the program is invoked standalone, this method returns a local execution environment, i.e. a LocalStreamEnvironment created by createLocalEnvironment(). This can be seen in its source code:

// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
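
For reference, an execution environment can also be chosen explicitly instead of letting getExecutionEnvironment decide. The following is only an illustrative sketch; the parallelism, host, port and jar path are placeholders, not values from the example:

// Explicit local environment running in the current JVM, with parallelism 2.
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(2);

// Explicit remote environment: submit to a running cluster at a placeholder address,
// shipping the jar that contains the user code.
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "/path/to/your-job.jar");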

Obtaining the Input Data

DataStream<String> text = env.socketTextStream(hostname, port, "\n");

In this example the source data comes from a socket. A socket connection is created from the given configuration, and a new data stream is created that contains the strings received, unbounded, from that socket, decoded with the system's default character set. When the socket connection is closed, reading stops immediately. Looking at the source code, what actually happens is that a SocketTextStreamFunction instance is constructed from the given socket configuration, and the data stream is created by continuously reading the input data from that socket connection.

// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
    return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
            "Socket Stream");
}
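
The example calls the three-argument overload without a retry count; the overload shown above also takes maxRetry, which lets the source try to re-establish the connection when it is lost. A hedged usage sketch:

// Try to re-establish the socket connection up to 10 times before the source fails.
DataStream<String> text = env.socketTextStream(hostname, port, "\n", 10);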

The class hierarchy of SocketTextStreamFunction is shown below:
[Figure: SocketTextStreamFunction class hierarchy]

As the diagram shows, SocketTextStreamFunction is a subclass of SourceFunction, and SourceFunction is the base interface for all stream data sources in Flink. SourceFunction is defined as follows:

// Source file: org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
    @Public
    interface SourceContext<T> {
        void collect(T element);
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);
        @PublicEvolving
        void emitWatermark(Watermark mark);
        @PublicEvolving
        void markAsTemporarilyIdle();
        Object getCheckpointLock();
        void close();
    }
}

SourceFunction defines two methods, run and cancel, plus the inner interface SourceContext.

  • run(SourceContext): implements the data-fetching logic and can forward data to downstream nodes through the ctx parameter.
  • cancel(): cancels the source. The run method usually contains a loop that keeps producing data; cancel makes that loop terminate.
  • SourceContext: the interface a source function uses to emit elements and possibly watermarks; its type parameter is the type of the elements produced by the source.
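
To make the run/cancel contract concrete, here is a minimal, hypothetical source (it is not SocketTextStreamFunction, just an illustration of the pattern): run loops and emits elements until cancel flips a flag.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CountingSource implements SourceFunction<Long> {
    private volatile boolean running = true; // toggled by cancel()
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Emit under the checkpoint lock so emission stays consistent with checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false; // makes the loop in run() terminate
    }
}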

With the SourceFunction interface understood, the concrete implementation of SocketTextStreamFunction (mainly its run method) is easy to follow: it keeps reading data from the given hostname and port, splits it into individual strings on the newline delimiter, and forwards the data downstream. Now back to the socketTextStream method of StreamExecutionEnvironment: it returns a DataStreamSource instance by calling addSource. You may wonder why the text variable in the example is of type DataStream when the source code returns a DataStreamSource. The reason is that DataStream is the parent class of DataStreamSource, as the class diagram below shows; this is Java polymorphism at work.
[Figure: DataStreamSource class hierarchy]

數據流操做

The DataStreamSource obtained above is then transformed with flatMap, keyBy, timeWindow and reduce.

DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });

This piece of logic applies four transformations to the DataStreamSource obtained above: flatMap, keyBy, timeWindow and reduce. Below we look at the flatMap transformation; readers can study the source code of the other three in the same way.

Let's start with the source code of the flatMap method:

// Source file: org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}

This does two things: it uses reflection to obtain the output type of the flatMap operator, and it creates an operator. The core idea of Flink's streaming computation is to pass the data from the input stream through a chain of operators, one by one, and finally hand it to the output stream; each processing step applied to the data is logically called an operator. The transform method in the last line above returns a SingleOutputStreamOperator, which extends DataStream and defines some helper methods that make it easier to work with the stream. Before returning, transform also registers the operator with the execution environment. The figure below illustrates how a Flink program is mapped to a streaming dataflow:
[Figure: Flink's basic programming model, a program mapped to a streaming dataflow]
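
As a usage note (not part of the original example), the same flatMap can also be written with a lambda. Because type erasure hides the output type from the reflection step described above, the result type then has to be supplied explicitly:

DataStream<WordWithCount> counts = text
        .flatMap((String value, Collector<WordWithCount> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        })
        .returns(WordWithCount.class); // type hint needed because the lambda erases the output type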

Emitting the Results

windowCounts.print().setParallelism(1);

Every Flink program starts with a source and ends with a sink. Here the print method sinks the computed results to standard output. In real projects, the results are usually written to an external system such as Kafka, HBase, a file system or Elasticsearch, either through one of the officially provided connectors or through a custom connector. setParallelism sets the parallelism of this sink; the value must be greater than zero.
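
Sticking with the example's types, a custom sink could be attached with addSink instead of print. The SinkFunction below is purely illustrative, a stand-in for a real connector sink; it assumes an extra import of org.apache.flink.streaming.api.functions.sink.SinkFunction:

// Illustrative only: write each WordWithCount to stderr. A real job would plug in a
// connector sink (Kafka, Elasticsearch, filesystem, ...) here instead.
windowCounts.addSink(new SinkFunction<WordWithCount>() {
    @Override
    public void invoke(WordWithCount value, Context context) {
        System.err.println(value);
    }
}).setParallelism(1);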

Executing the Program

env.execute("Socket Window WordCount");

Flink has two execution modes, remote and local, which differ slightly; here we follow the local mode. Let's look at the source code of the execute method first:

// Source file: org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);
    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);
    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }
    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();
    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }
    MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
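
As an aside before dissecting this method: the stream graph that execute() turns into a JobGraph can be dumped as a JSON plan without running the job. A small illustrative sketch (the class name, hostname and port are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintExecutionPlan {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
        text.print();
        // Print the stream graph as a JSON plan instead of executing it --
        // a quick way to inspect what execute() is about to turn into a JobGraph.
        System.out.println(env.getExecutionPlan());
    }
}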

The execute method does three things: it transforms the streaming program into a JobGraph, adds (or overrides) settings with what the user defined, and starts a MiniCluster to execute the job. We will not go into the JobGraph here and only follow the execution, so let's step into return miniCluster.executeJobBlocking(jobGraph);:

// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");
    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
    final JobResult jobResult;
    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e);
    }
    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}

The core of this code is final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);, which calls the submitJob method of MiniCluster. Let's look at that method next:

// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
    // we have to allow queued scheduling in Flip-6 mode because we need to request slots
    // from the ResourceManager
    jobGraph.setAllowQueuedScheduling(true);
    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());
    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}

The Dispatcher component here is responsible for receiving job submissions, persisting them, spawning JobManagers to execute the jobs, and recovering them when a master fails. Dispatcher has two implementations: in a local environment a MiniDispatcher is started, while on a cluster a StandaloneDispatcher is started. The class hierarchy is shown below:
[Figure: MiniDispatcher class hierarchy]

The Dispatcher starts a JobManagerRunner and delegates to it the task of starting the JobMaster for this job. The corresponding code is:

// Source file: org/apache/flink/runtime/jobmaster/JobManagerRunner.java
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
    return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                return startJobMaster(leaderSessionId);
            }
        });
}

After a series of nested method calls, the JobMaster eventually ends up executing the following logic:

// Source file: org/apache/flink/runtime/jobmaster/JobMaster.java
private void scheduleExecutionGraph() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    executionGraph.registerJobStatusListener(jobStatusListener);
    try {
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}

Here executionGraph.scheduleForExecution(); invokes the method that starts the ExecutionGraph. In Flink's graph structures the ExecutionGraph is what actually gets executed, so at this point the journey of a job from submission to actual execution is complete. Let's recap the flow in the local environment:

  1. The client calls the execute method;
  2. MiniCluster does most of the preparation and then delegates the job directly to the MiniDispatcher;
  3. After receiving the job, the Dispatcher instantiates a JobManagerRunner and uses this instance to start the job;
  4. The JobManagerRunner then hands the job over to the JobMaster;
  5. The JobMaster uses the ExecutionGraph's methods to start the whole execution graph, and the job is up and running.