本文將從FlatMap概念和如何使用開始入手,深刻到Flink是如何實現FlatMap。但願能讓你們對這個概念有更深刻的理解。html
首先咱們先從概念入手。前端
自從響應式編程慢慢壯大以來,這兩個單詞如今愈來愈被你們熟悉了。前端能見到它們的身影,後臺也能見到;安卓裏面有,iOS也有。不少兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給你們簡單講解下。java
它把數組流
中的每個值,使用所提供的函數執行一遍,一一對應。獲得與元素個數相同的數組流
。而後返回這個新數據流。apache
flat是扁平的意思。因此這個操做是:先映射(map),再拍扁(join)。編程
flatMap輸入多是多個子數組流
。因此flatMap先針對 每一個子數組流
的每一個元素進行映射操做。而後進行扁平化處理,最後聚集全部進行扁平化處理的結果集造成一個新的列表(扁平化簡而言之就是去除全部的修飾)。api
flatMap與map另一個不同的地方就是傳入的函數在處理完後返回值必須是List。數組
好比拿到一個文本文件以後,咱們是按行讀取,按行處理。若是要對每一行的單詞數進行計數,那麼應該選擇Map方法,若是是統計詞頻,就應該選擇flatMap方法。數據結構
若是還不清楚,能夠看看下面這個例子:app
梁山新進一批好馬,準備給每一個馬軍頭領配置一批。因而獲得函數以及頭領名單以下: 函數 = ( 頭領 => 頭領 + 好馬 ) 五虎將 = List(關勝、林沖、秦明、呼延灼、董平 ) 八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 ) // Map函數的例子 利用map函數,咱們能夠獲得 五虎將馬軍 五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 ) 結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 ) // flatMap函數的例子 可是爲了獲得統一的馬軍,則能夠用flatMap: 馬軍頭領 = List(五虎將,八驃騎) 馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 ) 結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 )
如今你們應該清楚了吧。接下來看看幾個FlatMap的實例。框架
Scala自己對於List類型就有map和flatMap操做。舉例以下:
val names = List("Alice","James","Apple") val strings = names.map(x => x.toUpperCase) println(strings) // 輸出 List(ALICE, JAMES, APPLE) val chars = names.flatMap(x=> x.toUpperCase()) println(chars) // 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)
以上是scala語言層面的實現。下面咱們看看Flink框架是如何使用FlatMap的。
網上常見的一個Flink應用的例子:
//加載數據源 val source = env.fromElements("china is the best country","beijing is the capital of china") //轉化處理數據 val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
case class WordWithCount(word: String, count: Long) val text = env.socketTextStream(host, port, '\n') val windowCounts = text.flatMap { w => w.split("\\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") windowCounts.print()
上面提到的都是簡單的使用,若是有複雜需求,在Flink中,咱們能夠經過繼承FlatMapFunction和RichFlatMapFunction來自定義算子。
FlatMapFunction
對於不涉及到狀態的使用,能夠直接繼承 FlatMapFunction,其定義以下:
@Public @FunctionalInterface public interface FlatMapFunction<T, O> extends Function, Serializable { void flatMap(T value, Collector<O> out) throws Exception; }
如何自定義算子呢,這個能夠直接看看Flink中的官方例子
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens. public class Tokenizer implements FlatMapFunction<String, String> { @Override public void flatMap(String value, Collector<String> out) { for (String token : value.split("\\W")) { out.collect(token); } } } // [...] DataSet<String> textLines = // [...] DataSet<String> words = textLines.flatMap(new Tokenizer());
RichFlatMapFunction
對於涉及到狀態的狀況,用戶可使用繼承 RichFlatMapFunction 類的方式來實現UDF。
RichFlatMapFunction屬於Flink的Rich函數類。從名稱上來看,這種函數類在普通的函數類上增長了Rich前綴,好比RichMapFunction
、RichFlatMapFunction
或RichReduceFunction
等等。比起普通的函數類,Rich函數類增長了:
open()
方法:Flink在算子調用前會執行這個方法,能夠用來進行一些初始化工做。close()
方法:Flink在算子最後一次調用結束後執行這個方法,能夠用來釋放一些資源。getRuntimeContext
方法:獲取運行時上下文。每一個並行的算子子任務都有一個運行時上下文,上下文記錄了這個算子運行過程當中的一些信息,包括算子當前的並行度、算子子任務序號、廣播數據、累加器、監控數據。最重要的是,咱們能夠從上下文裏獲取狀態數據。FlatMap對應的RichFlatMapFunction以下:
@Public public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> { @Override public abstract void flatMap(IN value, Collector<OUT> out) throws Exception; }
其基類 AbstractRichFunction 以下,能夠看到主要是和運行時上下文創建了聯繫,而且有初始化和退出操做:
@Public public abstract class AbstractRichFunction implements RichFunction, Serializable { private transient RuntimeContext runtimeContext; @Override public void setRuntimeContext(RuntimeContext t) { this.runtimeContext = t; } @Override public RuntimeContext getRuntimeContext() { return this.runtimeContext; } @Override public IterationRuntimeContext getIterationRuntimeContext() { if (this.runtimeContext instanceof IterationRuntimeContext) { return (IterationRuntimeContext) this.runtimeContext; } } @Override public void open(Configuration parameters) throws Exception {} @Override public void close() throws Exception {} }
如何最好的使用? 固然仍是官方文檔和例子最靠譜。
由於涉及到狀態,因此若是使用,你必須建立一個 StateDescriptor
,才能獲得對應的狀態句柄。 這保存了狀態名稱(你能夠建立多個狀態,而且它們必須具備惟一的名稱以即可以引用它們),狀態所持有值的類型,而且可能包含用戶指定的函數,例如ReduceFunction
。 根據不一樣的狀態類型,能夠建立ValueStateDescriptor
,ListStateDescriptor
, ReducingStateDescriptor
,FoldingStateDescriptor
或 MapStateDescriptor
。
狀態經過 RuntimeContext
進行訪問,所以只能在 rich functions 中使用。 可是咱們也會看到一個例子。RichFunction
中 RuntimeContext
提供以下方法:
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregatingState getAggregatingState(AggregatingStateDescriptor)
FoldingState getFoldingState(FoldingStateDescriptor)
MapState getMapState(MapStateDescriptor)
下面是一個 FlatMapFunction
的例子,展現瞭如何將這些部分組合起來:
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] { private var sum: ValueState[(Long, Long)] = _ override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = { // access the state value val tmpCurrentSum = sum.value // If it hasn't been used before, it will be null val currentSum = if (tmpCurrentSum != null) { tmpCurrentSum } else { (0L, 0L) } // update the count val newSum = (currentSum._1 + 1, currentSum._2 + input._2) // update the state sum.update(newSum) // if the count reaches 2, emit the average and clear the state if (newSum._1 >= 2) { out.collect((input._1, newSum._2 / newSum._1)) sum.clear() } } override def open(parameters: Configuration): Unit = { sum = getRuntimeContext.getState( new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]) ) } } object ExampleCountWindowAverage extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(List( (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L) )).keyBy(_._1) .flatMap(new CountWindowAverage()) .print() // the printed output will be (1,4) and (1,5) env.execute("ExampleManagedState") }
這個例子實現了一個簡單的計數窗口。 咱們把元組的第一個元素看成 key(在示例中都 key 都是 「1」)。 該函數將出現的次數以及總和存儲在 「ValueState」 中。 一旦出現次數達到 2,則將平均值發送到下游,並清除狀態從新開始。 請注意,咱們會爲每一個不一樣的 key(元組中第一個元素)保存一個單獨的值。
FlatMap從Flink編程模型角度講屬於一個算子,用來對數據流或者數據集進行轉換。從框架角度說,FlatMap是怎麼實現的呢? 或者說FlatMap是怎麼從用戶代碼轉換到Flink運行時呢 ?
首先說說 DataSet相關這套系統中FlatMap的實現。
請注意,DataSteam對應的那套系統中,operator名字都是帶着Stream的,好比StreamOperator。
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
這段代碼調用的就是DataSet中的API。具體以下:
public abstract class DataSet<T> { public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) { String callLocation = Utils.getCallLocationName(); TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true); return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation); } }
能夠看出,flatMap @ DataSet
主要就是生成了一個FlatMapOperator,這個能夠理解爲是邏輯算子。其定義以下:
public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> { protected final FlatMapFunction<IN, OUT> function; protected final String defaultName; public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) { super(input, resultType); this.function = function; this.defaultName = defaultName; } @Override protected FlatMapFunction<IN, OUT> getFunction() { return function; } // 這個translateToDataFlow就是生成計劃(Plan)的關鍵代碼 @Override protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? getName() : "FlatMap at " + defaultName; // create operator FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); // set input po.setInput(input); // set parallelism if (this.getParallelism() > 0) { // use specified parallelism po.setParallelism(this.getParallelism()); } else { // if no parallelism has been specified, use parallelism of input operator to enable chaining po.setParallelism(input.getParallelism()); } return po; } }
FlatMapOperator的基類以下:
public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> { } // Base class for operations that operates on a single input data set. public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> { private final DataSet<IN> input; }
DataSet API所編寫的批處理程序跟DataStream API所編寫的流處理程序在生成做業圖(JobGraph)以前的實現差異很大。流處理程序是生成流圖(StreamGraph),而批處理程序是生成計劃(Plan)並由優化器對其進行優化並生成優化後的計劃(OptimizedPlan)。
計劃(Plan)以數據流(dataflow)的形式來表示批處理程序,但它只是批處理程序最初的表示,在一個批處理程序生成做業圖以前,計劃還會被進行優化以產生更高效的方案。Plan不一樣於流圖(StreamGraph),它以sink爲入口,由於一個批處理程序可能存在若干個sink,因此Plan採用集合來存儲它。另外Plan還封裝了批處理做業的一些基本屬性:jobId、jobName以及defaultParallelism等。
生成Plan的核心部件是算子翻譯器(OperatorTranslation),createProgramPlan方法經過它來」翻譯「出計劃,核心代碼以下
public class OperatorTranslation { // 接收每一個需遍歷的DataSink對象,而後將其轉換成GenericDataSinkBase對象 public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) { List<GenericDataSinkBase<?>> planSinks = new ArrayList<>(); //遍歷sinks集合 for (DataSink<?> sink : sinks) { //將翻譯生成的GenericDataSinkBase加入planSinks集合*,對每一個sink進行」翻譯「 planSinks.add(translate(sink)); } //以planSins集合構建Plan對象 Plan p = new Plan(planSinks); p.setJobName(jobName); return p; } private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) { //會調用到 FlatMapOperator 的 translateToDataFlow org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input); } }
FlatMapOperatorBase就是生成的plan中的一員。
public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { @Override protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, parameters); ArrayList<OUT> result = new ArrayList<OUT>(input.size()); TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer); for (IN element : input) { IN inCopy = inSerializer.copy(element); function.flatMap(inCopy, resultCollector); } FunctionUtils.closeFunction(function); return result; } }
而最後優化時候,則FlatMapOperatorBase會被優化成FlatMapNode。
public class GraphCreatingVisitor implements Visitor<Operator<?>> { public boolean preVisit(Operator<?> c) { else if (c instanceof FlatMapOperatorBase) { n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c); } } }
自此,FlatMap就被組合到 DataSet的 OptimizedPlan 中。下一步Flink會依據OptimizedPlan來生成 JobGraph。
做業圖(JobGraph)是惟一被Flink的數據流引擎所識別的表述做業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。
在運行狀態下,若是上游有數據流入,則FlatMap這個算子就會發揮做用。
對於DataStream,則是另一套體系結構。首先咱們找一個使用DataStream的例子看看。
//獲取數據: 從socket中獲取 val textDataStream = env.socketTextStream("127.0.0.1", 8888, '\n') val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1)) //groupby: 按照指定的字段聚合 val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1)) windowDstram.sum("count").print()
上面例子中,flatMap 調用的是DataStream中的API,具體以下:
public class DataStream<T> { public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) { //clean函數用來移除FlatMapFunction類對象的外部類部分,這樣就能夠進行序列化 //getType用來獲取類對象的輸出類型 TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true); return flatMap(flatMapper, outType); } // 構建了一個StreamFlatMap的Operator public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) { return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper))); } // 依次調用下去 @PublicEvolving public <R> SingleOutputStreamOperator<R> transform( String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator)); } protected <R> SingleOutputStreamOperator<R> doTransform( String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); // 構建Transform對象,Transform對象中包含其上游Transform對象,這樣上游下游就串成了一個Transform鏈。 OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism()); @SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); // 將這Transform對象放入env的transform對象列表。 getExecutionEnvironment().addOperator(resultTransform); // 返回流 return returnStream; } }
上面源碼中的幾個概念須要澄清。
Transformation:首先,FlatMap在FLink編程模型中是算子API,在DataStream中會生成一個Transformation,即邏輯算子。
邏輯算子Transformation最後會對應到物理算子Operator,這個概念對應的就是StreamOperator。
StreamOperator:DataStream 上的每個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。
processElement()
方法也是UDF的邏輯被調用的地方,例如FlatMapFunction
裏的flatMap()
方法。
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> { private transient TimestampedCollector<OUT> collector; public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) { super(flatMapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); } @Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); // 調用用戶定義的FlatMap userFunction.flatMap(element.getValue(), collector); } }
咱們能夠看到,StreamFlatMap繼承了AbstractUdfStreamOperator,從而間接繼承了StreamOperator。
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable { }
StreamOperator是根接口。對於 Streaming 來講全部的算子都繼承自 StreamOperator。繼承了StreamOperator的擴展接口則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractUdfStreamOperator。
從 API 到 邏輯算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink會依據StreamOperator來生成 JobGraph。
做業圖(JobGraph)是惟一被Flink的數據流引擎所識別的表述做業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。
【Flink】Flink基礎之實現WordCount程序(Java與Scala版本)