Flink是大數據處理領域最近很火的一個開源的分佈式、高性能的流式處理框架,其對數據的處理能夠達到毫秒級別。本文以一個來自官網的WordCount例子爲引,全面闡述flink的核心架構及執行流程,但願讀者能夠藉此更加深刻的理解Flink邏輯。html
本文跳過了一些基本概念,若是對相關概念感到迷惑,請參考官網文檔。另外在本文寫做過程當中,Flink正式發佈了其1.5 RELEASE版本,在其發佈以後完成的內容將按照1.5的實現來組織。java
首先,咱們把WordCount的例子再放一遍:mysql
public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { if (args.length != 2){ System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>"); return; } String hostName = args[0]; Integer port = Integer.parseInt(args[1]); // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); // get input data DataStream<String> text = env.socketTextStream(hostName, port); text.flatMap(new LineSplitter()).setParallelism(1) // group by the tuple field "0" and sum up tuple field "1" .keyBy(0) .sum(1).setParallelism(1) .print(); // execute program env.execute("Java WordCount from SocketTextStream Example"); } /** * Implements the string tokenizer that splits sentences into words as a user-defined * FlatMapFunction. The function takes a line (String) and splits it into * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). */ public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
首先從命令行中獲取socket對端的ip和端口,而後啓動一個執行環境,從socket中讀取數據,split成單個單詞的流,並按單詞進行總和的計數,最後打印出來。這個例子相信接觸過大數據計算或者函數式編程的人都能看懂,就不過多解釋了。程序員
程序的啓動,從這句開始:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
。
這行代碼會返回一個可用的執行環境。執行環境是整個flink程序執行的上下文,記錄了相關配置(如並行度等),並提供了一系列方法,如讀取輸入流的方法,以及真正開始運行整個代碼的execute方法等。對於分佈式流處理程序來講,咱們在代碼中定義的flatMap,keyBy等等操做,事實上能夠理解爲一種聲明,告訴整個程序咱們採用了什麼樣的算子,而真正開啓計算的代碼不在此處。因爲咱們是在本地運行flink程序,所以這行代碼會返回一個LocalStreamEnvironment,最後咱們要調用它的execute方法來開啓真正的任務。咱們先接着往下看。web
咱們以flatMap爲例,text.flatMap(new LineSplitter())
這一句話跟蹤進去是這樣的:redis
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) { TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true); return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper))); }
裏面完成了兩件事,一是用反射拿到了flatMap算子的輸出類型,二是生成了一個Operator。flink流式計算的核心概念,就是將數據從輸入流一個個傳遞給Operator進行鏈式處理,最後交給輸出流的過程。對數據的每一次處理在邏輯上成爲一個operator,而且爲了本地化處理的效率起見,operator之間也能夠串成一個chain一塊兒處理(能夠參考責任鏈模式幫助理解)。下面這張圖代表了flink是如何看待用戶的處理流程的:抽象化爲一系列operator,以source開始,以sink結尾,中間的operator作的操做叫作transform,而且能夠把幾個操做串在一塊兒執行。
咱們也能夠更改flink的設置,要求它不要對某個操做進行chain處理,或者從某個操做開啓一個新chain等。
上面代碼中的最後一行transform方法的做用是返回一個SingleOutputStreamOperator,它繼承了Datastream類而且定義了一些輔助方法,方便對流的操做。在返回以前,transform方法還把它註冊到了執行環境中(後面生成執行圖的時候還會用到它)。其餘的操做,包括keyBy,sum和print,都只是不一樣的算子,在這裏出現都是同樣的效果,即生成一個operator並註冊給執行環境用於生成DAG。spring
程序執行即env.execute("Java WordCount from SocketTextStream Example")
這行代碼。sql
這行代碼主要作了如下事情:數據庫
遠程模式的程序執行更加有趣一點。第一步仍然是獲取StreamGraph,而後調用executeRemotely方法進行遠程執行。
該方法首先建立一個用戶代碼加載器apache
ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, getClass().getClassLoader());
而後建立一系列配置,交給Client對象。Client這個詞有意思,看見它就知道這裏絕對是跟遠程集羣打交道的客戶端。
ClusterClient client; try { client = new StandaloneClusterClient(configuration); client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); } } try { return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult(); }
client的run方法首先生成一個JobGraph,而後將其傳遞給JobClient。關於Client、JobClient、JobManager到底誰管誰,能夠看這張圖:
確切的說,JobClient負責以異步的方式和JobManager通訊(Actor是scala的異步模塊),具體的通訊任務由JobClientActor完成。相對應的,JobManager的通訊任務也由一個Actor完成。
JobListeningContext jobListeningContext = submitJob( actorSystem,config,highAvailabilityServices,jobGraph,timeout,sysoutLogUpdates, classLoader); return awaitJobResult(jobListeningContext);
能夠看到,該方法阻塞在awaitJobResult方法上,並最終返回了一個JobListeningContext,透過這個Context能夠獲得程序運行的狀態和結果。
上面提到,整個程序真正意義上開始執行,是這裏:
env.execute("Java WordCount from SocketTextStream Example");
遠程模式和本地模式有一點不一樣,咱們先按本地模式來調試。
咱們跟進源碼,(在本地調試模式下)會啓動一個miniCluster,而後開始執行代碼:
// LocalStreamEnvironment.java @Override public JobExecutionResult execute(String jobName) throws Exception { //生成各類圖結構 ...... try { //啓動集羣,包括啓動JobMaster,進行leader選舉等等 miniCluster.start(); configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); //提交任務到JobMaster return miniCluster.executeJobBlocking(jobGraph); } finally { transformations.clear(); miniCluster.close(); } }
這個方法裏有一部分邏輯是與生成圖結構相關的,咱們放在第二章裏講;如今咱們先接着往裏跟:
//MiniCluster.java public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException { checkNotNull(job, "job is null"); //在這裏,最終把job提交給了jobMaster final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job); final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose( (JobSubmissionResult ignored) -> requestJobResult(job.getJobID())); ...... }
正如我在註釋裏寫的,這一段代碼核心邏輯就是調用那個submitJob
方法。那麼咱們再接着看這個方法:
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) { final DispatcherGateway dispatcherGateway; try { dispatcherGateway = getDispatcherGateway(); } catch (LeaderRetrievalException | InterruptedException e) { ExceptionUtils.checkInterrupted(e); return FutureUtils.completedExceptionally(e); } // we have to allow queued scheduling in Flip-6 mode because we need to request slots // from the ResourceManager jobGraph.setAllowQueuedScheduling(true); final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph); final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose( //在這裏執行了真正的submit操做 (Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout)); return acknowledgeCompletableFuture.thenApply( (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID())); }
這裏的Dispatcher
是一個接收job,而後指派JobMaster去啓動任務的類,咱們能夠看看它的類結構,有兩個實現。在本地環境下啓動的是MiniDispatcher
,在集羣上提交任務時,集羣上啓動的是StandaloneDispatcher
。
那麼這個Dispatcher又作了什麼呢?它啓動了一個JobManagerRunner
(這裏我要吐槽Flink的命名,這個東西應該叫作JobMasterRunner纔對,flink裏的JobMaster和JobManager不是一個東西),委託JobManagerRunner去啓動該Job的JobMaster
。咱們看一下對應的代碼:
//jobManagerRunner.java private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception { ...... final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout); ...... }
而後,JobMaster通過了一堆方法嵌套以後,執行到了這裏:
private void scheduleExecutionGraph() { checkState(jobStatusListener == null); // register self as job status change listener jobStatusListener = new JobManagerJobStatusListener(); executionGraph.registerJobStatusListener(jobStatusListener); try { //這裏調用了ExecutionGraph的啓動方法 executionGraph.scheduleForExecution(); } catch (Throwable t) { executionGraph.failGlobal(t); } }
咱們知道,flink的框架裏有三層圖結構,其中ExecutionGraph就是真正被執行的那一層,因此到這裏爲止,一個任務從提交到真正執行的流程就走完了,咱們再回顧一下(順便提一下遠程提交時的流程區別):
RestClusterClient
,這個類會以HTTP Rest的方式把用戶代碼提交到集羣上;JobSubmitHandler
。這個類接手了請求後,委派StandaloneDispatcher啓動job,到這裏以後,本地提交和遠程提交的邏輯日後又統一了;JobManagerRunner
,而後用這個runner啓動job;JobMaster
去處理;ExecutionGraph
的方法啓動了整個執行圖;整個任務就啓動起來了。至此,第一部分就講完了。
第一部分講到,咱們的主函數最後一項任務就是生成StreamGraph,而後生成JobGraph,而後以此開始調度任務運行,因此接下來咱們從這裏入手,繼續探索flink。
事實上,flink總共提供了三種圖的抽象,咱們前面已經提到了StreamGraph和JobGraph,還有一種是ExecutionGraph,是用於調度的基本數據結構。
上面這張圖清晰的給出了flink各個圖的工做原理和轉換過程。其中最後一個物理執行圖並不是flink的數據結構,而是程序開始執行後,各個task分佈在不一樣的節點上,所造成的物理上的關係表示。
那麼,flink抽象出這三層圖結構,四層執行邏輯的意義是什麼呢?
StreamGraph是對用戶邏輯的映射。JobGraph在此基礎上進行了一些優化,好比把一部分操做串成chain以提升效率。ExecutionGraph是爲了調度存在的,加入了並行處理的概念。而在此基礎上真正執行的是Task及其相關結構。
在第一節的算子註冊部分,咱們能夠看到,flink把每個算子transform成一個對流的轉換(好比上文中返回的SingleOutputStreamOperator是一個DataStream的子類),而且註冊到執行環境中,用於生成StreamGraph。實際生成StreamGraph的入口是StreamGraphGenerator.generate(env, transformations)
其中的transformations是一個list,裏面記錄的就是咱們在transform方法中放進來的算子。
StreamTransformation表明了從一個或多個DataStream生成新DataStream的操做。順便,DataStream類在內部組合了一個StreamTransformation類,實際的轉換操做均經過該類完成。
咱們能夠看到,從source到各類map,union再到sink操做所有被映射成了StreamTransformation。
其映射過程以下所示:
以MapFunction爲例:
最後,將該transformation註冊到執行環境中,當執行上文提到的generate方法時,生成StreamGraph圖結構。
另外,並非每個 StreamTransformation 都會轉換成runtime層中的物理操做。有一些只是邏輯概念,好比union、split/select、partition等。以下圖所示的轉換樹,在運行時會優化成下方的操做圖。
咱們從StreamGraphGenerator.generate()方法往下看:
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) { return new StreamGraphGenerator(env).generateInternal(transformations); } //注意,StreamGraph的生成是從sink開始的 private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) { for (StreamTransformation<?> transformation: transformations) { transform(transformation); } return streamGraph; } //這個方法的核心邏輯就是判斷傳入的steamOperator是哪一種類型,並執行相應的操做,詳情見下面那一大堆if-else private Collection<Integer> transform(StreamTransformation<?> transform) { if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } LOG.debug("Transforming " + transform); if (transform.getMaxParallelism() <= 0) { // if the max parallelism hasn't been set, then first use the job wide max parallelism // from theExecutionConfig. int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism(); if (globalMaxParallelismFromConfig > 0) { transform.setMaxParallelism(globalMaxParallelismFromConfig); } } // call at least once to trigger exceptions about MissingTypeInfo transform.getOutputType(); Collection<Integer> transformedIds; //這裏對操做符的類型進行判斷,並以此調用相應的處理邏輯.簡而言之,處理的核心無非是遞歸的將該節點和節點的上游節點加入圖 if (transform instanceof OneInputTransformation<?, ?>) { transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform); } else if (transform instanceof TwoInputTransformation<?, ?, ?>) { transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform); } else if (transform instanceof SourceTransformation<?>) { transformedIds = transformSource((SourceTransformation<?>) transform); } else if (transform instanceof SinkTransformation<?>) { transformedIds = transformSink((SinkTransformation<?>) transform); } else if (transform instanceof UnionTransformation<?>) { transformedIds = transformUnion((UnionTransformation<?>) transform); } else if (transform instanceof SplitTransformation<?>) { transformedIds = transformSplit((SplitTransformation<?>) transform); } else if (transform instanceof SelectTransformation<?>) { transformedIds = transformSelect((SelectTransformation<?>) transform); } else if (transform instanceof FeedbackTransformation<?>) { transformedIds = transformFeedback((FeedbackTransformation<?>) transform); } else if (transform instanceof CoFeedbackTransformation<?>) { transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform); } else if (transform instanceof PartitionTransformation<?>) { transformedIds = transformPartition((PartitionTransformation<?>) transform); } else if (transform instanceof SideOutputTransformation<?>) { transformedIds = transformSideOutput((SideOutputTransformation<?>) transform); } else { throw new IllegalStateException("Unknown transformation: " + transform); } //注意這裏和函數開始時的方法相對應,在有向圖中要注意避免循環的產生 // need this check because the iterate transformation adds itself before // transforming the feedback edges if (!alreadyTransformed.containsKey(transform)) { alreadyTransformed.put(transform, transformedIds); } if (transform.getBufferTimeout() > 0) { streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); } if (transform.getUid() != null) { streamGraph.setTransformationUID(transform.getId(), transform.getUid()); } if (transform.getUserProvidedNodeHash() != null) { streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } if (transform.getMinResources() != null && transform.getPreferredResources() != null) { streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); } return transformedIds; }
由於map,filter等經常使用操做都是OneInputStreamOperator,咱們就來看看transformOneInputTransform((OneInputTransformation<?, ?>) transform)
方法。
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) { Collection<Integer> inputIds = transform(transform.getInput()); // 在遞歸處理節點過程當中,某個節點可能已經被其餘子節點先處理過了,須要跳過 if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } //這裏是獲取slotSharingGroup。這個group用來定義當前咱們在處理的這個操做符能夠跟什麼操做符chain到一個slot裏進行操做 //由於有時候咱們可能不滿意flink替咱們作的chain聚合 //一個slot就是一個執行task的基本容器 String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds); //把該operator加入圖 streamGraph.addOperator(transform.getId(), slotSharingGroup, transform.getOperator(), transform.getInputType(), transform.getOutputType(), transform.getName()); //對於keyedStream,咱們還要記錄它的keySelector方法 //flink並不真正爲每一個keyedStream保存一個key,而是每次須要用到key的時候都使用keySelector方法進行計算 //所以,咱們自定義的keySelector方法須要保證冪等性 //到後面介紹keyGroup的時候咱們還會再次提到這一點 if (transform.getStateKeySelector() != null) { TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig()); streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer); } streamGraph.setParallelism(transform.getId(), transform.getParallelism()); streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism()); //爲當前節點和它的依賴節點創建邊 //這裏能夠看到以前提到的select union partition等邏輯節點被合併入edge的過程 for (Integer inputId: inputIds) { streamGraph.addEdge(inputId, transform.getId(), 0); } return Collections.singleton(transform.getId()); } public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) { addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, null, new ArrayList<String>(), null); } //addEdge的實現,會合並一些邏輯節點 private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag) { //若是輸入邊是側輸出節點,則把side的輸入邊做爲本節點的輸入邊,並遞歸調用 if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0; if (outputTag == null) { outputTag = virtualSideOutputNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag); //若是輸入邊是select,則把select的輸入邊做爲本節點的輸入邊 } else if (virtualSelectNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualSelectNodes.get(virtualId).f0; if (outputNames.isEmpty()) { // selections that happen downstream override earlier selections outputNames = virtualSelectNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag); //若是是partition節點 } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag); } else { //正常的edge處理邏輯 StreamNode upstreamNode = getStreamNode(upStreamVertexID); StreamNode downstreamNode = getStreamNode(downStreamVertexID); // If no partitioner was specified and the parallelism of upstream and downstream // operator matches use forward partitioning, use rebalance otherwise. if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { partitioner = new ForwardPartitioner<Object>(); } else if (partitioner == null) { partitioner = new RebalancePartitioner<Object>(); } if (partitioner instanceof ForwardPartitioner) { if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { throw new UnsupportedOperationException("Forward partitioning does not allow " + "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global."); } } StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag); getStreamNode(edge.getSourceId()).addOutEdge(edge); getStreamNode(edge.getTargetId()).addInEdge(edge); } }
flink提供了一個StreamGraph可視化顯示工具,在這裏
咱們能夠把咱們的程序的執行計劃打印出來System.out.println(env.getExecutionPlan());
複製到這個網站上,點擊生成,如圖所示:
能夠看到,咱們源程序被轉化成了4個operator。
另外,在operator之間的連線上也顯示出了flink添加的一些邏輯流程。因爲我設定了每一個操做符的並行度都是1,因此在每一個操做符之間都是直接FORWARD,不存在shuffle的過程。
flink會根據上一步生成的StreamGraph生成JobGraph,而後將JobGraph發送到server端進行ExecutionGraph的解析。
與StreamGraph相似,JobGraph的入口方法是StreamingJobGraphGenerator.createJobGraph()
。咱們直接來看源碼
private JobGraph createJobGraph() { // 設置啓動模式爲全部節點均在一開始就啓動 jobGraph.setScheduleMode(ScheduleMode.EAGER); // 爲每一個節點生成hash id Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); // 爲了保持兼容性建立的hash List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size()); for (StreamGraphHasher hasher : legacyStreamGraphHashers) { legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph)); } Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>(); //生成jobvertex,串成chain等 //這裏的邏輯大體能夠理解爲,挨個遍歷節點,若是該節點是一個chain的頭節點,就生成一個JobVertex,若是不是頭節點,就要把自身配置併入頭節點,而後把頭節點和本身的出邊相連;對於不能chain的節點,看成只有頭節點處理便可 setChaining(hashes, legacyHashes, chainedOperatorHashes); //設置輸入邊edge setPhysicalEdges(); //設置slot共享group setSlotSharing(); //配置檢查點 configureCheckpointing(); // 若是有以前的緩存文件的配置的話,從新讀入 for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) { DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration()); } // 傳遞執行環境配置 try { jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); } catch (IOException e) { throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." + "This indicates that non-serializable types (like custom serializers) were registered"); } return jobGraph; }
爲了更高效地分佈式執行,Flink會盡量地將operator的subtask連接(chain)在一塊兒造成task。每一個task在一個線程中執行。將operators連接成task是很是有效的優化:它能減小線程之間的切換,減小消息的序列化/反序列化,減小數據在緩衝區的交換,減小了延遲的同時提升總體的吞吐量。
上圖中將KeyAggregation和Sink兩個operator進行了合併,由於這兩個合併後並不會改變總體的拓撲結構。可是,並非任意兩個 operator 就能 chain 一塊兒的,其條件仍是很苛刻的:
- 上下游的並行度一致
- 下游節點的入度爲1 (也就是說下游節點沒有來自其餘節點的輸入)
- 上下游節點都在同一個 slot group 中(下面會解釋 slot group)
- 下游節點的 chain 策略爲 ALWAYS(能夠與上下游連接,map、flatmap、filter等默認是ALWAYS)
- 上游節點的 chain 策略爲 ALWAYS 或 HEAD(只能與下游連接,不能與上游連接,Source默認是HEAD)
- 兩個節點間數據分區方式是 forward(參考理解數據流的分區)
- 用戶沒有禁用 chain
flink的chain邏輯是一種很常見的設計,好比spring的interceptor也是相似的實現方式。經過把操做符串成一個大操做符,flink避免了把數據序列化後經過網絡發送給其餘節點的開銷,可以大大加強效率。
前面已經提到,JobGraph的提交依賴於JobClient和JobManager之間的異步通訊,如圖所示:
在submitJobAndWait方法中,其首先會建立一個JobClientActor的ActorRef,而後向其發起一個SubmitJobAndWait消息,該消息將JobGraph的實例提交給JobClientActor。發起模式是ask,它表示須要一個應答消息。
Future<Object> future = Patterns.ask(jobClientActor, new JobClientMessages.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT())); answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
該SubmitJobAndWait消息被JobClientActor接收後,最終經過調用tryToSubmitJob方法觸發真正的提交動做。當JobManager的actor接收到來自client端的請求後,會執行一個submitJob方法,主要作如下事情:
- 向BlobLibraryCacheManager註冊該Job;
- 構建ExecutionGraph對象;
- 對JobGraph中的每一個頂點進行初始化;
- 將DAG拓撲中從source開始排序,排序後的頂點集合附加到Exec> - utionGraph對象;
- 獲取檢查點相關的配置,並將其設置到ExecutionGraph對象;
- 向ExecutionGraph註冊相關的listener;
- 執行恢復操做或者將JobGraph信息寫入SubmittedJobGraphStore以在後續用於恢復目的;
- 響應給客戶端JobSubmitSuccess消息;
- 對ExecutionGraph對象進行調度執行;
最後,JobManger會返回消息給JobClient,通知該任務是否提交成功。
與StreamGraph和JobGraph不一樣,ExecutionGraph並非在咱們的客戶端程序生成,而是在服務端(JobManager處)生成的,順便flink只維護一個JobManager。其入口代碼是ExecutionGraphBuilder.buildGraph(...)
該方法長200多行,其中一大半是checkpoiont的相關邏輯,咱們暫且略過,直接看核心方法executionGraph.attachJobGraph(sortedTopology)
由於ExecutionGraph事實上只是改動了JobGraph的每一個節點,而沒有對整個拓撲結構進行變更,因此代碼裏只是挨個遍歷jobVertex並進行處理:
for (JobVertex jobVertex : topologiallySorted) { if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) { this.isStoppable = false; } //在這裏生成ExecutionGraph的每一個節點 //首先是進行了一堆賦值,將任務信息交給要生成的圖節點,以及設定並行度等等 //而後是建立本節點的IntermediateResult,根據本節點的下游節點的個數肯定建立幾份 //最後是根據設定好的並行度建立用於執行task的ExecutionVertex //若是job有設定inputsplit的話,這裏還要指定inputsplits ExecutionJobVertex ejv = new ExecutionJobVertex( this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp); //這裏要處理全部的JobEdge //對每一個edge,獲取對應的intermediateResult,並記錄到本節點的輸入上 //最後,把每一個ExecutorVertex和對應的IntermediateResult關聯起來 ejv.connectToPredecessors(this.intermediateResults); ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); if (previousTask != null) { throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), ejv, previousTask)); } for (IntermediateResult res : ejv.getProducedDataSets()) { IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res); if (previousDataSet != null) { throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), res, previousDataSet)); } } this.verticesInCreationOrder.add(ejv); this.numVerticesTotal += ejv.getParallelism(); newExecJobVertices.add(ejv); }
至此,ExecutorGraph就建立完成了。
關於flink的任務執行架構,官網的這兩張圖就是最好的說明:
Flink 集羣啓動後,首先會啓動一個 JobManger 和多個的 TaskManager。用戶的代碼會由JobClient 提交給 JobManager,JobManager 再把來自不一樣用戶的任務發給 不一樣的TaskManager 去執行,每一個TaskManager管理着多個task,task是執行計算的最小結構, TaskManager 將心跳和統計信息彙報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述除了task外的三者均爲獨立的 JVM 進程。
要注意的是,TaskManager和job並不是一一對應的關係。flink調度的最小單元是task而非TaskManager,也就是說,來自不一樣job的不一樣task可能運行於同一個TaskManager的不一樣線程上。
一個flink任務全部可能的狀態如上圖所示。圖上畫的很明白,就再也不贅述了。
Task slot是一個TaskManager內資源分配的最小載體,表明了一個固定大小的資源子集,每一個TaskManager會將其所佔有的資源平分給它的slot。
經過調整 task slot 的數量,用戶能夠定義task之間是如何相互隔離的。每一個 TaskManager 有一個slot,也就意味着每一個task運行在獨立的 JVM 中。每一個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。
而在同一個JVM進程中的task,能夠共享TCP鏈接(基於多路複用)和心跳消息,能夠減小數據的網絡傳輸,也能共享一些數據結構,必定程度上減小了每一個task的消耗。
每一個slot能夠接受單個task,也能夠接受多個連續task組成的pipeline,以下圖所示,FlatMap函數佔用一個taskslot,而key Agg函數和sink函數共用一個taskslot:
爲了達到共用slot的目的,除了能夠以chain的方式pipeline算子,咱們還能夠容許SlotSharingGroup,以下圖所示:
咱們能夠把不能被chain成一條的兩個操做如flatmap和key&sink放在一個TaskSlot裏執行,這樣作能夠得到如下好處:
JobManager負責接收 flink 的做業,調度 task,收集 job 的狀態、管理 TaskManagers。被實現爲一個 akka actor。
先列出JobManager啓動的核心代碼
def runJobManager( configuration: Configuration, executionMode: JobManagerMode, listeningAddress: String, listeningPort: Int) : Unit = { val numberProcessors = Hardware.getNumberCPUCores() val futureExecutor = Executors.newScheduledThreadPool( numberProcessors, new ExecutorThreadFactory("jobmanager-future")) val ioExecutor = Executors.newFixedThreadPool( numberProcessors, new ExecutorThreadFactory("jobmanager-io")) val timeout = AkkaUtils.getTimeout(configuration) // we have to first start the JobManager ActorSystem because this determines the port if 0 // was chosen before. The method startActorSystem will update the configuration correspondingly. val jobManagerSystem = startActorSystem( configuration, listeningAddress, listeningPort) val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( configuration, ioExecutor, AddressResolution.NO_ADDRESS_RESOLUTION) val metricRegistry = new MetricRegistryImpl( MetricRegistryConfiguration.fromConfiguration(configuration)) metricRegistry.startQueryService(jobManagerSystem, null) val (_, _, webMonitorOption, _) = try { startJobManagerActors( jobManagerSystem, configuration, executionMode, listeningAddress, futureExecutor, ioExecutor, highAvailabilityServices, metricRegistry, classOf[JobManager], classOf[MemoryArchivist], Option(classOf[StandaloneResourceManager]) ) } catch { case t: Throwable => futureExecutor.shutdownNow() ioExecutor.shutdownNow() throw t } // block until everything is shut down jobManagerSystem.awaitTermination() ....... }
startJobManagerActors()
方法中啓動JobManagerActors,以及webserver,TaskManagerActor,ResourceManager等等JobManager 是一個Actor,經過各類消息來完成核心邏輯:
override def handleMessage: Receive = { case GrantLeadership(newLeaderSessionID) => log.info(s"JobManager $getAddress was granted leadership with leader session ID " + s"$newLeaderSessionID.") leaderSessionID = newLeaderSessionID .......
有幾個比較重要的消息:
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = { ...... executionGraph = ExecutionGraphBuilder.buildGraph( executionGraph, jobGraph, flinkConfiguration, futureExecutor, ioExecutor, scheduler, userCodeLoader, checkpointRecoveryFactory, Time.of(timeout.length, timeout.unit), restartStrategy, jobMetrics, numSlots, blobServer, log.logger) ...... if (leaderElectionService.hasLeadership) { log.info(s"Scheduling job $jobId ($jobName).") executionGraph.scheduleForExecution() } else { self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false)) log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " + "this. I am not scheduling the job for execution.") ...... }
首先作一些準備工做,而後獲取一個ExecutionGraph,判斷是不是恢復的job,而後將job保存下來,而且通知客戶端本地已經提交成功了,最後若是確認本JobManager是leader,則執行executionGraph.scheduleForExecution()
方法,這個方法通過一系列調用,把每一個ExecutionVertex傳遞給了Excution類的deploy方法:
public void deploy() throws JobException { ...... try { // good, we are allowed to deploy if (!slot.setExecutedVertex(this)) { throw new JobException("Could not assign the ExecutionVertex to the slot " + slot); } // race double check, did we fail/cancel and do we need to release the slot? if (this.state != DEPLOYING) { slot.releaseSlot(); return; } if (LOG.isInfoEnabled()) { LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(), attemptNumber, getAssignedResourceLocation().getHostname())); } final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( attemptId, slot, taskState, attemptNumber); final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout); ...... } catch (Throwable t) { markFailed(t); ExceptionUtils.rethrow(t); } }
咱們首先生成了一個TaskDeploymentDescriptor,而後交給了taskManagerGateway.submitTask()
方法執行。接下來的部分,就屬於TaskManager的範疇了。
TaskManager是flink中資源管理的基本組件,是全部執行任務的基本容器,提供了內存管理、IO管理、通訊管理等一系列功能,本節對各個模塊進行簡要介紹。
1. MemoryManager flink並無把全部內存的管理都委託給JVM,由於JVM廣泛存在着存儲對象密度低、大內存時GC對系統影響大等問題。因此flink本身抽象了一套內存管理機制,將全部對象序列化後放在本身的MemorySegment上進行管理。MemoryManger涉及內容較多,將在後續章節進行繼續剖析。
2. IOManager flink經過IOManager管理磁盤IO的過程,提供了同步和異步兩種寫模式,又進一步區分了block、buffer和bulk三種讀寫方式。
IOManager提供了兩種方式枚舉磁盤文件,一種是直接遍歷文件夾下全部文件,另外一種是計數器方式,對每一個文件名以遞增順序訪問。
在底層,flink將文件IO抽象爲FileIOChannle,封裝了底層實現。
能夠看到,flink在底層實際上都是以異步的方式進行讀寫。
3. NetworkEnvironment 是TaskManager的網絡 IO 組件,包含了追蹤中間結果和數據交換的數據結構。它的構造器會統一將配置的內存先分配出來,抽象成 NetworkBufferPool 統一管理內存的申請和釋放。意思是說,在輸入和輸出數據時,無論是保留在本地內存,等待chain在一塊兒的下個操做符進行處理,仍是經過網絡把本操做符的計算結果發送出去,都被抽象成了NetworkBufferPool。後續咱們還將對這個組件進行詳細分析。
對於TM來講,執行task就是把收到的TaskDeploymentDescriptor
對象轉換成一個task並執行的過程。TaskDeploymentDescriptor這個類保存了task執行所必須的全部內容,例如序列化的算子,輸入的InputGate和輸出的ResultPartition的定義,該task要做爲幾個subtask執行等等。
按照正常邏輯思惟,很容易想到TM的submitTask方法的行爲:首先是確認資源,如尋找JobManager和Blob,然後創建鏈接,解序列化算子,收集task相關信息,接下來就是建立一個新的Task
對象,這個task對象就是真正執行任務的關鍵所在。
val task = new Task( jobInformation, taskInformation, tdd.getExecutionAttemptId, tdd.getAllocationId, tdd.getSubtaskIndex, tdd.getAttemptNumber, tdd.getProducedPartitions, tdd.getInputGates, tdd.getTargetSlotNumber, tdd.getTaskStateHandles, memoryManager, ioManager, network, bcVarManager, taskManagerConnection, inputSplitProvider, checkpointResponder, blobCache, libCache, fileCache, config, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, context.dispatcher)
若是讀者是從頭開始看這篇blog,裏面有不少對象應該已經比較明確其做用了(除了那個brVarManager,這個是管理廣播變量的,廣播變量是一類會被分發到每一個任務中的共享變量)。接下來的主要任務,就是把這個task啓動起來,而後報告說已經啓動task了:
// all good, we kick off the task, which performs its own initialization task.startTaskThread() sender ! decorateMessage(Acknowledge.get())
在執行new Task()方法時,第一步是把構造函數裏的這些變量賦值給當前task的fields。
接下來是初始化ResultPartition和InputGate。這兩個類描述了task的輸出數據和輸入數據。
for (ResultPartitionDeploymentDescriptor desc: resultPartitionDeploymentDescriptors) { ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId); this.producedPartitions[counter] = new ResultPartition( taskNameWithSubtaskAndId, this, jobId, partitionId, desc.getPartitionType(), desc.getNumberOfSubpartitions(), desc.getMaxParallelism(), networkEnvironment.getResultPartitionManager(), resultPartitionConsumableNotifier, ioManager, desc.sendScheduleOrUpdateConsumersMessage()); //爲每一個partition初始化對應的writer writers[counter] = new ResultPartitionWriter(producedPartitions[counter]); ++counter; } // Consumed intermediate result partitions this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()]; this.inputGatesById = new HashMap<>(); counter = 0; for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor: inputGateDeploymentDescriptors) { SingleInputGate gate = SingleInputGate.create( taskNameWithSubtaskAndId, jobId, executionId, inputGateDeploymentDescriptor, networkEnvironment, this, metricGroup.getIOMetricGroup()); inputGates[counter] = gate; inputGatesById.put(gate.getConsumedResultId(), gate); ++counter; }
最後,建立一個Thread對象,並把本身放進該對象,這樣在執行時,本身就有了自身的線程的引用。
Task對象自己就是一個Runable,所以在其run方法裏定義了運行邏輯。
第一步是切換Task的狀態:
while (true) { ExecutionState current = this.executionState; ////若是當前的執行狀態爲CREATED,則將其設置爲DEPLOYING狀態 if (current == ExecutionState.CREATED) { if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) { // success, we can start our work break; } } //若是當前執行狀態爲FAILED,則發出通知並退出run方法 else if (current == ExecutionState.FAILED) { // we were immediately failed. tell the TaskManager that we reached our final state notifyFinalState(); if (metrics != null) { metrics.close(); } return; } //若是當前執行狀態爲CANCELING,則將其修改成CANCELED狀態,並退出run else if (current == ExecutionState.CANCELING) { if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) { // we were immediately canceled. tell the TaskManager that we reached our final state notifyFinalState(); if (metrics != null) { metrics.close(); } return; } } //不然說明發生了異常 else { if (metrics != null) { metrics.close(); } throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.'); } }
接下來,就是導入用戶類加載器並加載用戶代碼。
而後,是向網絡管理器註冊當前任務(flink的各個算子在運行時進行數據交換須要依賴網絡管理器),分配一些緩存以保存數據
而後,讀入指定的緩存文件。
而後,再把task建立時傳入的那一大堆變量用於建立一個執行環境Envrionment。
再而後,對於那些並非第一次執行的task(好比失敗後重啓的)要恢復其狀態。
接下來最重要的是
invokable.invoke();
方法。爲何這麼說呢,由於這個方法就是用戶代碼所真正被執行的入口。好比咱們寫的什麼new MapFunction()的邏輯,最終就是在這裏被執行的。這裏說一下這個invokable,這是一個抽象類,提供了能夠被TaskManager執行的對象的基本抽象。
這個invokable是在解析JobGraph的時候生成相關信息的,並在此處造成真正可執行的對象
// now load the task's invokable code //經過反射生成對象 invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
上圖顯示了flink提供的可被執行的Task類型。從名字上就能夠看出各個task的做用,在此再也不贅述。
接下來就是invoke方法了,由於咱們的wordcount例子用了流式api,在此咱們以StreamTask的invoke方法爲例進行說明。
先上部分核心代碼:
public final void invoke() throws Exception { boolean disposed = false; try { // -------- Initialize --------- //先作一些賦值操做 ...... // if the clock is not already set, then assign a default TimeServiceProvider //處理timer if (timerService == null) { ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()); timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory); } //把以前JobGraph串起來的chain的信息造成實現 operatorChain = new OperatorChain<>(this); headOperator = operatorChain.getHeadOperator(); // task specific initialization //這個init操做的起名很是詭異,由於這裏主要是處理算子採用了自定義的checkpoint檢查機制的狀況,可是起了一個很是大衆臉的名字 init(); // save the work of reloading state, etc, if the task is already canceled if (canceled) { throw new CancelTaskException(); } // -------- Invoke -------- LOG.debug("Invoking {}", getName()); // we need to make sure that any triggers scheduled in open() cannot be // executed before all operators are opened synchronized (lock) { // both the following operations are protected by the lock // so that we avoid race conditions in the case that initializeState() // registers a timer, that fires before the open() is called. //初始化操做符狀態,主要是一些state啥的 initializeState(); //對於富操做符,執行其open操做 openAllOperators(); } // final check to exit early before starting to run f (canceled) { throw new CancelTaskException(); } // let the task do its work //真正開始執行的代碼 isRunning = true; run();
StreamTask.invoke()方法裏,第一個值得一說的是TimerService
。Flink在2015年決定向StreamTask類加入timer service的時候解釋到:
This integrates the timer as a service in StreamTask that StreamOperators can use by calling a method on the StreamingRuntimeContext. This also ensures that the timer callbacks can not be called concurrently with other methods on the StreamOperator. This behaviour is ensured by an ITCase.
第二個要注意的是chain操做。前面提到了,flink會出於優化的角度,把一些算子chain成一個總體的算子做爲一個task來執行。好比wordcount例子中,Source和FlatMap算子就被chain在了一塊兒。在進行chain操做的時候,會設定頭節點,而且指定輸出的RecordWriter。
接下來不出所料仍然是初始化,只不過初始化的對象變成了各個operator。若是是有checkpoint的,那就從state信息裏恢復,否則就做爲全新的算子處理。從源碼中能夠看到,flink針對keyed算子和普通算子作了不一樣的處理。keyed算子在初始化時須要計算出一個group區間,這個區間的值在整個生命週期裏都不會再變化,後面key就會根據hash的不一樣結果,分配到特定的group中去計算。順便提一句,flink的keyed算子保存的是對每一個數據的key的計算方法,而非真實的key,用戶須要本身保證對每一行數據提供的keySelector的冪等性。至於爲何要用KeyGroup的設計,這就牽扯到擴容的範疇了,將在後面的章節進行講述。
對於openAllOperators()
方法,就是對各類RichOperator執行其open方法,一般可用於在執行計算以前加載資源。
最後,run方法千呼萬喚始出來,該方法通過一系列跳轉,最終調用chain上的第一個算子的run方法。在wordcount的例子中,它最終調用了SocketTextStreamFunction的run,創建socket鏈接並讀入文本。
前面提到,Task對象在執行過程當中,把執行的任務交給了StreamTask這個類去執行。在咱們的wordcount例子中,實際初始化的是OneInputStreamTask的對象(參考上面的類圖)。那麼這個對象是如何執行用戶的代碼的呢?
protected void run() throws Exception { // cache processor reference on the stack, to make the code more JIT friendly final StreamInputProcessor<IN> inputProcessor = this.inputProcessor; while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } }
它作的,就是把任務直接交給了InputProcessor去執行processInput方法。這是一個StreamInputProcessor
的實例,該processor的任務就是處理輸入的數據,包括用戶數據、watermark和checkpoint數據等。咱們先來看看這個processor是如何產生的:
public void init() throws Exception { StreamConfig configuration = getConfiguration(); TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader()); int numberOfInputs = configuration.getNumberOfInputs(); if (numberOfInputs > 0) { InputGate[] inputGates = getEnvironment().getAllInputGates(); inputProcessor = new StreamInputProcessor<>( inputGates, inSerializer, this, configuration.getCheckpointMode(), getCheckpointLock(), getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), getStreamStatusMaintainer(), this.headOperator); // make sure that stream tasks report their I/O statistics inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); } }
這是OneInputStreamTask的init方法,從configs裏面獲取StreamOperator信息,生成本身的inputProcessor。那麼inputProcessor是如何處理數據的呢?咱們接着跟進源碼:
public boolean processInput() throws Exception { if (isFinished) { return false; } if (numRecordsIn == null) { numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); } //這個while是用來處理單個元素的(不要想固然覺得是循環處理元素的) while (true) { //注意 1在下面 //2.接下來,會利用這個反序列化器獲得下一個數據記錄,並進行解析(是用戶數據仍是watermark等等),而後進行對應的操做 if (currentRecordDeserializer != null) { DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); if (result.isBufferConsumed()) { currentRecordDeserializer.getCurrentBuffer().recycle(); currentRecordDeserializer = null; } if (result.isFullRecord()) { StreamElement recordOrMark = deserializationDelegate.getInstance(); //若是元素是watermark,就準備更新當前channel的watermark值(並非簡單賦值,由於有亂序存在), if (recordOrMark.isWatermark()) { // handle watermark statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel); continue; } else if (recordOrMark.isStreamStatus()) { //若是元素是status,就進行相應處理。能夠看做是一個flag,標誌着當前stream接下來即將沒有元素輸入(idle),或者當前即將由空閒狀態轉爲有元素狀態(active)。同時,StreamStatus還對如何處理watermark有影響。經過發送status,上游的operator能夠很方便的通知下游當前的數據流的狀態。 // handle stream status statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel); continue; } else if (recordOrMark.isLatencyMarker()) { //LatencyMarker是用來衡量代碼執行時間的。在Source處建立,攜帶建立時的時間戳,流到Sink時就能夠知道通過了多長時間 // handle latency marker synchronized (lock) { streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker()); } continue; } else { //這裏就是真正的,用戶的代碼即將被執行的地方。從章節1到這裏足足用了三萬字,有點萬里長征的感受 // now we can do the actual processing StreamRecord<IN> record = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); } return true; } } } //1.程序首先獲取下一個buffer //這一段代碼是服務於flink的FaultTorrent機制的,後面我會講到,這裏只需理解到它會嘗試獲取buffer,而後賦值給當前的反序列化器 final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked(); if (bufferOrEvent != null) { if (bufferOrEvent.isBuffer()) { currentChannel = bufferOrEvent.getChannelIndex(); currentRecordDeserializer = recordDeserializers[currentChannel]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); } else { // Event received final AbstractEvent event = bufferOrEvent.getEvent(); if (event.getClass() != EndOfPartitionEvent.class) { throw new IOException("Unexpected event: " + event); } } } else { isFinished = true; if (!barrierHandler.isEmpty()) { throw new IllegalStateException("Trailing data in checkpoint barrier handler."); } return false; } } }
到此爲止,以上部分就是一個flink程序啓動後,到執行用戶代碼以前,flink框架所作的準備工做。回顧一下:
接下來,咱們挑幾個Operator看看flink是如何抽象這些算子的。
StreamSource抽象了一個數據源,而且指定了一些如何處理數據的模式。
public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> { ...... public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception { run(lockingObject, streamStatusMaintainer, output); } public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer, final Output<StreamRecord<OUT>> collector) throws Exception { final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); LatencyMarksEmitter latencyEmitter = null; if (getExecutionConfig().isLatencyTrackingEnabled()) { latencyEmitter = new LatencyMarksEmitter<>( getProcessingTimeService(), collector, getExecutionConfig().getLatencyTrackingInterval(), getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask()); } final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); this.ctx = StreamSourceContexts.getSourceContext( timeCharacteristic, getProcessingTimeService(), lockingObject, streamStatusMaintainer, collector, watermarkInterval, -1); try { userFunction.run(ctx); // if we get here, then the user function either exited after being done (finite source) // or the function was canceled or stopped. For the finite source case, we should emit // a final watermark that indicates that we reached the end of event-time if (!isCanceledOrStopped()) { ctx.emitWatermark(Watermark.MAX_WATERMARK); } } finally { // make sure that the context is closed in any case ctx.close(); if (latencyEmitter != null) { latencyEmitter.close(); } } } ...... private static class LatencyMarksEmitter<OUT> { private final ScheduledFuture<?> latencyMarkTimer; public LatencyMarksEmitter( final ProcessingTimeService processingTimeService, final Output<StreamRecord<OUT>> output, long latencyTrackingInterval, final int vertexID, final int subtaskIndex) { latencyMarkTimer = processingTimeService.scheduleAtFixedRate( new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { try { // ProcessingTimeService callbacks are executed under the checkpointing lock output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex)); } catch (Throwable t) { // we catch the Throwables here so that we don't trigger the processing // timer services async exception handler LOG.warn("Error while emitting latency marker.", t); } } }, 0L, latencyTrackingInterval); } public void close() { latencyMarkTimer.cancel(true); } } }
在StreamSource生成上下文以後,接下來就是把上下文交給SourceFunction去執行:
userFunction.run(ctx);
SourceFunction是對Function的一個抽象,就好像MapFunction,KeyByFunction同樣,用戶選擇實現這些函數,而後flink框架就能利用這些函數進行計算,完成用戶邏輯。
咱們的wordcount程序使用了flink提供的一個SocketTextStreamFunction
。咱們能夠看一下它的實現邏輯,對source如何運行有一個基本的認識:
public void run(SourceContext<String> ctx) throws Exception { final StringBuilder buffer = new StringBuilder(); long attempt = 0; while (isRunning) { try (Socket socket = new Socket()) { currentSocket = socket; LOG.info("Connecting to server socket " + hostname + ':' + port); socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); char[] cbuf = new char[8192]; int bytesRead; //核心邏輯就是一直讀inputSocket,而後交給collect方法 while (isRunning && (bytesRead = reader.read(cbuf)) != -1) { buffer.append(cbuf, 0, bytesRead); int delimPos; while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) { String record = buffer.substring(0, delimPos); // truncate trailing carriage return if (delimiter.equals("\n") && record.endsWith("\r")) { record = record.substring(0, record.length() - 1); } //讀到數據後,把數據交給collect方法,collect方法負責把數據交到合適的位置(如發佈爲br變量,或者交給下個operator,或者經過網絡發出去) ctx.collect(record); buffer.delete(0, delimPos + delimiter.length()); } } } // if we dropped out of this loop due to an EOF, sleep and retry if (isRunning) { attempt++; if (maxNumRetries == -1 || attempt < maxNumRetries) { LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs..."); Thread.sleep(delayBetweenRetries); } else { // this should probably be here, but some examples expect simple exists of the stream source // throw new EOFException("Reached end of stream and reconnects are not enabled."); break; } } } // collect trailing data if (buffer.length() > 0) { ctx.collect(buffer.toString()); } }
整段代碼裏,只有collect方法有些複雜度,後面咱們在講到flink的對象機制時會結合來說,此處知道collect方法會收集結果,而後發送給接收者便可。在咱們的wordcount裏,這個算子的接收者就是被chain在一塊兒的flatmap算子,不記得這個示例程序的話,能夠返回第一章去看一下。
StreamSource是用來開啓整個流的算子,而承接輸入數據並進行處理的算子就是OneInputStreamOperator、TwoInputStreamOperator等。
整個StreamOperator的繼承關係如上圖所示(圖很大,建議點開放大看)。
OneInputStreamOperator這個接口的邏輯很簡單:
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> { /** * Processes one element that arrived at this operator. * This method is guaranteed to not be called concurrently with other methods of the operator. */ void processElement(StreamRecord<IN> element) throws Exception; /** * Processes a {@link Watermark}. * This method is guaranteed to not be called concurrently with other methods of the operator. * * @see org.apache.flink.streaming.api.watermark.Watermark */ void processWatermark(Watermark mark) throws Exception; void processLatencyMarker(LatencyMarker latencyMarker) throws Exception; }
而實現了這個接口的StreamFlatMap算子也很簡單,沒什麼可說的:
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> { private static final long serialVersionUID = 1L; private transient TimestampedCollector<OUT> collector; public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) { super(flatMapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); } @Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); userFunction.flatMap(element.getValue(), collector); } }
從類圖裏能夠看到,flink爲咱們封裝了一個算子的基類AbstractUdfStreamOperator
,提供了一些通用功能,好比把context賦給算子,保存快照等等,其中最爲你們瞭解的應該是這兩個:
@Override public void open() throws Exception { super.open(); FunctionUtils.openFunction(userFunction, new Configuration()); } @Override public void close() throws Exception { super.close(); functionsClosed = true; FunctionUtils.closeFunction(userFunction); }
這兩個就是flink提供的Rich***Function
系列算子的open和close方法被執行的地方。
StreamSink着實沒什麼可說的,邏輯很簡單,值得一提的只有兩個方法:
@Override public void processElement(StreamRecord<IN> element) throws Exception { sinkContext.element = element; userFunction.invoke(element.getValue(), sinkContext); } @Override protected void reportOrForwardLatencyMarker(LatencyMarker maker) { // all operators are tracking latencies this.latencyGauge.reportLatency(maker, true); // sinks don't forward latency markers }
其中,processElement
是繼承自StreamOperator的方法。reportOrForwardLatencyMarker
是用來計算延遲的,前面提到StreamSource會產生LateMarker,用於記錄數據計算時間,就是在這裏完成了計算。
算子這部分邏輯相對簡單清晰,就講這麼多吧。
對於7×24小時不間斷運行的流程序來講,要保證fault tolerant是很難的,這不像是離線任務,若是失敗了只須要清空已有結果,從新跑一次就能夠了。對於流任務,若是要保證可以從新處理已處理過的數據,就要把數據保存下來;而這就面臨着幾個問題:好比一是保存多久的數據?二是重複計算的數據應該怎麼處理,怎麼保證冪等性?
對於一個流系統,咱們有如下但願:
storm的fault tolerant是這樣工做的:每個被storm的operator處理的數據都會向其上一個operator發送一份應答消息,通知其已被下游處理。storm的源operator保存了全部已發送的消息的每個下游算子的應答消息,當它收到來自sink的應答時,它就知道該消息已經被完整處理,能夠移除了。
若是沒有收到應答,storm就會重發該消息。顯而易見,這是一種at least once的邏輯。另外,這種方式面臨着嚴重的冪等性問題,例如對一個count算子,若是count的下游算子出錯,source重發該消息,那麼防止該消息被count兩遍的邏輯須要程序員本身去實現。最後,這樣一種處理方式很是低效,吞吐量很低。
前面提到,storm的實現方式就註定了與高吞吐量無緣。那麼,爲了提升吞吐量,把一批數據彙集在一塊兒處理就是很天然的選擇。Spark Streaming的實現就是基於這樣的思路:
咱們能夠在徹底的連續計算與徹底的分批計算中間取折中,經過控制每批計算數據的大小來控制延遲與吞吐量的制約,若是想要低延遲,就用小一點的batch,若是想要大吞吐量,就不得不忍受更高的延遲(更久的等待數據到來的時間和更多的計算),以下圖所示。
以這樣的方式,能夠在每一個batch中作到exactly-once,可是這種方式也有其弊端:
首先,batch的方式使得一些須要跨batch的操做變得很是困難,例如session window;用戶不得不本身想辦法去實現相關邏輯。
其次,batch模式很難作好背壓。當一個batch由於種種緣由處理慢了,那麼下一個batch要麼不得不容納更多的新來數據,要麼不得不堆積更多的batch,整個任務可能會被拖垮,這是一個很是致命的問題。
最後,batch的方式基本意味着其延遲是有比較高的下限的,實時性上很差。
咱們在傳統數據庫,如mysql中使用binlog來完成事務,這樣的思路也能夠被用在實現exactly-once模型中。例如,咱們能夠log下每一個數據元素每一次被處理時的結果和當時所處的操做符的狀態。這樣,當咱們須要fault tolerant時,咱們只須要讀一下log就能夠了。這種模式規避了storm和spark所面臨的問題,而且可以很好的實現exactly-once,惟一的弊端是:如何儘量的減小log的成本?Flink給了咱們答案。
實現exactly-once的關鍵是什麼?是可以準確的知道和快速記錄下來當前的operator的狀態、當前正在處理的元素(以及正處在不一樣算子之間傳遞的元素)。若是上面這些能夠作到,那麼fault tolerant無非就是從持久化存儲中讀取上次記錄的這些元信息,而且恢復到程序中。那麼Flink是如何實現的呢?
Flink的分佈式快照的核心是其輕量級異步分佈式快照機制。爲了實現這一機制,flink引入了一個概念,叫作Barrier。Barrier是一種標記,它被source產生而且插入到流數據中,被髮送到下游節點。當下遊節點處理到該barrier標誌時,這就意味着在該barrier插入到流數據時,已經進入系統的數據在當前節點已經被處理完畢。
如圖所示,每當一個barrier流過一個算子節點時,就說明了在該算子上,能夠觸發一次檢查點,用以保存當前節點的狀態和已經處理過的數據,這就是一份快照。(在這裏能夠聯想一下micro-batch,把barrier想象成分割每一個batch的邏輯,會好理解一點)這樣的方式下,記錄快照就像和前面提到的micro-batch同樣容易。
與此同時,該算子會向下遊發送該barrier。由於數據在算子之間是按順序發送的,因此當下遊節點收到該barrier時,也就意味着一樣的一批數據在下游節點上也處理完畢,能夠進行一次checkpoint,保存基於該節點的一份快照,快照完成後,會通知JobMananger本身完成了這個快照。這就是分佈式快照的基本含義。
再看這張圖:
有時,有的算子的上游節點和下游節點都不止一個,應該怎麼處理呢?若是有不止一個下游節點,就向每一個下游發送barrier。同理,若是有不止一個上游節點,那麼就要等到全部上游節點的同一批次的barrier到達以後,才能觸發checkpoint。由於每一個節點運算速度不一樣,因此有的上游節點可能已經在發下個barrier週期的數據了,有的上游節點還沒發送本次的barrier,這時候,當前算子就要緩存一下提早到來的數據,等比較慢的上游節點發送barrier以後,才能處理下一批數據。
當整個程序的最後一個算子sink都收到了這個barrier,也就意味着這個barrier和上個barrier之間所夾雜的這批元素已經所有落袋爲安。這時,最後一個算子通知JobManager整個流程已經完成,而JobManager隨後發出通知,要求全部算子刪除本次快照內容,以完成清理。這整個部分,就是Flink的兩階段提交的checkpoint過程,以下面四幅圖所示:
總之,經過這種方式,flink實現了咱們前面提到的六項對流處理框架的要求:exactly-once、低延遲、高吞吐、易用的模型、方便的恢復機制。
最後,貼一個美團作的flink與storm的性能對比:flink與storm的性能對比
接下來,咱們結合源碼來看看flink的checkpoint究竟是如何實現其生命週期的:
因爲flink提供的SocketSource並不支持checkpoint,因此這裏我以
FlinkKafkaConsumer010
做爲sourceFunction。
要完成一次checkpoint,第一步必然是發起checkpoint請求。那麼,這個請求是哪裏發出的,怎麼發出的,又由誰控制呢?
還記得若是咱們要設置checkpoint的話,須要指定checkpoint間隔吧?既然是一個指定間隔觸發的功能,那應該會有相似於Scheduler的東西存在,flink裏,這個負責觸發checkpoint的類是CheckpointCoordinator
。
flink在提交job時,會啓動這個類的startCheckpointScheduler
方法,以下所示
public void startCheckpointScheduler() { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } // make sure all prior timers are cancelled stopCheckpointScheduler(); periodicScheduling = true; currentPeriodicTrigger = timer.scheduleAtFixedRate( new ScheduledTrigger(), baseInterval, baseInterval, TimeUnit.MILLISECONDS); } } private final class ScheduledTrigger implements Runnable { @Override public void run() { try { triggerCheckpoint(System.currentTimeMillis(), true); } catch (Exception e) { LOG.error("Exception while triggering checkpoint.", e); } } }
啓動以後,就會以設定好的頻率調用triggerCheckPoint()
方法。這個方法太長,我大概說一下都作了什麼:
checkpointID = checkpointIdCounter.getAndIncrement();
以生成一個新的id,而後生成一個PendingCheckpoint
。PendingCheckpoint是一個啓動了的checkpoint,可是尚未被確認。等到全部的task都確認了本次checkpoint,那麼這個checkpoint對象將轉化爲一個CompletedCheckpoint
。// send the messages to the tasks that trigger their checkpoint for (Execution execution: executions) { execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); }
這裏是調用了Execution的triggerCheckpoint方法,一個execution就是一個executionVertex的實際執行者。咱們看一下這個方法:
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) { final LogicalSlot slot = assignedResource; if (slot != null) { //TaskManagerGateway是用來跟taskManager進行通訊的組件 final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions); } else { LOG.debug("The execution has no slot assigned. This indicates that the execution is " + "no longer running."); } }
再往下跟就進入了Task
類的範疇,咱們將在下一小節進行解讀。本小節主要講了CheckpointCoordinator
類是如何觸發一次checkpoint,從其名字也能夠看出來其功能:檢查點協調器。
先說Task類中的部分,該類建立了一個CheckpointMetaData
的對象,而且生成了一個Runable匿名類用於執行checkpoint,而後以異步的方式觸發了該Runable:
public void triggerCheckpointBarrier( final long checkpointID, long checkpointTimestamp, final CheckpointOptions checkpointOptions) { ...... Runnable runnable = new Runnable() { @Override public void run() { // set safety net from the task's context for checkpointing thread LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName()); FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions); if (!success) { checkpointResponder.declineCheckpoint( getJobID(), getExecutionId(), checkpointID, new CheckpointDeclineTaskNotReadyException(taskName)); } } ...... } }; executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)); } }
上面代碼裏的invokable事實上就是咱們的StreamTask了。Task類其實是將checkpoint委託給了更具體的類去執行,而StreamTask也將委託給更具體的類,直到業務代碼。
StreamTask是這樣實現的:
private boolean performCheckpoint( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { synchronized (lock) { if (isRunning) { operatorChain.broadcastCheckpointBarrier( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions); checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); return true; } else { ...... } } }
完成broadcastCheckpointBarrier
方法後,在checkpointState()
方法中,StreamTask還作了不少別的工做:
public void executeCheckpointing() throws Exception { ...... try { //這裏,就是調用StreamOperator進行snapshotState的入口方法 for (StreamOperator<?> op : allOperators) { checkpointStreamOperator(op); } // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable( owner, operatorSnapshotsInProgress, checkpointMetaData, checkpointMetrics, startAsyncPartNano); owner.cancelables.registerCloseable(asyncCheckpointRunnable); //這裏註冊了一個Runnable,在執行完checkpoint以後向JobManager發出CompletedCheckPoint消息,這也是fault tolerant兩階段提交的一部分 owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable); ...... } }
說到checkpoint,咱們印象裏最直觀的感覺確定是咱們的一些作聚合的操做符的狀態保存,好比sum的和以及count的值等等。這些內容就是StreamOperator部分將要觸發保存的內容。能夠看到,除了咱們直觀的這些操做符的狀態保存外,flink的checkpoint作了大量的其餘工做。
接下來,咱們就把目光轉向操做符的checkpoint機制。
第四章時,咱們已經瞭解了StreamOperator的類關係,這裏,咱們就直接接着上一節的checkpointStreamOperator(op)
方法往下講。
順便,前面也提到了,在進行checkpoint以前,operator初始化時,會執行一個initializeState
方法,在該方法中,若是task是從失敗中恢復的話,其保存的state也會被restore進來。
傳遞barrier是在進行本operator的statesnapshot以前完成的,咱們先來看看其邏輯,其實和傳遞一條數據是相似的,就是生成一個CheckpointBarrier
對象,而後向每一個streamOutput寫進去:
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException { try { CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions); for (RecordWriterOutput<?> streamOutput : streamOutputs) { streamOutput.broadcastEvent(barrier); } } catch (InterruptedException e) { throw new IOException("Interrupted while broadcasting checkpoint barrier"); } }
下游的operator接收到本barrier,就會觸發其自身的checkpoint。
StreamTask在執行完broadcastCheckpointBarrier以後,
咱們當前的wordcount程序裏有兩個operator chain,分別是:
咱們就按這個順序來捋一下checkpoint的過程。
1.kafka source的checkpoint過程
public final void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { LOG.debug("snapshotState() called on closed source"); } else { unionOffsetStates.clear(); final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; if (fetcher == null) { // the fetcher has not yet been initialized, which means we need to return the // originally restored offsets or the assigned partitions for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) { unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue())); } if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState); } } else { HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); } for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) { unionOffsetStates.add( Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); } } if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // truncate the map of pending offsets to commit, to prevent infinite growth while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { pendingOffsetsToCommit.remove(0); } } } }
kafka的snapshot邏輯就是記錄一下當前消費的offsets,而後作成tuple(partitiion,offset)放進一個StateBackend
裏。StateBackend是flink抽象出來的一個用於保存狀態的接口。
2.FlatMap算子的checkpoint過程
沒什麼可說的,就是調用了snapshotState()方法而已。
3.本operator chain的state保存過程
細心的同窗應該注意到了,各個算子的snapshot方法只把本身的狀態保存到了StateBackend裏,沒有寫入的持久化操做。這部分操做被放到了AbstractStreamOperator
中,由flink統一負責持久化。其實不須要看源碼咱們也能想出來,持久化無非就是把這些數據用一個流寫到磁盤或者別的地方,接下來咱們來看看是否是這樣:
//仍是AbstractStreamOperator.java的snapshotState方法 if (null != operatorStateBackend) { snapshotInProgress.setOperatorStateManagedFuture( operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)); }
那麼這個operatorStateBackend是怎麼保存狀態的呢?
咱們來看看這個寫入數據的方法:
public SnapshotResult<OperatorStateHandle> performOperation() throws Exception { long asyncStartTime = System.currentTimeMillis(); CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out; // get the registered operator state infos ... List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots = new ArrayList<>(registeredOperatorStatesDeepCopies.size()); for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) { operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot()); } // ... write them all in the checkpoint stream ... DataOutputView dov = new DataOutputViewStreamWrapper(localOut); OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots); backendSerializationProxy.write(dov); ...... }
註釋寫的很清楚,我就很少說了。
4.後繼operatorChain的checkpoint過程
前面說到,在flink的流中,barrier流過期會觸發checkpoint。在上面第1步中,上游節點已經發出了Barrier,因此在咱們的keyed aggregation -> sink 這個operatorchain中,咱們將首先捕獲這個barrier。
捕獲barrier的過程其實就是處理input數據的過程,對應着StreamInputProcessor.processInput()
方法,該方法咱們在第四章已經講過,這裏咱們簡單回顧一下:
//每一個元素都會觸發這一段邏輯,若是下一個數據是buffer,則從外圍的while循環裏進入處理用戶數據的邏輯;這個方法裏默默的處理了barrier的邏輯 final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked(); if (bufferOrEvent != null) { if (bufferOrEvent.isBuffer()) { currentChannel = bufferOrEvent.getChannelIndex(); currentRecordDeserializer = recordDeserializers[currentChannel]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); } else { // Event received final AbstractEvent event = bufferOrEvent.getEvent(); if (event.getClass() != EndOfPartitionEvent.class) { throw new IOException("Unexpected event: " + event); } } }
處理barrier的過程在這段代碼裏沒有體現,由於被包含在了getNextNonBlocked()
方法中,咱們看下這個方法的核心邏輯:
//BarrierBuffer.getNextNonBlocked方法 else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) { if (!endOfStream) { // process barriers only if there is a chance of the checkpoint completing processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); } } else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent()); }
先提一嘴,你們還記得以前的部分也提到過CheckpointMarker吧,這裏正好也對上了。
處理barrier也是個麻煩事,你們回想一下5.1節提到的屏障的原理圖,一個opertor必須收到從每一個inputchannel發過來的同一序號的barrier以後才能發起本節點的checkpoint,若是有的channel的數據處理的快了,那該barrier後的數據還須要緩存起來,若是有的inputchannel被關閉了,那它就不會再發送barrier過來了:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { final long barrierId = receivedBarrier.getId(); // fast path for single channel cases if (totalNumberOfInputChannels == 1) { if (barrierId > currentCheckpointId) { // new checkpoint currentCheckpointId = barrierId; notifyCheckpoint(receivedBarrier); } return; } // -- general code path for multiple input channels -- if (numBarriersReceived > 0) { // this is only true if some alignment is already progress and was not canceled if (barrierId == currentCheckpointId) { // regular case onBarrier(channelIndex); } else if (barrierId > currentCheckpointId) { // we did not complete the current checkpoint, another started before LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + "Skipping current checkpoint.", barrierId, currentCheckpointId); // let the task know we are not completing this notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); // abort the current checkpoint releaseBlocksAndResetBarriers(); // begin a the new checkpoint beginNewAlignment(barrierId, channelIndex); } else { // ignore trailing barrier from an earlier checkpoint (obsolete now) return; } } else if (barrierId > currentCheckpointId) { // first barrier of a new checkpoint beginNewAlignment(barrierId, channelIndex); } else { // either the current checkpoint was canceled (numBarriers == 0) or // this barrier is from an old subsumed checkpoint return; } // check if we have all barriers - since canceled checkpoints always have zero barriers // this can only happen on a non canceled checkpoint if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { // actually trigger checkpoint if (LOG.isDebugEnabled()) { LOG.debug("Received all barriers, triggering checkpoint {} at {}", receivedBarrier.getId(), receivedBarrier.getTimestamp()); } releaseBlocksAndResetBarriers(); notifyCheckpoint(receivedBarrier); } }
總之,當收到所有的barrier以後,就會觸發notifyCheckpoint()
,該方法又會調用StreamTask的triggerCheckpoint
,和以前的operator是同樣的。
若是還有後續的operator的話,就是徹底相同的循環,再也不贅述。
5.報告完成checkpoint事件
當一個operator保存完checkpoint數據後,就會啓動一個異步對象AsyncCheckpointRunnable
,用以報告該檢查點已完成,其具體邏輯在reportCompletedSnapshotStates中。這個方法把任務又最終委託給了RpcCheckpointResponder
這個類:
checkpointResponder.acknowledgeCheckpoint( jobId, executionAttemptID, checkpointId, checkpointMetrics, acknowledgedState);
從這個類也能夠看出來,它的邏輯是經過rpc的方式遠程調JobManager的相關方法完成報告事件,底層也是經過akka實現的。
那麼,誰響應了這個rpc調用呢?是該任務的JobMaster。
//JobMaster.java public void acknowledgeCheckpoint( final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint( jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState); if (checkpointCoordinator != null) { getRpcService().execute(() -> { try { checkpointCoordinator.receiveAcknowledgeMessage(ackMessage); } catch (Throwable t) { log.warn("Error while processing checkpoint acknowledgement message"); } }); } else { log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", jobGraph.getJobID()); } }
JobMaster反手就是一巴掌就把任務又rpc給了CheckpointCoordinator.receiveAcknowledgeMessage()
方法。
以前提到,coordinator在觸發checkpoint時,生成了一個PendingCheckpoint
,保存了全部operator的id。
當PendingCheckpoint收到一個operator的完成checkpoint的消息時,它就把這個operator從未完成checkpoint的節點集合移動到已完成的集合。當全部的operator都報告完成了checkpoint時,CheckpointCoordinator會觸發completePendingCheckpoint()
方法,該方法作了如下事情:
本文裏,收到這個遠程調用的就是那兩個operator chain,咱們來看看其邏輯:
public void notifyCheckpointComplete(long checkpointId) throws Exception { synchronized (lock) { if (isRunning) { LOG.debug("Notification of complete checkpoint for task {}", getName()); for (StreamOperator<?> operator : operatorChain.getAllOperators()) { if (operator != null) { operator.notifyCheckpointComplete(checkpointId); } } } else { LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName()); } } }
再接下來無非就是層層通知對應的算子作出響應罷了。
至此,flink的兩階段提交的checkpoint邏輯所有完成。
State是快照數據的載體,StateBackend是快照如何被保存的抽象。
State分爲 KeyedState和OperatorState,從名字就能夠看出來分別對應着keyedStream和其餘的oeprator。從State由誰管理上,也能夠區分爲raw state和Managed state。Flink管理的就是Managed state,用戶本身管理的就是raw state。Managed State又分爲ValueState、ListState、ReducingState、AggregatingState、FoldingState、MapState這麼幾種,看名字知用途。
StateBackend目前提供了三個backend,MemoryStateBackend,FsStateBackend,RocksDBStateBackend,都是看名字知用途系列。
State接口、StateBackend接口及其實現都比較簡單,代碼就不貼了, 尤爲State本質上就是一層容器封裝。
貼個別人寫的狀態管理的文章吧:詳解Flink中的狀態管理
本章打算講一下flink底層是如何定義和在操做符之間傳遞數據的。
Flink做爲一個高效的流框架,爲了不JVM的固有缺陷(java對象存儲密度低,FGC影響吞吐和響應等),必然走上自主管理內存的道路。
這個MemorySegment
就是Flink的內存抽象。默認狀況下,一個MemorySegment能夠被看作是一個32kb大的內存塊的抽象。這塊內存既能夠是JVM裏的一個byte[],也能夠是堆外內存(DirectByteBuffer)。
若是說byte[]數組和direct memory是最底層的存儲,那麼memorysegment就是在其上覆蓋的一層統一抽象。它定義了一系列抽象方法,用於控制和底層內存的交互,如:
public abstract class MemorySegment { public abstract byte get(int index); public abstract void put(int index, byte b); public int size() ; public abstract ByteBuffer wrap(int offset, int length); ...... }
咱們能夠看到,它在提供了諸多直接操做內存的方法外,還提供了一個wrap()
方法,將本身包裝成一個ByteBuffer,咱們待會兒講這個ByteBuffer。
Flink爲MemorySegment提供了兩個實現類:HeapMemorySegment
和HybridMemorySegment
。他們的區別在於前者只能分配堆內存,然後者能用來分配堆內和堆外內存。事實上,Flink框架裏,只使用了後者。這是爲何呢?
若是HybridMemorySegment只能用於分配堆外內存的話,彷佛更合常理。可是在JVM的世界中,若是一個方法是一個虛方法,那麼每次調用時,JVM都要花時間去肯定調用的究竟是哪一個子類實現的該虛方法(方法重寫機制,不明白的去看JVM的invokeVirtual指令),也就意味着每次都要去翻方法表;而若是該方法雖然是個虛方法,但實際上整個JVM裏只有一個實現(就是說只加載了一個子類進來),那麼JVM會很聰明的把它去虛化處理,這樣就不用每次調用方法時去找方法表了,可以大大提高性能。可是隻分配堆內或者堆外內存不能知足咱們的須要,因此就出現了HybridMemorySegment同時能夠分配兩種內存的設計。
咱們能夠看看HybridMemorySegment的構造代碼:
HybridMemorySegment(ByteBuffer buffer, Object owner) { super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner); this.offHeapBuffer = buffer; } HybridMemorySegment(byte[] buffer, Object owner) { super(buffer, owner); this.offHeapBuffer = null; }
其中,第一個構造函數的checkBufferAndGetAddress()
方法可以獲得direct buffer的內存地址,所以能夠操做堆外內存。
在MemorySegment
這個抽象之上,Flink在數據從operator內的數據對象在向TaskManager上轉移,預備被髮給下個節點的過程當中,使用的抽象或者說內存對象是Buffer
。
注意,這個Buffer是個flink接口,不是java.nio提供的那個Buffer抽象類。Flink在這一層面同時使用了這兩個同名概念,用來存儲對象,直接看代碼時處處都是各類xxxBuffer很容易混淆:
HeapByteBuffer
,這個主要是當數據從jvm裏的一個對象被序列化成字節數組時用的;NetworkBuffer
,是對MemorySegment
的包裝。Flink在各個TaskManager之間傳遞數據時,使用的是這一層的抽象。由於Buffer的底層是MemorySegment,這可能不是JVM所管理的,因此爲了知道何時一個Buffer用完了能夠回收,Flink引入了引用計數的概念,當確認這個buffer沒有人引用,就能夠回收這一片MemorySegment用於別的地方了(JVM的垃圾回收爲啥不用引用計數?讀者思考一下):
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf { private volatile int refCnt = 1; ...... }
爲了方便管理NetworkBuffer
,Flink提供了BufferPoolFactory
,而且提供了惟一實現NetworkBufferPool
,這是個工廠模式的應用。
NetworkBufferPool在每一個TaskManager上只有一個,負責全部子task的內存管理。其實例化時就會嘗試獲取全部可由它管理的內存(對於堆內存來講,直接獲取全部內存並放入老年代,並令用戶對象只在新生代存活,能夠極大程度的減小Full GC),咱們看看其構造方法:
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) { ...... try { this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate); } catch (OutOfMemoryError err) { throw new OutOfMemoryError("Could not allocate buffer queue of length " + numberOfSegmentsToAllocate + " - " + err.getMessage()); } try { for (int i = 0; i < numberOfSegmentsToAllocate; i++) { ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize); availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null)); } } ...... long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20; LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).", allocatedMb, availableMemorySegments.size(), segmentSize); }
因爲NetworkBufferPool只是個工廠,實際的內存池是LocalBufferPool
。每一個TaskManager都只有一個NetworkBufferPool工廠,可是上面運行的每一個task都要有一個和其餘task隔離的LocalBufferPool池,這從邏輯上很好理解。另外,NetworkBufferPool會計算本身所擁有的全部內存分片數,在分配新的內存池時對每一個內存池應該佔有的內存分片數重分配,步驟是:
實現代碼以下:
private void redistributeBuffers() throws IOException { assert Thread.holdsLock(factoryLock); // All buffers, which are not among the required ones final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers; if (numAvailableMemorySegment == 0) { // in this case, we need to redistribute buffers so that every pool gets its minimum for (LocalBufferPool bufferPool : allBufferPools) { bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments()); } return; } long totalCapacity = 0; // long to avoid int overflow for (LocalBufferPool bufferPool : allBufferPools) { int excessMax = bufferPool.getMaxNumberOfMemorySegments() - bufferPool.getNumberOfRequiredMemorySegments(); totalCapacity += Math.min(numAvailableMemorySegment, excessMax); } // no capacity to receive additional buffers? if (totalCapacity == 0) { return; // necessary to avoid div by zero when nothing to re-distribute } final int memorySegmentsToDistribute = MathUtils.checkedDownCast( Math.min(numAvailableMemorySegment, totalCapacity)); long totalPartsUsed = 0; // of totalCapacity int numDistributedMemorySegment = 0; for (LocalBufferPool bufferPool : allBufferPools) { int excessMax = bufferPool.getMaxNumberOfMemorySegments() - bufferPool.getNumberOfRequiredMemorySegments(); // shortcut if (excessMax == 0) { continue; } totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax); final int mySize = MathUtils.checkedDownCast( memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment); numDistributedMemorySegment += mySize; bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize); } assert (totalPartsUsed == totalCapacity); assert (numDistributedMemorySegment == memorySegmentsToDistribute); }
接下來講說這個LocalBufferPool
內存池。
LocalBufferPool的邏輯想一想無非是增刪改查,值得說的是其fields:
/** 該內存池須要的最少內存片數目*/ private final int numberOfRequiredMemorySegments; /** * 當前已經得到的內存片中,尚未寫入數據的空白內存片 */ private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>(); /** * 註冊的全部監控buffer可用性的監聽器 */ private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>(); /** 能給內存池分配的最大分片數*/ private final int maxNumberOfMemorySegments; /** 當前內存池大小 */ private int currentPoolSize; /** * 全部經由NetworkBufferPool分配的,被本內存池引用到的(非直接得到的)分片數 */ private int numberOfRequestedMemorySegments;
承接NetworkBufferPool的重分配方法,咱們來看看LocalBufferPool的setNumBuffers()
方法,代碼很短,邏輯也至關簡單,就不展開說了:
public void setNumBuffers(int numBuffers) throws IOException { synchronized (availableMemorySegments) { checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least %s buffers, but tried to set to %s", numberOfRequiredMemorySegments, numBuffers); if (numBuffers > maxNumberOfMemorySegments) { currentPoolSize = maxNumberOfMemorySegments; } else { currentPoolSize = numBuffers; } returnExcessMemorySegments(); // If there is a registered owner and we have still requested more buffers than our // size, trigger a recycle via the owner. if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) { owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers); } } }
咱們接着往高層抽象走,剛剛提到了最底層內存抽象是MemorySegment,用於數據傳輸的是Buffer,那麼,承上啓下對接從Java對象轉爲Buffer的中間對象是什麼呢?是StreamRecord
。
從StreamRecord<T>
這個類名字就能夠看出來,這個類就是個wrap,裏面保存了原始的Java對象。另外,StreamRecord還保存了一個timestamp。
那麼這個對象是怎麼變成LocalBufferPool內存池裏的一個大號字節數組的呢?藉助了StreamWriter
這個類。
咱們直接來看把數據序列化交出去的方法:
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer<T> serializer = serializers[targetChannel]; SerializationResult result = serializer.addRecord(record); while (result.isFullBuffer()) { if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { // If this was a full record, we are done. Not breaking // out of the loop at this point will lead to another // buffer request before breaking out (that would not be // a problem per se, but it can lead to stalls in the // pipeline). if (result.isFullRecord()) { break; } } BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder); } checkState(!serializer.hasSerializedData(), "All data should be written at once"); if (flushAlways) { targetPartition.flush(targetChannel); } }
先說最後一行,若是配置爲flushAlways,那麼會馬上把元素髮送出去,可是這樣吞吐量會降低;Flink的默認設置其實也不是一個元素一個元素的發送,是單獨起了一個線程,每隔固定時間flush一次全部channel,較真起來也算是mini batch了。
再說序列化那一句:SerializationResult result = serializer.addRecord(record);
。在這行代碼中,Flink把對象調用該對象所屬的序列化器序列化爲字節數組。
上一節講了各層數據的抽象,這一節講講數據在各個task之間exchange的過程。
看這張圖:
本節講一下算子之間具體的數據傳輸過程。也先上一張圖:
數據在task之間傳遞有以下幾步:
RecordWriter
。每條記錄都要選擇一個下游節點,因此要通過ChannelSelector
。InputChannel
數據在不一樣機器的算子之間傳遞的步驟就是以上這些。
瞭解了步驟以後,再來看一下部分關鍵代碼:
首先是把數據交給recordwriter。
//RecordWriterOutput.java @Override public void collect(StreamRecord<OUT> record) { if (this.outputTag != null) { // we are only responsible for emitting to the main input return; } //這裏能夠看到把記錄交給了recordwriter pushToRecordWriter(record); }
而後recordwriter把數據發送到對應的通道。
//RecordWriter.java public void emit(T record) throws IOException, InterruptedException { //channelselector登場了 for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { sendToTarget(record, targetChannel); } } private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { //選擇序列化器並序列化數據 RecordSerializer<T> serializer = serializers[targetChannel]; SerializationResult result = serializer.addRecord(record); while (result.isFullBuffer()) { if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { // If this was a full record, we are done. Not breaking // out of the loop at this point will lead to another // buffer request before breaking out (that would not be // a problem per se, but it can lead to stalls in the // pipeline). if (result.isFullRecord()) { break; } } BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); //寫入channel result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder); } checkState(!serializer.hasSerializedData(), "All data should be written at once"); if (flushAlways) { targetPartition.flush(targetChannel); } }
接下來是把數據推給底層設施(netty)的過程:
//ResultPartition.java @Override public void flushAll() { for (ResultSubpartition subpartition : subpartitions) { subpartition.flush(); } } //PartitionRequestQueue.java void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) { //這裏交給了netty server線程去推 ctx.executor().execute(new Runnable() { @Override public void run() { ctx.pipeline().fireUserEventTriggered(reader); } }); }
netty相關的部分:
//AbstractChannelHandlerContext.java public ChannelHandlerContext fireUserEventTriggered(final Object event) { if (event == null) { throw new NullPointerException("event"); } else { final AbstractChannelHandlerContext next = this.findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeUserEventTriggered(event); } else { executor.execute(new OneTimeTask() { public void run() { next.invokeUserEventTriggered(event); } }); } return this; } }
最後真實的寫入:
//PartittionRequesetQueue.java private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception { if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) { return; } // Queue an available reader for consumption. If the queue is empty, // we try trigger the actual write. Otherwise this will be handled by // the writeAndFlushNextMessageIfPossible calls. boolean triggerWrite = availableReaders.isEmpty(); registerAvailableReader(reader); if (triggerWrite) { writeAndFlushNextMessageIfPossible(ctx.channel()); } } private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException { ...... next = reader.getNextBuffer(); if (next == null) { if (!reader.isReleased()) { continue; } markAsReleased(reader.getReceiverId()); Throwable cause = reader.getFailureCause(); if (cause != null) { ErrorResponse msg = new ErrorResponse( new ProducerFailedException(cause), reader.getReceiverId()); ctx.writeAndFlush(msg); } } else { // This channel was now removed from the available reader queue. // We re-add it into the queue if it is still available if (next.moreAvailable()) { registerAvailableReader(reader); } BufferResponse msg = new BufferResponse( next.buffer(), reader.getSequenceNumber(), reader.getReceiverId(), next.buffersInBacklog()); if (isEndOfPartitionEvent(next.buffer())) { reader.notifySubpartitionConsumed(); reader.releaseAllResources(); markAsReleased(reader.getReceiverId()); } // Write and flush and wait until this is done before // trying to continue with the next buffer. channel.writeAndFlush(msg).addListener(writeListener); return; } ...... }
上面這段代碼裏第二個方法中調用的writeAndFlush(msg)
就是真正往netty的nio通道里寫入的地方了。在這裏,寫入的是一個RemoteInputChannel,對應的就是下游節點的InputGate的channels。
有寫就有讀,nio通道的另外一端須要讀入buffer,代碼以下:
//CreditBasedPartitionRequestClientHandler.java private void decodeMsg(Object msg) throws Throwable { final Class<?> msgClazz = msg.getClass(); // ---- Buffer -------------------------------------------------------- if (msgClazz == NettyMessage.BufferResponse.class) { NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg; RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId); if (inputChannel == null) { bufferOrEvent.releaseBuffer(); cancelRequestFor(bufferOrEvent.receiverId); return; } decodeBufferOrEvent(inputChannel, bufferOrEvent); } ...... }
插一句,Flink其實作阻塞和獲取數據的方式很是天然,利用了生產者和消費者模型,當獲取不到數據時,消費者天然阻塞;當數據被加入隊列,消費者被notify。Flink的背壓機制也是藉此實現。
而後在這裏又反序列化成StreamRecord
:
//StreamElementSerializer.java public StreamElement deserialize(DataInputView source) throws IOException { int tag = source.readByte(); if (tag == TAG_REC_WITH_TIMESTAMP) { long timestamp = source.readLong(); return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp); } else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { return new StreamRecord<T>(typeSerializer.deserialize(source)); } else if (tag == TAG_WATERMARK) { return new Watermark(source.readLong()); } else if (tag == TAG_STREAM_STATUS) { return new StreamStatus(source.readInt()); } else if (tag == TAG_LATENCY_MARKER) { return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt()); } else { throw new IOException("Corrupt stream, found tag: " + tag); } }
而後再次在StreamInputProcessor.processInput()
循環中獲得處理。
至此,數據在跨jvm的節點之間的流轉過程就講完了。
在看上一部分的代碼時,有一個小細節不知道讀者有沒有注意到,咱們的數據發送端的代碼叫作PartittionRequesetQueue.java
,而咱們的接收端卻起了一個徹底不相干的名字:CreditBasedPartitionRequestClientHandler.java
。爲何前面加了CreditBased的前綴呢?
在流模型中,咱們期待數據是像水流同樣平滑的流過咱們的引擎,但現實生活不會這麼美好。數據的上游可能由於各類緣由數據量暴增,遠遠超出了下游的瞬時處理能力(回憶一下98年大洪水),致使系統崩潰。
那麼框架應該怎麼應對呢?和人類處理天然災害的方式相似,咱們修建了三峽大壩,當洪水來臨時把大量的水囤積在大壩裏;對於Flink來講,就是在數據的接收端和發送端放置了緩存池,用以緩衝數據,而且設置閘門阻止數據向下流。
那麼Flink又是如何處理背壓的呢?答案也是靠這些緩衝池。
這張圖說明了Flink在生產和消費數據時的大體狀況。ResultPartition
和InputGate
在輸出和輸入數據時,都要向NetworkBufferPool
申請一塊MemorySegment
做爲緩存池。
接下來的狀況和生產者消費者很相似。當數據發送太多,下游處理不過來了,那麼首先InputChannel會被填滿,而後是InputChannel能申請到的內存達到最大,因而下游中止讀取數據,上游負責發送數據的nettyServer會獲得響應,中止從ResultSubPartition讀取緩存,那麼ResultPartition很快也將存滿數據不能被消費,從而生產數據的邏輯被阻塞在獲取新buffer上,很是天然地造成背壓的效果。
Flink本身作了個試驗用以說明這個機制的效果:
咱們首先設置生產者的發送速度爲60%,而後下游的算子以一樣的速度處理數據。而後咱們將下游算子的處理速度下降到30%,能夠看到上游的生產者的數據產生曲線幾乎與消費者同步下滑。然後當咱們解除限速,整個流的速度馬上提升到了100%。
上文已經提到,對於流量控制,一個樸素的思路就是在長江上建三峽鏈路上創建一個攔截的dam,以下圖所示:
基於Credit的流控就是這樣一種創建在信用(消費數據的能力)上的,面向每一個虛鏈路(而非端到端的)流模型,以下圖所示:
首先,下游會向上遊發送一條credit message,用以通知其目前的信用(可聯想信用卡的可用額度),而後上游會根據這個信用消息來決定向下游發送多少數據。當上遊把數據發送給下游時,它就從下游的信用卡上划走相應的額度(credit balance):
下游總共得到的credit數目是Buf_Alloc,已經消費的數據是Fwd_Cnt,上游發送出來的數據是Tx_Cnt,那麼剩下的那部分就是Crd_Bal:
Crd_Bal = Buf_Alloc - ( Tx_Cnt - Fwd_Cnt )
上面這個式子應該很好理解。
能夠看到,Credit Based Flow Control的關鍵是buffer分配。這種分配能夠在數據的發送端完成,也能夠在接收端完成。對於下游可能有多個上游節點的狀況(好比Flink),使用接收端的credit分配更加合理:
上圖中,接收者能夠觀察到每一個上游鏈接的帶寬狀況,而上游的節點Snd1卻不可能輕易知道發往同一個下游節點的其餘Snd2的帶寬狀況,從而若是在上游控制流量將會很困難,而在下游控制流量將會很方便。
所以,這就是爲什麼Flink在接收端有一個基於Credit的Client,而不是在發送端有一個CreditServer的緣由。
最後,再講一下Credit的面向虛鏈路的流設計和端到端的流設計的區別:
如上圖所示,a是面向鏈接的流設計,b是端到端的流設計。其中,a的設計使得當下游節點3因某些狀況必須緩存數據暫緩處理時,每一個上游節點(1和2)均可以利用其緩存保存數據;而端到端的設計b裏,只有節點3的緩存才能夠用於保存數據(讀者能夠從如何實現上想一想爲何)。
對流控制感興趣的讀者,能夠看這篇文章:Traffic Management For High-Speed Networks。
截至第六章,和執行過程相關的部分就所有講完,告一段落了。第七章主要講一點雜七雜八的內容,有時間就不按期更新。
flink有三種時間模型:ProcessingTime,EventTime和IngestionTime。
關於時間模型看這張圖:
從這張圖裏能夠很清楚的看到三種Time模型的區別。
例如,我在寫這段話的時間是2018年5月13日03點47分,可是我引用的這張EventTime的圖片,是2015年畫出來的,那麼這張圖的EventTime是2015年,而ProcessingTime是如今。
Flink官網對於時間戳的解釋很是詳細:點我
Flink對於EventTime模型的實現,依賴的是一種叫作watermark
的對象。watermark是攜帶有時間戳的一個對象,會按照程序的要求被插入到數據流中,用以標誌某個事件在該時間發生了。
我再作一點簡短的說明,仍是以官網的圖爲例:
對於有序到來的數據,假設咱們在timestamp爲11的元素後加入一個watermark,時間記錄爲11,則下個元素收到該watermark時,認爲全部早於11的元素均已到達。這是很是理想的狀況。
而在現實生活中,常常會遇到亂序的數據。這時,咱們雖然在timestamp爲7的元素後就收到了11,可是咱們一直等到了收到元素12以後,才插入了watermark爲11的元素。與上面的圖相比,若是咱們仍然在11後就插入11的watermark,那麼元素9就會被丟棄,形成數據丟失。而咱們在12以後插入watermark11,就保證了9仍然會被下一個operator處理。固然,咱們不可能無限制的永遠等待遲到元素,因此要在哪一個元素後插入11須要根據實際場景權衡。
對於來自多個數據源的watermark,能夠看這張圖:
能夠看到,當一個operator收到多個watermark時,它遵循最小原則(或者說最先),即算子的當前watermark是流經該算子的最小watermark,以允許來自不一樣的source的亂序數據到來。
關於事件時間模型,更多內容能夠參考Stream 101 和谷歌的這篇論文:Dataflow Model paper
就在老白寫這篇blog的時候,Flink發佈了其1.5 RELEASE版本,號稱實現了其部署及處理模型(也就是FLIP-6),因此打算簡略地說一下FLIP-6的主要內容。
1.5以前的Flink模型有不少不足,包括:
就我我的而言,我以爲Flink有一個這裏徹底沒提到的不足纔是最應該修改的:針對任務的徹底的資源隔離。尤爲是若是用Standalone集羣,一個用戶的task跑掛了TaskManager,而後拖垮了整個集羣的狀況簡直不要太多。
Single Job JobManager
最重要的變動是一個JobManager只處理一個job。當咱們生成JobGraph時就順便起一個JobManager,這顯然更加天然。
ResourceManager
其職責包括獲取新的TM和slot,通知失敗,釋放資源以及緩存TM以用於重用等。重要的是,這個組件要能作到掛掉時不要搞垮正在運行的好好的任務。其職責和與JobManager、TaskManager的交互圖以下:
TaskManager
TM要與上面的兩個組件交互。與JobManager交互時,要能提供slot,要能與全部給出slot的JM交互。丟失與JM的鏈接時要能試圖把本TM上的slot的狀況通告給新JM,若是這一步失敗,就要能從新分配slot。
與ResourceManager交互時,要通知RM本身的資源和當前的Job分配狀況,能按照RM的要求分配資源或者關閉自身。
JobManager Slot Pool
這個pool要持有全部分配給當前job的slot資源,而且能在RM掛掉的狀況下管理當前已經持有的slot。
Dispatcher
須要一個Job的分發器的主要緣由是在有的集羣環境下咱們可能須要一個統一的提交和監控點,以及替代以前的Standalone模式下的JobManager。未來對分發器的指望可能包括權限控制等。
YARN
新的基於YARN的架構主要包括再也不須要先在容器裏啓動集羣,而後提交任務;用戶代碼再也不使用動態ClassLoader加載;不用的資源能夠釋放;能夠按需分配不一樣大小的容器等。其執行過程以下:
無Dispatcher時
有Dispatcher時
Mesos
與基於YARN的模式很像,可是隻有帶Dispatcher模式,由於只有這樣才能在Mesos集羣裏跑其RM。
Mesos的Fault Tolerance是相似這樣的:
必須用相似Marathon之類的技術保證Dispatcher的HA。
Standalone
其實沒啥可說的,把之前的JobManager的職責換成如今的Dispatcher就好了。
未來可能會實現一個相似於輕量級Yarn的模式。
Docker/k8s
用戶定義好容器,至少有一個是job specific的(否則怎麼啓動任務);還有用於啓動TM的,能夠不是job specific的。啓動過程以下
分配slot相關細節
重新的TM取slot過程:
從Cached TM取slot過程:
失敗處理
TM失敗
TM失敗時,RM要能檢測到失敗,更新本身的狀態,發送消息給JM,重啓一份TM;JM要能檢測到失敗,從狀態移除失效slot,標記該TM的task爲失敗,並在沒有足夠slot繼續任務時調整規模;TM自身則要能從Checkpoint恢復
RM失敗
此時TM要能檢測到失敗,並準備向新的RM註冊自身,而且向新的RM傳遞自身的資源狀況;JM要能檢測到失敗而且等待新的RM可用,從新請求須要的資源;丟失的數據要能從Container、TM等處恢復。
JM失敗
TM釋放全部task,向新JM註冊資源,而且若是不成功,就向RM報告這些資源可用於重分配;RM坐等;JM丟失的數據從持久化存儲中得到,已完成的checkpoints從HA恢復,從最近的checkpoint重啓task,並申請資源。
JM & RM 失敗
TM將在一段時間內試圖把資源交給新上任的JM,若是失敗,則把資源交給新的RM
TM & RM失敗JM若是正在申請資源,則要等到新的RM啓動後才能得到;JM可能須要調整其規模,由於損失了TM的slot。