[源碼分析] 從FlatMap用法到Flink的內部實現

[源碼分析] 從FlatMap用法到Flink的內部實現

0x00 摘要

本文將從FlatMap概念和如何使用開始入手,深刻到Flink是如何實現FlatMap。但願能讓你們對這個概念有更深刻的理解。html

0x01 Map vs FlatMap

首先咱們先從概念入手。前端

自從響應式編程慢慢壯大以來,這兩個單詞如今愈來愈被你們熟悉了。前端能見到它們的身影,後臺也能見到;安卓裏面有,iOS也有。不少兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給你們簡單講解下。java

map

它把數組流中的每個值,使用所提供的函數執行一遍,一一對應。獲得與元素個數相同的數組流。而後返回這個新數據流。apache

flatMap

flat是扁平的意思。因此這個操做是:先映射(map),再拍扁(join)。編程

flatMap輸入多是多個子數組流。因此flatMap先針對 每一個子數組流的每一個元素進行映射操做。而後進行扁平化處理,最後聚集全部進行扁平化處理的結果集造成一個新的列表(扁平化簡而言之就是去除全部的修飾)。api

flatMap與map另一個不同的地方就是傳入的函數在處理完後返回值必須是List。數組

實例

好比拿到一個文本文件以後,咱們是按行讀取,按行處理。若是要對每一行的單詞數進行計數,那麼應該選擇Map方法,若是是統計詞頻,就應該選擇flatMap方法。數據結構

若是還不清楚,能夠看看下面這個例子:app

梁山新進一批好馬,準備給每一個馬軍頭領配置一批。因而獲得函數以及頭領名單以下:

函數 = ( 頭領 => 頭領 + 好馬 )
五虎將 = List(關勝、林沖、秦明、呼延灼、董平 )
八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 )

// Map函數的例子
利用map函數,咱們能夠獲得 五虎將馬軍

五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 )
結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 )

// flatMap函數的例子
可是爲了獲得統一的馬軍,則能夠用flatMap:

馬軍頭領 = List(五虎將,八驃騎)
馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 ) 

結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 )

如今你們應該清楚了吧。接下來看看幾個FlatMap的實例。框架

Scala語言的實現

Scala自己對於List類型就有map和flatMap操做。舉例以下:

val names = List("Alice","James","Apple")
val strings = names.map(x => x.toUpperCase)
println(strings)
// 輸出 List(ALICE, JAMES, APPLE)

val chars = names.flatMap(x=> x.toUpperCase())
println(chars)
// 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)

Flink的例子

以上是scala語言層面的實現。下面咱們看看Flink框架是如何使用FlatMap的。

網上常見的一個Flink應用的例子:

//加載數據源
val source = env.fromElements("china is the best country","beijing is the capital of china")

//轉化處理數據
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)

Flink源碼中的例子

case class WordWithCount(word: String, count: Long)

val text = env.socketTextStream(host, port, '\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")

windowCounts.print()

上面提到的都是簡單的使用,若是有複雜需求,在Flink中,咱們能夠經過繼承FlatMapFunction和RichFlatMapFunction來自定義算子。

函數類FlatMapFunction

對於不涉及到狀態的使用,能夠直接繼承 FlatMapFunction,其定義以下:

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
	void flatMap(T value, Collector<O> out) throws Exception;
}

如何自定義算子呢,這個能夠直接看看Flink中的官方例子

// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String, String> {
  @Override
  public void flatMap(String value, Collector<String> out) {
    for (String token : value.split("\\W")) {
      out.collect(token);
    }
  }
}

// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());

Rich函數類RichFlatMapFunction

對於涉及到狀態的狀況,用戶可使用繼承 RichFlatMapFunction 類的方式來實現UDF。

RichFlatMapFunction屬於Flink的Rich函數類。從名稱上來看,這種函數類在普通的函數類上增長了Rich前綴,好比RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函數類,Rich函數類增長了:

  • open()方法:Flink在算子調用前會執行這個方法,能夠用來進行一些初始化工做。
  • close()方法:Flink在算子最後一次調用結束後執行這個方法,能夠用來釋放一些資源。
  • getRuntimeContext方法:獲取運行時上下文。每一個並行的算子子任務都有一個運行時上下文,上下文記錄了這個算子運行過程當中的一些信息,包括算子當前的並行度、算子子任務序號、廣播數據、累加器、監控數據。最重要的是,咱們能夠從上下文裏獲取狀態數據

FlatMap對應的RichFlatMapFunction以下:

@Public
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
	@Override
	public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
}

其基類 AbstractRichFunction 以下,能夠看到主要是和運行時上下文創建了聯繫,而且有初始化和退出操做

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
  
	private transient RuntimeContext runtimeContext;

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		this.runtimeContext = t;
	}

	@Override
	public RuntimeContext getRuntimeContext() {
			return this.runtimeContext;
	}

	@Override
	public IterationRuntimeContext getIterationRuntimeContext() {
    if (this.runtimeContext instanceof IterationRuntimeContext) {
			return (IterationRuntimeContext) this.runtimeContext;
		} 
	}

	@Override
	public void open(Configuration parameters) throws Exception {}

	@Override
	public void close() throws Exception {}
}

如何最好的使用? 固然仍是官方文檔和例子最靠譜。

由於涉及到狀態,因此若是使用,你必須建立一個 StateDescriptor,才能獲得對應的狀態句柄。 這保存了狀態名稱(你能夠建立多個狀態,而且它們必須具備惟一的名稱以即可以引用它們),狀態所持有值的類型,而且可能包含用戶指定的函數,例如ReduceFunction。 根據不一樣的狀態類型,能夠建立ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

狀態經過 RuntimeContext 進行訪問,所以只能在 rich functions 中使用。 可是咱們也會看到一個例子。RichFunctionRuntimeContext 提供以下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

下面是一個 FlatMapFunction 的例子,展現瞭如何將這些部分組合起來:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

這個例子實現了一個簡單的計數窗口。 咱們把元組的第一個元素看成 key(在示例中都 key 都是 「1」)。 該函數將出現的次數以及總和存儲在 「ValueState」 中。 一旦出現次數達到 2,則將平均值發送到下游,並清除狀態從新開始。 請注意,咱們會爲每一個不一樣的 key(元組中第一個元素)保存一個單獨的值。

0x03 從Flink源碼入手看FlatMap實現

FlatMap從Flink編程模型角度講屬於一個算子,用來對數據流或者數據集進行轉換。從框架角度說,FlatMap是怎麼實現的呢? 或者說FlatMap是怎麼從用戶代碼轉換到Flink運行時呢 ?

1. DataSet

首先說說 DataSet相關這套系統中FlatMap的實現。

請注意,DataSteam對應的那套系統中,operator名字都是帶着Stream的,好比StreamOperator。

DataSet

val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) 這段代碼調用的就是DataSet中的API。具體以下:

public abstract class DataSet<T> {
  
	public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
    
		String callLocation = Utils.getCallLocationName();
    
		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
		return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
	}
}

FlatMapOperator

能夠看出,flatMap @ DataSet 主要就是生成了一個FlatMapOperator,這個能夠理解爲是邏輯算子。其定義以下:

public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {

	protected final FlatMapFunction<IN, OUT> function;
	protected final String defaultName;

	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
		super(input, resultType);
		this.function = function;
		this.defaultName = defaultName;
	}

	@Override
	protected FlatMapFunction<IN, OUT> getFunction() {
		return function;
	}

  // 這個translateToDataFlow就是生成計劃(Plan)的關鍵代碼
	@Override
	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
		String name = getName() != null ? getName() : "FlatMap at " + defaultName;
		// create operator
		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function,
			new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
		// set input
		po.setInput(input);
		// set parallelism
		if (this.getParallelism() > 0) {
			// use specified parallelism
			po.setParallelism(this.getParallelism());
		} else {
			// if no parallelism has been specified, use parallelism of input operator to enable chaining
			po.setParallelism(input.getParallelism());
		}
		return po;
	}
}

FlatMapOperator的基類以下:

public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> {

}

// Base class for operations that operates on a single input data set.
public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> {
  	private final DataSet<IN> input;
}

生成計劃

DataSet API所編寫的批處理程序跟DataStream API所編寫的流處理程序在生成做業圖(JobGraph)以前的實現差異很大。流處理程序是生成流圖(StreamGraph),而批處理程序是生成計劃(Plan)並由優化器對其進行優化並生成優化後的計劃(OptimizedPlan)。

計劃(Plan)以數據流(dataflow)的形式來表示批處理程序,但它只是批處理程序最初的表示,在一個批處理程序生成做業圖以前,計劃還會被進行優化以產生更高效的方案。Plan不一樣於流圖(StreamGraph),它以sink爲入口,由於一個批處理程序可能存在若干個sink,因此Plan採用集合來存儲它。另外Plan還封裝了批處理做業的一些基本屬性:jobId、jobName以及defaultParallelism等。

生成Plan的核心部件是算子翻譯器(OperatorTranslation),createProgramPlan方法經過它來」翻譯「出計劃,核心代碼以下

public class OperatorTranslation {
  
   // 接收每一個需遍歷的DataSink對象,而後將其轉換成GenericDataSinkBase對象
   public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
       List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
       //遍歷sinks集合
       for (DataSink<?> sink : sinks) {
            //將翻譯生成的GenericDataSinkBase加入planSinks集合*,對每一個sink進行」翻譯「
            planSinks.add(translate(sink));
        }
       //以planSins集合構建Plan對象
       Plan p = new Plan(planSinks);
       p.setJobName(jobName);
       return p;
    }

	private <I, O> org.apache.flink.api.common.operators.Operator<O>    translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
    //會調用到 FlatMapOperator 的 translateToDataFlow
	org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);    
  }
  
}

FlatMapOperatorBase就是生成的plan中的一員。

public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
	@Override
	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
		FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();
		
		FunctionUtils.setFunctionRuntimeContext(function, ctx);
		FunctionUtils.openFunction(function, parameters);

		ArrayList<OUT> result = new ArrayList<OUT>(input.size());

		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);

		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);

		for (IN element : input) {
			IN inCopy = inSerializer.copy(element);
			function.flatMap(inCopy, resultCollector);
		}

		FunctionUtils.closeFunction(function);

		return result;
	}
}

而最後優化時候,則FlatMapOperatorBase會被優化成FlatMapNode。

public class GraphCreatingVisitor implements Visitor<Operator<?>> {
	public boolean preVisit(Operator<?> c) {
    else if (c instanceof FlatMapOperatorBase) {
			n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
		}
  }
}

自此,FlatMap就被組合到 DataSet的 OptimizedPlan 中。下一步Flink會依據OptimizedPlan來生成 JobGraph。

做業圖(JobGraph)是惟一被Flink的數據流引擎所識別的表述做業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。

在運行狀態下,若是上游有數據流入,則FlatMap這個算子就會發揮做用。

2. DataStream

對於DataStream,則是另一套體系結構。首先咱們找一個使用DataStream的例子看看。

//獲取數據: 從socket中獲取
val textDataStream = env.socketTextStream("127.0.0.1", 8888, '\n')
val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))

//groupby: 按照指定的字段聚合
val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1))
windowDstram.sum("count").print()

上面例子中,flatMap 調用的是DataStream中的API,具體以下:

public class DataStream<T> {
  
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    //clean函數用來移除FlatMapFunction類對象的外部類部分,這樣就能夠進行序列化
    //getType用來獲取類對象的輸出類型
		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
				getType(), Utils.getCallLocationName(), true);
		return flatMap(flatMapper, outType);
	}
  
  // 構建了一個StreamFlatMap的Operator
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
		return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
	}  
  
  // 依次調用下去
	@PublicEvolving
	public <R> SingleOutputStreamOperator<R> transform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			OneInputStreamOperator<T, R> operator) {
		return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
	}
  
	protected <R> SingleOutputStreamOperator<R> doTransform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			StreamOperatorFactory<R> operatorFactory) {
		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();
    // 構建Transform對象,Transform對象中包含其上游Transform對象,這樣上游下游就串成了一個Transform鏈。
		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
				this.transformation,
				operatorName,
				operatorFactory,
				outTypeInfo,
				environment.getParallelism());
		@SuppressWarnings({"unchecked", "rawtypes"})
		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    // 將這Transform對象放入env的transform對象列表。
		getExecutionEnvironment().addOperator(resultTransform);
    // 返回流
		return returnStream;
	}  
}

上面源碼中的幾個概念須要澄清。

Transformation:首先,FlatMap在FLink編程模型中是算子API,在DataStream中會生成一個Transformation,即邏輯算子。

邏輯算子Transformation最後會對應到物理算子Operator,這個概念對應的就是StreamOperator

StreamOperator:DataStream 上的每個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。

processElement()方法也是UDF的邏輯被調用的地方,例如FlatMapFunction裏的flatMap()方法。

public class StreamFlatMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private transient TimestampedCollector<OUT> collector;

	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
		super(flatMapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
		collector = new TimestampedCollector<>(output);
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
    // 調用用戶定義的FlatMap
		userFunction.flatMap(element.getValue(), collector);
	}
}

咱們能夠看到,StreamFlatMap繼承了AbstractUdfStreamOperator,從而間接繼承了StreamOperator。

public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
}

StreamOperator是根接口。對於 Streaming 來講全部的算子都繼承自 StreamOperator。繼承了StreamOperator的擴展接口則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractUdfStreamOperator。

從 API 到 邏輯算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink會依據StreamOperator來生成 JobGraph。

做業圖(JobGraph)是惟一被Flink的數據流引擎所識別的表述做業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。

0x04 參考

Flink中richfunction的一點小做用

【淺顯易懂】scala中map與flatMap的區別

Working with State

flink簡單應用: scala編寫wordcount

【Flink】Flink基礎之實現WordCount程序(Java與Scala版本)

Flink進階教程:以flatMap爲例,如何進行算子自定義

Flink運行時之批處理程序生成計劃

相關文章
相關標籤/搜索