從flink-example分析flink組件(3)WordCount 流式實戰及源碼分析

前面介紹了批量處理的WorkCount是如何執行的html

<從flink-example分析flink組件(1)WordCount batch實戰及源碼分析>web

<從flink-example分析flink組件(2)WordCount batch實戰及源碼分析----flink如何在本地執行的?>less

這篇從WordCount的流式處理開始異步

/**
 * Implements the "WordCount" program that computes a simple word occurrence
 * histogram over text files in a streaming fashion.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link WordCountData}.
 *
 * <p>This example shows how to:
 * <ul>
 * <li>write a simple Flink Streaming program,
 * <li>use tuple data types,
 * <li>write and use user-defined functions.
 * </ul>
 */
public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

 DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0).sum(1);                                                     //1

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");//2
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

整個執行流程以下圖所示:ide

 

 第1~4步:main方法讀取文件,增長算子源碼分析

    private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
                                                        TypeInformation<OUT> typeInfo,
                                                        String sourceName,
                                                        FileProcessingMode monitoringMode,
                                                        long interval) {

        Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
        Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
        Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
        Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

        Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
                interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
            "The path monitoring interval cannot be less than " +
                    ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

        ContinuousFileMonitoringFunction<OUT> monitoringFunction =
            new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

        ContinuousFileReaderOperator<OUT> reader =
            new ContinuousFileReaderOperator<>(inputFormat);

 SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
                .transform("Split Reader: " + sourceName, typeInfo, reader);                //1

        return new DataStreamSource<>(source);
    }

增長算子的方法,當調用execute方法時,此時增長的算子會被執行。ui

    /**
     * Adds an operator to the list of operators that should be executed when calling
     * {@link #execute}.
     *
     * <p>When calling {@link #execute()} only the operators that where previously added to the list
     * are executed.
     *
     * <p>This is not meant to be used by users. The API methods that create operators must call
     * this method.
     */
    @Internal
    public void addOperator(StreamTransformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }

第5步:產生StreamGraph,從而能夠獲得JobGraph,即將Stream程序轉換成JobGraphthis

        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);

第6~8步啓動MiniCluster,爲執行job作準備spa

/**
     * Starts the mini cluster, based on the configured properties.
     *
     * @throws Exception This method passes on any exception that occurs during the startup of
     *                   the mini cluster.
     */
    public void start() throws Exception {
        synchronized (lock) {
            checkState(!running, "MiniCluster is already running");

            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", miniClusterConfiguration);

            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

            try {
                initializeIOFormatClasses(configuration);

                LOG.info("Starting Metrics Registry");
                metricRegistry = createMetricRegistry(configuration);

                // bring up all the RPC services
                LOG.info("Starting RPC Service(s)");

                AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);

                final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

                if (useSingleRpcService) {
                    // we always need the 'commonRpcService' for auxiliary calls
                    commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
                    final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                    taskManagerRpcServiceFactory = commonRpcServiceFactory;
                    dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
                } else {
                    // we always need the 'commonRpcService' for auxiliary calls
                    commonRpcService = createRpcService(akkaRpcServiceConfig, true, null);

                    // start a new service per component, possibly with custom bind addresses
                    final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                    final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();

                    dispatcherResourceManagreComponentRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, jobManagerBindAddress);
                    taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, taskManagerBindAddress);
                }

                RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
                    configuration,
                    commonRpcService.getAddress());
                metricRegistry.startQueryService(metricQueryServiceRpcService, null);

                ioExecutor = Executors.newFixedThreadPool(
                    Hardware.getNumberCPUCores(),
                    new ExecutorThreadFactory("mini-cluster-io"));
                haServices = createHighAvailabilityServices(configuration, ioExecutor);

                blobServer = new BlobServer(configuration, haServices.createBlobStore());
                blobServer.start();

                heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

                blobCacheService = new BlobCacheService(
                    configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
                );

                startTaskManagers();

                MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

                dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
                    configuration,
                    dispatcherResourceManagreComponentRpcServiceFactory,
                    haServices,
                    blobServer,
                    heartbeatServices,
                    metricRegistry,
                    metricQueryServiceRetriever,
                    new ShutDownFatalErrorHandler()
                ));

                resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
                dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
                webMonitorLeaderRetrievalService = haServices.getWebMonitorLeaderRetriever();

                dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                    commonRpcService,
                    DispatcherGateway.class,
                    DispatcherId::fromUuid,
                    20,
                    Time.milliseconds(20L));
                resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                    commonRpcService,
                    ResourceManagerGateway.class,
                    ResourceManagerId::fromUuid,
                    20,
                    Time.milliseconds(20L));
                webMonitorLeaderRetriever = new LeaderRetriever();

                resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
                dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
                webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
            }
            catch (Exception e) {
                // cleanup everything
                try {
                    close();
                } catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }

            // create a new termination future
            terminationFuture = new CompletableFuture<>();

            // now officially mark this as running
            running = true;

            LOG.info("Flink Mini Cluster started successfully");
        }
    }

第9~12步 執行jobdebug

    /**
     * This method runs a job in blocking mode. The method returns only after the job
     * completed successfully, or after it failed terminally.
     *
     * @param job  The Flink job to execute
     * @return The result of the job execution
     *
     * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
     *         or if the job terminally failed.
     */
    @Override
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        checkNotNull(job, "job is null");

        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

        final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
            (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

        final JobResult jobResult;

        try {
            jobResult = jobResultFuture.get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
        }

        try {
            return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new JobExecutionException(job.getJobID(), e);
        }
    }

先上傳jar包文件,此時須要DispatcherGateway來執行上轉任務,異步等待結果執行完畢

總結:

batch和stream的執行流程很類似,又有不一樣。

不一樣:Stream傳遞的是DataStream,Batch傳遞的是DataSet

相同:都轉換成JobGraph執行

相關文章
相關標籤/搜索