This article was first published on my Toutiao account as "How does a Flink program execute? Dissecting a simple Flink program through its source code". You are welcome to follow the Toutiao account and the WeChat public account 大數據技術和人工智能 (search bigdata_ai_tech on WeChat) for more material, as well as my CSDN blog.
Earlier posts covered how to set up a Flink environment locally, how to create a Flink application, and how to build the Flink source code. This article uses the official SocketWindowWordCount example to walk through the basic steps of a typical Flink program.
```java
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
```
The above is the SocketWindowWordCount example from the official website. It first reads the host and port of the socket connection from the command-line arguments, then obtains the execution environment, reads data from the socket connection, parses and transforms the data, and finally prints the result.
Every Flink program consists of the same basic parts:

1. obtain an execution environment,
2. load or create the initial data (the source),
3. specify transformations on that data,
4. specify where to put the results of the computation (the sink),
5. trigger the program execution.
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
Every Flink program starts from this line of code. It returns an execution environment, which represents the context in which the current program runs. If the program is invoked standalone, this method returns a local execution environment, a LocalStreamEnvironment created by createLocalEnvironment(). This can be seen from its source code:
```java
// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
```
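For comparison, a local environment can also be requested explicitly instead of relying on the detection logic above. The following is a minimal sketch, not part of the original example; the parallelism of 2 and the tiny in-memory source are just illustrative choices:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvExample {
    public static void main(String[] args) throws Exception {
        // Explicitly create a local execution environment (a LocalStreamEnvironment)
        // instead of letting getExecutionEnvironment() decide from the calling context.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

        // A tiny in-memory source, only to have something to run locally.
        env.fromElements("a", "b", "c").print();

        env.execute("local-env-example");
    }
}
```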
```java
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
```
In this example the source data comes from a socket. A socket connection is created from the given configuration, and a new data stream is created that contains the strings received, without bound, from that socket; the received strings are decoded with the system's default charset. When the socket connection is closed, reading the data terminates immediately. Looking at the source code, this actually constructs a SocketTextStreamFunction instance from the given socket configuration and then continuously reads the incoming data from the socket connection to build the data stream.
```java
// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
    return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
            "Socket Stream");
}
```
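As a side note, the example uses the three-argument overload, which leaves maxRetry at its default. A usage sketch of the four-argument variant, dropped into the example's main method in place of the original line (hostname and port here are placeholders), might look like this:

```java
// maxRetry controls how long (in seconds) the source keeps trying to reconnect
// when the socket server is temporarily down: 0 terminates the reader immediately,
// a negative value retries forever.
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n", -1L);
```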
The class inheritance hierarchy of SocketTextStreamFunction is as follows:
As the diagram shows, SocketTextStreamFunction is a subclass of SourceFunction, and SourceFunction is the basic interface of all streaming data sources in Flink. SourceFunction is defined as follows:
```java
// Source file: org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {

    void run(SourceContext<T> ctx) throws Exception;

    void cancel();

    @Public
    interface SourceContext<T> {

        void collect(T element);

        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        @PublicEvolving
        void emitWatermark(Watermark mark);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}
```
SourceFunction defines two methods, run and cancel, plus the SourceContext inner interface.
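To make that contract concrete, here is a minimal, hypothetical source (not Flink's actual SocketTextStreamFunction) that follows the same pattern: run keeps emitting elements through the SourceContext until cancel flips a flag:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A minimal sketch of a custom source that emits an ever-increasing counter.
// Real sources such as SocketTextStreamFunction follow the same run/cancel contract.
public class CountingSource implements SourceFunction<Long> {

    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Emit under the checkpoint lock so emission and state updates stay atomic.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Such a source would be attached to a job with env.addSource(new CountingSource()).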
With the SourceFunction interface in mind, the concrete implementation of SocketTextStreamFunction (mainly its run method) becomes clear: it keeps reading data from the given hostname and port, splits it into individual strings on the newline delimiter, and forwards them downstream. Now back to the socketTextStream method of StreamExecutionEnvironment: it returns a DataStreamSource instance by calling addSource. Think about it for a moment: the text variable in the example is declared as DataStream, so why is the return type in the source code DataStreamSource? Because DataStream is a superclass of DataStreamSource (DataStreamSource extends SingleOutputStreamOperator, which extends DataStream), as the class diagram below shows; this is Java polymorphism at work.
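A small illustration of that relationship (hostname and port are placeholders): the concrete DataStreamSource type can be kept when source-specific configuration is still needed, and it can then be assigned to a DataStream variable:

```java
// The assignment compiles because DataStreamSource is (indirectly) a subclass of DataStream.
DataStreamSource<String> source = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> text = source;
```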
The DataStreamSource obtained above is then transformed with flatMap, keyBy, timeWindow, and reduce.
```java
DataStream<WordWithCount> windowCounts = text
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    .keyBy("word")
    .timeWindow(Time.seconds(5))
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });
```
In this piece of logic, four transformations are applied to the DataStreamSource obtained above: flatMap, keyBy, timeWindow, and reduce. The flatMap transformation is discussed below; readers are encouraged to look into the source code of the other three themselves (a KeySelector-based variant of the keyBy step is sketched right after this paragraph).
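For reference, keyBy("word") keys the stream by the POJO field name via reflection. An equivalent, more explicit form is a KeySelector; the following is only a sketch, where words stands for the DataStream&lt;WordWithCount&gt; produced by the flatMap step:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Key the stream with an explicit KeySelector instead of the reflective field name "word".
KeyedStream<WordWithCount, String> keyed = words.keyBy(
        new KeySelector<WordWithCount, String>() {
            @Override
            public String getKey(WordWithCount wc) {
                return wc.word;
            }
        });
```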
Let's start with the source code of the flatMap method:
```java
// Source file: org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
```
This does two things: first, it uses reflection to obtain the output type of the flatMap operator; second, it creates an operator. The core idea of Flink stream processing is to pass data from the input stream through a chain of operators, one after another, and finally hand it to the output stream; each processing step applied to the data is logically called an operator. The transform call in the last line above returns a SingleOutputStreamOperator, which extends DataStream and defines some helper methods that make it convenient to work with the stream. Before returning, the transform method also registers the operator with the execution environment. The figure below shows how a Flink program is mapped to a Streaming Dataflow:
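To inspect this mapping for a concrete job, the dataflow that the environment has built so far can also be dumped as JSON before calling execute (a small sketch, not part of the original example); the output can be rendered with the Flink plan visualizer at https://flink.apache.org/visualizer/:

```java
// Print the streaming dataflow built so far as JSON; must be called before execute().
System.out.println(env.getExecutionPlan());
```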
```java
windowCounts.print().setParallelism(1);
```
Every Flink program starts with a source and ends with a sink. The print method here sinks the computed results to standard output. In real-world development, results are usually sunk to a specific destination, such as Kafka, HBase, a file system, or Elasticsearch, through the connectors provided on the official website or through custom connectors. The setParallelism call sets the parallelism of this sink; the value must be greater than zero.
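As an illustration of a custom sink (a minimal sketch, not one of the official connectors; the class name is made up), print() could be replaced by addSink with a hand-written SinkFunction:

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A minimal custom sink that writes every result to standard error;
// WordWithCount is the POJO from the example above. Real jobs would typically
// use a connector (Kafka, HBase, Elasticsearch, ...) instead.
public class StderrSink implements SinkFunction<WordWithCount> {
    @Override
    public void invoke(WordWithCount value, Context context) throws Exception {
        System.err.println(value);
    }
}
```

It would be attached with windowCounts.addSink(new StderrSink()).setParallelism(1);.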
```java
env.execute("Socket Window WordCount");
```
Flink has two execution modes, remote and local, which differ slightly; the analysis here follows the local mode. First, look at the source of the execute method:
```java
// Source file: org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }

    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    MiniCluster miniCluster = new MiniCluster(cfg);

    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
```
This method does three things: it transforms the streaming program into a JobGraph, adds (and overrides) the settings defined by the user, and starts a MiniCluster to execute the job. The JobGraph is not covered here; let's follow only the job execution and step into return miniCluster.executeJobBlocking(jobGraph); — its source is as follows:
```java
// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    final JobResult jobResult;

    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.",
            ExceptionUtils.stripExecutionException(e));
    }

    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}
```
The core of this code is final CompletableFuture&lt;JobSubmissionResult&gt; submissionFuture = submitJob(job);, which calls the submitJob method of the MiniCluster class. Let's continue with that method:
```java
// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

    // we have to allow queued scheduling in Flip-6 mode because we need to request slots
    // from the ResourceManager
    jobGraph.setAllowQueuedScheduling(true);

    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());

    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
```
The Dispatcher component here is responsible for receiving job submissions, persisting them, spawning JobManagers to execute the jobs, and recovering them in case of a master failure. Dispatcher has two implementations: in the local environment a MiniDispatcher is started, while on a cluster a StandaloneDispatcher is started. The class structure diagram is shown below:
The Dispatcher here starts a JobManagerRunner and delegates to it the task of starting the JobMaster for this job. The corresponding code is as follows:
```java
// Source file: org/apache/flink/runtime/jobmaster/JobManagerRunner.java
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

    return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                return startJobMaster(leaderSessionId);
            }
        });
}
```
After a series of nested method calls, the JobMaster eventually reaches the following logic:
```java
// Source file: org/apache/flink/runtime/jobmaster/JobMaster.java
private void scheduleExecutionGraph() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    executionGraph.registerJobStatusListener(jobStatusListener);

    try {
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}
```
Here executionGraph.scheduleForExecution(); invokes the startup method of the ExecutionGraph. In Flink's graph hierarchy, the ExecutionGraph is where the job is actually executed, so at this point the journey of a job from submission to actual execution is complete. To recap, the execution flow in the local environment is:
1. The execute method is called;
2. The MiniCluster does most of the preparation work and then delegates the job directly to the MiniDispatcher;
3. After receiving the job, the Dispatcher instantiates a JobManagerRunner and uses this instance to start the job;
4. The JobManagerRunner then hands the job over to the JobMaster;
5. The JobMaster uses the ExecutionGraph's methods to start the whole execution graph, and the entire job is up and running.