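StreamGraph is the first of the four execution-graph layers in Flink. Its code lives in the org.apache.flink.streaming.api.graph package. The main job of this first layer is to add every StreamTransformation to the DAG and to set the parallelism and slot sharing group of each node.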
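To make that job concrete, here is a minimal sketch of how a list of transformations could be walked into a graph whose nodes carry a parallelism and a slot sharing group. Every `Toy*` name is a hypothetical stand-in written for this post, not a Flink class, and the group name "default" is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these classes imitate the idea of building a stream graph
// from transformations and are NOT the real org.apache.flink classes.
class ToyStreamGraphDemo {

    /** Hypothetical transformation: one logical step with an optional input. */
    static final class ToyTransformation {
        final int id;
        final String name;
        final int parallelism;
        final ToyTransformation input; // null for a source

        ToyTransformation(int id, String name, int parallelism, ToyTransformation input) {
            this.id = id;
            this.name = name;
            this.parallelism = parallelism;
            this.input = input;
        }
    }

    /** Hypothetical graph node holding the settings copied from a transformation. */
    static final class ToyNode {
        final String name;
        final int parallelism;
        final String slotSharingGroup;

        ToyNode(String name, int parallelism, String slotSharingGroup) {
            this.name = name;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
        }
    }

    static final class ToyGraph {
        final Map<Integer, ToyNode> nodes = new LinkedHashMap<>();
        final List<int[]> edges = new ArrayList<>(); // each entry is {sourceId, targetId}
    }

    /** Walks every registered transformation and adds it (and its inputs) to the graph. */
    static ToyGraph generate(List<ToyTransformation> transformations) {
        ToyGraph graph = new ToyGraph();
        for (ToyTransformation t : transformations) {
            transform(t, graph);
        }
        return graph;
    }

    private static void transform(ToyTransformation t, ToyGraph graph) {
        if (graph.nodes.containsKey(t.id)) {
            return; // already added while handling another transformation
        }
        if (t.input != null) {
            transform(t.input, graph); // inputs are added before the node itself
        }
        graph.nodes.put(t.id, new ToyNode(t.name, t.parallelism, "default"));
        if (t.input != null) {
            graph.edges.add(new int[] {t.input.id, t.id});
        }
    }

    public static void main(String[] args) {
        ToyTransformation source = new ToyTransformation(1, "Source", 1, null);
        ToyTransformation map = new ToyTransformation(2, "Map", 4, source);
        ToyTransformation sink = new ToyTransformation(3, "Sink", 4, map);

        // The source is not listed explicitly; it is reached through the inputs.
        ToyGraph graph = generate(Arrays.asList(map, sink));
        for (ToyNode node : graph.nodes.values()) {
            System.out.println(node.name + " parallelism=" + node.parallelism
                    + " slotSharingGroup=" + node.slotSharingGroup);
        }
        System.out.println("edges: " + graph.edges.size());
    }
}
```

The recursion on `input` is the point of the sketch: a node is only added after everything upstream of it is in the graph, so edges can always be attached to existing nodes.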
About 11 concrete transformation types are involved; their inheritance diagram is shown below.
First, let's look at how a StreamTransformation is obtained:
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

        if (typeInfo == null) {
            if (function instanceof ResultTypeQueryable) {
                typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
            } else {
                try {
                    typeInfo = TypeExtractor.createTypeInfo(
                            SourceFunction.class,
                            function.getClass(), 0, null, null);
                } catch (final InvalidTypesException e) {
                    typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
                }
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        clean(function);
        StreamSource<OUT, ?> sourceOperator;
        if (function instanceof StoppableFunction) {
            sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
        } else {
            sourceOperator = new StreamSource<>(function);
        }

        return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
    }
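What is ultimately returned is a DataStreamSource, which internally wraps a SourceTransformation. Next, look at the class diagram of DataStream.
As the diagram shows, DataStreamSource is a subclass of DataStream.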
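The decisions made inside addSource (resolve the output type, check whether the function is parallel, wrap the function, return a stream object that holds a source transformation) can be sketched in a few lines. This is a simplified model, not Flink code: all `Toy*` types are hypothetical, type information is reduced to a plain string, the TypeExtractor fallback is skipped, and pinning a non-parallel source to parallelism 1 is an assumption of the sketch.

```java
// Toy model only: none of these types are Flink classes.
class ToyAddSourceDemo {

    /** Hypothetical source function that produces one element per call. */
    interface ToySourceFunction<T> {
        T next();
    }

    /** Hypothetical marker interface: the source may run with parallelism > 1. */
    interface ToyParallelSourceFunction<T> extends ToySourceFunction<T> {
    }

    /** Hypothetical counterpart of "the function can report its own output type". */
    interface ToyTypeQueryable {
        String producedType();
    }

    static final class ToySourceTransformation<T> {
        final String name;
        final ToySourceFunction<T> function;
        final String typeInfo;
        final int parallelism;

        ToySourceTransformation(String name, ToySourceFunction<T> function,
                                String typeInfo, int parallelism) {
            this.name = name;
            this.function = function;
            this.typeInfo = typeInfo;
            this.parallelism = parallelism;
        }
    }

    static final class ToyDataStreamSource<T> {
        final ToySourceTransformation<T> transformation;
        final boolean parallel;

        ToyDataStreamSource(ToySourceTransformation<T> transformation, boolean parallel) {
            this.transformation = transformation;
            this.parallel = parallel;
        }
    }

    static <T> ToyDataStreamSource<T> addSource(ToySourceFunction<T> function, String sourceName,
                                                String typeInfo, int envParallelism) {
        if (typeInfo == null) {
            if (function instanceof ToyTypeQueryable) {
                // Ask the function itself for its output type.
                typeInfo = ((ToyTypeQueryable) function).producedType();
            } else {
                // Assumption: no type extraction here, go straight to a placeholder.
                typeInfo = "<missing type info for " + sourceName + ">";
            }
        }
        boolean isParallel = function instanceof ToyParallelSourceFunction;
        // Assumption of this sketch: a non-parallel source always runs with parallelism 1.
        int parallelism = isParallel ? envParallelism : 1;
        return new ToyDataStreamSource<>(
                new ToySourceTransformation<>(sourceName, function, typeInfo, parallelism),
                isParallel);
    }

    /** Example source that reports its own type and is allowed to run in parallel. */
    static final class TypedParallelSource implements ToyParallelSourceFunction<Long>, ToyTypeQueryable {
        @Override
        public Long next() {
            return 42L;
        }

        @Override
        public String producedType() {
            return "Long";
        }
    }

    public static void main(String[] args) {
        ToySourceFunction<String> plainFunction = () -> "hello";
        ToyDataStreamSource<String> plain = addSource(plainFunction, "plain", null, 4);
        ToyDataStreamSource<Long> typed = addSource(new TypedParallelSource(), "typed", null, 4);

        System.out.println(plain.transformation.typeInfo + " parallelism=" + plain.transformation.parallelism);
        System.out.println(typed.transformation.typeInfo + " parallelism=" + typed.transformation.parallelism);
    }
}
```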
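DataStreamSource is an abstraction on the DataStream side (the data stream itself), while StreamSource is an abstraction on the StreamOperator side. In Flink, a DataStream wraps one data-stream transformation, and a StreamOperator wraps a function interface, such as the user function behind map or reduce (keyBy is a different case: it only repartitions the stream and does not create an operator). Next, look at the class diagram of StreamOperator.
As the diagram shows, StreamMap and StreamFlatMap are both StreamOperator subclasses. Now let's look at the code that actually creates an operator and a transformation: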
    /**
     * Applies a Map transformation on a {@link DataStream}. The transformation
     * calls a {@link MapFunction} for each element of the DataStream. Each
     * MapFunction call returns exactly one element. The user can also extend
     * {@link RichMapFunction} to gain access to other features provided by the
     * {@link org.apache.flink.api.common.functions.RichFunction} interface.
     *
     * @param mapper
     *            The MapFunction that is called for each element of the
     *            DataStream.
     * @param <R>
     *            output type
     * @return The transformed {@link DataStream}.
     */
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
                Utils.getCallLocationName(), true);

        return transform("Map", outType, new StreamMap<>(clean(mapper)));
    }

    /**
     * Method for passing user defined operators along with the type
     * information that will transform the DataStream.
     *
     * @param operatorName
     *            name of the operator, for logging purposes
     * @param outTypeInfo
     *            the output type of the operator
     * @param operator
     *            the object containing the transformation logic
     * @param <R>
     *            type of the return stream
     * @return the data stream constructed
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }
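The pattern in map and transform is worth isolating: every call wraps the user function, creates a new transformation that points back at the current one, registers it with the environment, and returns a new stream around it. The sketch below models only that chaining. The `Toy*` classes are hypothetical, a `java.util.function.Function` stands in for both the user function and the operator, and type information is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model only: these classes mimic the chaining pattern and are NOT Flink classes.
class ToyMapChainDemo {

    /** Hypothetical transformation: a named step that remembers its input step. */
    static final class ToyTransformation {
        final String name;
        final ToyTransformation input;      // null for the source
        final Function<?, ?> operatorLogic; // null for the source
        final int parallelism;

        ToyTransformation(String name, ToyTransformation input,
                          Function<?, ?> operatorLogic, int parallelism) {
            this.name = name;
            this.input = input;
            this.operatorLogic = operatorLogic;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical environment that collects every registered transformation. */
    static final class ToyEnvironment {
        final int parallelism;
        final List<ToyTransformation> transformations = new ArrayList<>();

        ToyEnvironment(int parallelism) {
            this.parallelism = parallelism;
        }

        void addOperator(ToyTransformation transformation) {
            transformations.add(transformation);
        }

        /** Creates a stream around a source transformation without registering it. */
        <T> ToyDataStream<T> fromSource(String name) {
            return new ToyDataStream<>(this, new ToyTransformation(name, null, null, 1));
        }
    }

    /** Hypothetical stream: a thin wrapper around the latest transformation. */
    static final class ToyDataStream<T> {
        final ToyEnvironment environment;
        final ToyTransformation transformation;

        ToyDataStream(ToyEnvironment environment, ToyTransformation transformation) {
            this.environment = environment;
            this.transformation = transformation;
        }

        <R> ToyDataStream<R> map(Function<T, R> mapper) {
            return transform("Map", mapper);
        }

        <R> ToyDataStream<R> transform(String operatorName, Function<T, R> operator) {
            // The new transformation points back at the current one as its input.
            ToyTransformation result = new ToyTransformation(
                    operatorName, this.transformation, operator, environment.parallelism);
            environment.addOperator(result);
            return new ToyDataStream<>(environment, result);
        }
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment(4);
        ToyDataStream<String> words = env.fromSource("Source");
        ToyDataStream<Integer> lengths = words.map(String::length);
        ToyDataStream<Integer> doubled = lengths.map(n -> n * 2);

        for (ToyTransformation t : env.transformations) {
            System.out.println(t.name + " <- " + t.input.name + " (parallelism " + t.parallelism + ")");
        }
        System.out.println("last step: " + doubled.transformation.name);
    }
}
```

Because each transformation keeps a reference to its input, the list held by the environment is enough to reconstruct the whole DAG later.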
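That covers the basics of DataStream and StreamOperator, including how transformations are created and how DataStream operations work. The next post will look at how these transformations are converted.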