[源碼解析] Flink的groupBy和reduce究竟作了什麼

[源碼解析] Flink的groupBy和reduce究竟作了什麼

0x00 摘要

Groupby和reduce是大數據領域常見的算子,可是不少同窗應該對其背後機制不甚瞭解。本文將從源碼入手,爲你們解析Flink中Groupby和reduce的原理,看看他們在背後作了什麼。java

0x01 問題和歸納

1.1 問題

探究的緣由是想到了幾個問題 :算法

  • groupby的算子會對數據進行排序嘛。
  • groupby和reduce過程當中究竟有幾回排序。
  • 若是有多個groupby task,什麼機制保證全部這些grouby task的輸出中,一樣的key都分配給同一個reducer。
  • groupby和reduce時候,有沒有Rebalance 從新分配。
  • reduce算子會不會從新劃分task。
  • reduce算子有沒有可能和先後的其餘算子組成Operator Chain。

1.2 歸納

爲了便於你們理解,咱們先總結下,對於一個Groupby + Reduce的操做,Flink作了以下處理:apache

  • Group其實沒有真實對應的算子,它只是在在reduce過程以前的一箇中間步驟或者輔助步驟。
  • 在Flink生成批處理執行計劃後,有意義的結果是Reduce算子。
  • 爲了更好的reduce,Flink在reduce以前大量使用了Combine操做。Combine能夠理解爲是在map端的reduce的操做,對單個map任務的輸出結果數據進行合併的操做。
  • 在Flink生成批處理優化計劃(Optimized Plan)以後,會把reduce分割成兩段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE。
  • SORTED_PARTIAL_REDUCE就是Combine。
  • Flink生成JobGraph以後,Flink造成了一個Operator Chain:Reduce(SORTED_PARTIAL_REDUCE)和其上游合併在一塊兒。
  • Flink用Partitioner來保證多個 grouby task 的輸出中一樣的key都分配給同一個reducer。
  • groupby和reduce過程當中至少有三次排序:
    • combine
    • sort + merge
    • reduce

這樣以前的疑問就基本獲得瞭解釋。編程

0x02 背景概念

2.1 MapReduce細分

MapReduce是一種編程模型,用於大規模數據集的並行運算。概念 "Map(映射)"和"Reduce(歸約)" 是它們的主要思想,其是從函數式編程語言,矢量編程語言裏借來的特性。api

咱們目前使用的Flink,Spark都出自於MapReduce,因此咱們有必有追根溯源,看看MapReduce是如何區分各個階段的。網絡

2.2 MapReduce細分

若是把MapReduce細分,能夠分爲一下幾大過程:數據結構

  • Input-Split(輸入分片):此過程是將從HDFS上讀取的文件分片,而後送給Map端。有多少分片就有多少Mapper,通常分片的大小和HDFS中的塊大小一致。
  • Shuffle-Spill(溢寫):每一個Map任務都有一個環形緩衝區。一旦緩衝區達到閾值80%,一個後臺線程便開始把內容「溢寫」-「spill」到磁盤。在溢寫過程當中,map將繼續輸出到剩餘的20%空間中,互不影響,若是緩衝區被填滿map會被堵塞直到寫磁盤完成。
  • Shuffle-Partition(分區):因爲每一個Map可能處理的數據量不一樣,因此到達reduce有可能會致使數據傾斜。分區能夠幫助咱們解決這一問題,在shuffle過程當中會按照默認key的哈希碼對分區數量取餘,reduce便根據分區號來拉取對應的數據,達到數據均衡。分區數量對應Reduce個數。
  • Shuffle-Sort(排序):在分區後,會對此分區的數據進行內排序,排序過程會穿插在整個MapReduce中,在不少地方都存在。
  • Shuffle-Group(分組):分組過程會把key相同的value分配到一個組中,wordcount程序就利用了分組這一過程。
  • Shuffle-Combiner(組合):這一過程咱們能夠理解爲一個小的Reduce階段,當數據量大的時候能夠在map過程當中執行一次combine,這樣就至關於在map階段執行了一次reduce。因爲reduce和map在不一樣的節點上運行,因此reduce須要遠程拉取數據,combine就能夠有效下降reduce拉取數據的量,減小網絡負荷(這一過程默認是不開啓的,在如求平均值的mapreduce程序中不要使用combine,由於會影響結果)。
  • Compress(壓縮):在緩衝區溢寫磁盤的時候,能夠對數據進行壓縮,節約磁盤空間,一樣減小給reducer傳遞的數據量。
  • Reduce-Merge(合併):reduce端會拉取各個map輸出結果對應的分區文件,這樣reduce端就會有不少文件,因此在此階段,reduce再次將它們合併/排序再送入reduce執行。
  • Output(輸出):在reduce階段,對已排序輸出中的每一個鍵調用reduce函數。此階段的輸出直接寫到輸出文件系統,通常爲HDFS。

2.3 Combine

Combine是咱們須要特殊注意的。在mapreduce中,map多,reduce少。在reduce中因爲數據量比較多,因此咱們乾脆在map階段中先把本身map裏面的數據歸類,這樣到了reduce的時候就減輕了壓力。app

Combine能夠理解爲是在map端的reduce的操做,對單個map任務的輸出結果數據進行合併的操做。combine是對一個map的,而reduce合併的對象是對於多個map框架

map函數操做所產生的鍵值對會做爲combine函數的輸入,經combine函數處理後再送到reduce函數進行處理,減小了寫入磁盤的數據量,同時也減小了網絡中鍵值對的傳輸量。在Map端,用戶自定義實現的Combine優化機制類Combiner在執行Map端任務的節點自己運行,至關於對map函數的輸出作了一次reduce。編程語言

集羣上的可用帶寬每每是有限的,產生的中間臨時數據量很大時就會出現性能瓶頸,所以應該儘可能避免Map端任務和Reduce端任務之間大量的數據傳輸。使用Combine機制的意義就在於使Map端輸出更緊湊,使得寫到本地磁盤和傳給Reduce端的數據更少。

2.4 Partition

Partition是分割map每一個節點的結果,按照key分別映射給不一樣的reduce,mapreduce使用哈希HashPartitioner幫咱們歸類了。這個咱們也能夠自定義。

這裏其實能夠理解歸類。咱們對於錯綜複雜的數據歸類。好比在動物園裏有牛羊雞鴨鵝,他們都是混在一塊兒的,可是到了晚上他們就各自牛回牛棚,羊回羊圈,雞回雞窩。partition的做用就是把這些數據歸類。只不過是在寫程序的時候,

在通過mapper的運行後,咱們得知mapper的輸出是這樣一個key/value對: key是「aaa」, value是數值1。由於當前map端只作加1的操做,在reduce task裏纔去合併結果集。假如咱們知道這個job有3個reduce task,到底當前的「aaa」應該交由哪一個reduce task去作呢,是須要馬上決定的。

MapReduce提供Partitioner接口,它的做用就是根據key或value及reduce task的數量來決定當前的這對輸出數據最終應該交由哪一個reduce task處理。默認對key hash後再以reduce task數量取模。默認的取模方式只是爲了平均reduce的處理能力,若是用戶本身對Partitioner有需求,能夠訂製並設置到job上。

在咱們的例子中,假定 「aaa」通過Partitioner後返回0,也就是這對值應當交由第一個reducer來處理。

2.5 Shuffle

shuffle就是map和reduce之間的過程,包含了兩端的combine和partition。它比較難以理解,由於咱們摸不着,看不到它。它屬於mapreduce的框架,編程的時候,咱們用不到它。

Shuffle的大體範圍就是:怎樣把map task的輸出結果有效地傳送到reduce端。也能夠這樣理解, Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。

2.6 Reducer

簡單地說,reduce task在執行以前的工做就是不斷地拉取當前job裏每一個map task的最終結果,而後對從不一樣地方拉取過來的數據不斷地作merge,最終造成一個文件做爲reduce task的輸入文件。

0x03 代碼

咱們以Flink的KMeans算法做爲樣例,具體摘要以下:

public class WordCountExampleReduce {

    DataStream ds;

    public static void main(String[] args) throws Exception {
        //構建環境
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();
        //經過字符串構建數據集
        DataSet<String> text = env.fromElements(
                "Who‘s there?",
                "I think I hear them. Stand, ho! Who‘s there?");
        //分割字符串、按照key進行分組、統計相同的key個數
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                          Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2(value1.f0, value1.f1 + value2.f1);
                    }
                });
        //打印
        wordCounts.print();
    }
    //分割字符串的方法
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

輸出是:

(hear,1)
(ho!,1)
(them.,1)
(I,2)
(Stand,,1)
(Who‘s,2)
(there?,2)
(think,1)

首先,咱們從Flink基本JAVA API來入手開始挖掘。

4.1 GroupBy是個輔助概念

4.1.1 Grouping

咱們須要留意的是:GroupBy並無對應的Operator。GroupBy只是生成DataSet轉換的一箇中間步驟或者輔助步驟

GroupBy功能的基類是Grouping,其只是DataSet轉換的一箇中間步驟。其幾個主要成員是:

  • 對應的輸入數據DataSet
  • 分組所基於的keys
  • 用戶自定義的Partitioner
// Grouping is an intermediate step for a transformation on a grouped DataSet.
public abstract class Grouping<T> {
   protected final DataSet<T> inputDataSet;
   protected final Keys<T> keys;
   protected Partitioner<?> customPartitioner;
}

Grouping並無任何業務相關的API,具體API都是在其派生類中,好比UnsortedGrouping。

4.1.2 UnsortedGrouping

咱們代碼中對應的就是UnsortedGrouping類。咱們看到它提供了不少業務API,好比:sum,max,min,reduce,aggregate,reduceGroup,combineGroup.....

回到咱們的示例,groupBy作了以下操做

  • 首先,groupBy返回的就是一個UnsortedGrouping,這個UnsortedGrouping是用來轉換DataSet。
  • 其次,.groupBy(0).reduce(new CentroidAccumulator()) 返回的是ReduceOperator。這就對應了前面咱們提到的,groupBy只是中間步驟,reduce才能返回一個Operator
public class UnsortedGrouping<T> extends Grouping<T> {
  
    // groupBy返回一個UnsortedGrouping
    public UnsortedGrouping<T> groupBy(int... fields) {
       return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
    }
  
    // reduce返回一個ReduceOperator
 		public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
      return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName());
    } 
}

4.2 reduce纔是算子

對於業務來講,reduce纔是真正有意義的邏輯算子。

從前文的函數調用和ReduceOperator定義能夠看出,.groupBy(0).reduce() 的調用結果是生成一個ReduceOperator,而 UnsortedGrouping 被設置爲 ReduceOperator 的 grouper 成員變量,做爲輔助操做

public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
  
	private final ReduceFunction<IN> function;
	private final Grouping<IN> grouper; // UnsortedGrouping被設置在這裏,後續reduce操做中會用到。

	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, 
                        String defaultName) {
		this.function = function;
		this.grouper = input; // UnsortedGrouping被設置在這裏,後續reduce操做中會用到。
    this.hint = CombineHint.OPTIMIZER_CHOOSES; // 優化時候會用到。
	}
}

讓咱們順着Flink程序執行階段繼續看看系統都作了些什麼。

0x05 批處理執行計劃(Plan)

程序執行的第一步是:當程序運行時候,首先會根據java API的結果來生成執行plan。

public JobClient executeAsync(String jobName) throws Exception {
   final Plan plan = createProgramPlan(jobName);
}

其中重要的函數是translateToDataFlow,由於在translateToDataFlow方法中,會從批處理Java API模塊中operators包往核心模塊中operators包的轉換

對於咱們的示例程序,在生成 Graph時,translateToDataFlow會生成一個 SingleInputOperator,爲後續runtime使用。下面是代碼縮減版。

protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
    
    ......
      
    // UnsortedGrouping中的keys被取出,  
		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {

			// reduce with field positions
			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
					new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);

			po.setCustomPartitioner(grouper.getCustomPartitioner());
			po.setInput(input);
			po.setParallelism(getParallelism()); // 沒有並行度的變化

			return po;//translateToDataFlow會生成一個 SingleInputOperator,爲後續runtime使用
		}	    
  }  
}

咱們代碼最終生成的執行計劃以下,咱們能夠看出來,執行計劃基本符合咱們的估計:簡單的從輸入到輸出。中間有意義的算子其實只有Reduce

GenericDataSourceBase ——> FlatMapOperatorBase ——> ReduceOperatorBase ——> GenericDataSinkBase

具體在代碼中體現以下是:

plan = {Plan@1296} 
 sinks = {ArrayList@1309}  size = 1
  0 = {GenericDataSinkBase@1313} "collect()"
   formatWrapper = {UserCodeObjectWrapper@1315} 
   input = {ReduceOperatorBase@1316} "ReduceOperatorBase - Reduce at main(WordCountExampleReduceCsv.java:25)"
    hint = {ReduceOperatorBase$CombineHint@1325} "OPTIMIZER_CHOOSES"
    customPartitioner = null
    input = {FlatMapOperatorBase@1326} "FlatMapOperatorBase - FlatMap at main(WordCountExampleReduceCsv.java:23)"
     input = {GenericDataSourceBase@1339} "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)"

0x06 批處理優化計劃(Optimized Plan)

程序執行的第二步是:Flink對於Plan會繼續優化,生成Optimized Plan。其核心代碼位於PlanTranslator.compilePlan 函數,這裏獲得了Optimized Plan。

這個編譯的過程不做任何決策與假設,也就是說做業最終如何被執行早已被優化器肯定,而編譯也是在此基礎上作肯定性的映射。因此咱們將集中精力看如何優化plan。

private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
   Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
   OptimizedPlan optimizedPlan = optimizer.compile(plan);

   JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
   return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}

在內部調用plan的accept方法遍歷它。accept會挨個在每一個sink上調用accept。對於每一個sink會先preVisit,而後 postVisit。

這裏優化時候有幾個注意點:

  1. 在 GraphCreatingVisitor.preVisit 中,當發現Operator是 ReduceOperatorBase 類型的時候,會創建ReduceNode。

    else if (c instanceof ReduceOperatorBase) {
       n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
    }
  2. ReduceNode是Reducer Operator的Optimizer表示。

    public class ReduceNode extends SingleInputNode {
    	private final List<OperatorDescriptorSingle> possibleProperties;	
    	private ReduceNode preReduceUtilityNode;
    }
  3. 生成ReduceNode時候,會根據以前提到的 hint 來決定 combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;

    public ReduceNode(ReduceOperatorBase<?, ?> operator) {
    			DriverStrategy combinerStrategy;
    			switch(operator.getCombineHint()) {
    				case OPTIMIZER_CHOOSES:
    					combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
    					break;
          }  
    }

生成的優化執行計劃以下,咱們能夠看到,這時候設置了並行度,也把reduce分割成兩段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE

Data Source  ——> FlatMap ——> Reduce(SORTED_PARTIAL_REDUCE)   ——> Reduce(SORTED_REDUCE)  ——> Data Sink

具體在代碼中體現以下是:

optimizedPlan = {OptimizedPlan@1506} 
 
 allNodes = {HashSet@1510}  size = 5
   
  0 = {SourcePlanNode@1512} "Data Source "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  1 = {SingleInputPlanNode@1513} "FlatMap "FlatMap at main(WordCountExampleReduceCsv.java:23)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  2 = {SingleInputPlanNode@1514} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  3 = {SinkPlanNode@1515} "Data Sink "collect()" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  4 = {SingleInputPlanNode@1516} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_PARTIAL_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

0x07 JobGraph

程序執行的第三步是:創建JobGraph。LocalExecutor.execute中會生成JobGraph。Optimized Plan 通過優化後生成了 JobGraph, JobGraph是提交給 JobManager 的數據結構。

主要的優化爲,將多個符合條件的節點 chain 在一塊兒做爲一個節點,這樣能夠減小數據在節點之間流動所須要的序列化/反序列化/傳輸消耗。

JobGraph是惟一被Flink的數據流引擎所識別的表述做業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一

public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
   final JobGraph jobGraph = getJobGraph(pipeline, configuration);
}

咱們能夠看出來,這一步造成了一個Operator Chain:

CHAIN DataSource -> FlatMap -> Combine (Reduce)

因而咱們看到,Reduce(SORTED_PARTIAL_REDUCE)和其上游合併在一塊兒

具體在程序中打印出來:

jobGraph = {JobGraph@1739} "JobGraph(jobId: 30421d78d7eedee6be2c5de39d416eb7)"
 taskVertices = {LinkedHashMap@1742}  size = 3
  
  {JobVertexID@1762} "e2c43ec0df647ea6735b2421fb7330fb" -> {InputOutputFormatVertex@1763} "CHAIN DataSource (at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCountExampleReduceCsv.java:23)) -> Combine (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.DataSourceTask)"
  
  {JobVertexID@1764} "2de11f497e827e48dda1d63b458dead7" -> {JobVertex@1765} "Reduce (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.BatchTask)"
  
  {JobVertexID@1766} "2bee17f2c86aa1e9439e3dedea58007b" -> {InputOutputFormatVertex@1767} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"

0x08 Runtime

Job提交以後,就是程序正式運行了。這裏實際上涉及到了三次排序,

  • 一次是在FlatMap發送時候調用到了ChainedReduceCombineDriver.sortAndCombine。這部分對應了咱們以前提到的MapReduce中的Combine和Partition。
  • 一次是在 ReduceDriver 所在的 BatchTask中,由UnilateralSortMerger完成了sort & merge操做。
  • 一次是在ReduceDriver,這裏作了最後的reducer排序。

8.1 FlatMap

這裏是第一次排序

當一批數據處理完成以後,在ChainedFlatMapDriver中調用到close函數進行發送數據給下游。

public void close() {
   this.outputCollector.close();
}

Operator Chain會調用到ChainedReduceCombineDriver.close

public void close() {
   // send the final batch
   try {
      switch (strategy) {
         case SORTED_PARTIAL_REDUCE:
            sortAndCombine(); // 咱們是在這裏
            break;
         case HASHED_PARTIAL_REDUCE:
            reduceFacade.emit();
            break;
      }
   } catch (Exception ex2) {
      throw new ExceptionInChainedStubException(taskName, ex2);
   }

   outputCollector.close();
   dispose(false);
}

8.1.1 Combine

sortAndCombine中先排序,而後作combine,最後會不斷髮送數據

private void sortAndCombine() throws Exception {
   final InMemorySorter<T> sorter = this.sorter;

   if (!sorter.isEmpty()) {
      sortAlgo.sort(sorter); // 這裏會先排序

      final TypeSerializer<T> serializer = this.serializer;
      final TypeComparator<T> comparator = this.comparator;
      final ReduceFunction<T> function = this.reducer;
      final Collector<T> output = this.outputCollector;
      final MutableObjectIterator<T> input = sorter.getIterator();

      if (objectReuseEnabled) {
        ......
      } else {
         T value = input.next();

         // 這裏就是combine
         // iterate over key groups
         while (running && value != null) {
            comparator.setReference(value);
            T res = value;

            // iterate within a key group
            while ((value = input.next()) != null) {
               if (comparator.equalToReference(value)) {
                  // same group, reduce
                  res = function.reduce(res, value);
               } else {
                  // new key group
                  break;
               }
            }

            output.collect(res); //發送數據
         }
      }
   }
}

8.1.2 Partition

最後發送給哪一個下游,是由OutputEmitter.selectChannel決定的。有以下幾種決定方式:

hash-partitioning, broadcasting, round-robin, custom partition functions。這裏採用的是PARTITION_HASH。

每一個task都會把一樣字符串統計結果發送給一樣的下游ReduceDriver。這就保證了下游Reducer必定不會出現統計出錯。

public final int selectChannel(SerializationDelegate<T> record) {
   switch (strategy) {
   ...
   case PARTITION_HASH:
      return hashPartitionDefault(record.getInstance(), numberOfChannels);
   ...
   }
}

private int hashPartitionDefault(T record, int numberOfChannels) {
	int hash = this.comparator.hash(record);
	return MathUtils.murmurHash(hash) % numberOfChannels;
}

具體調用棧:

hash:50, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hash:30, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hashPartitionDefault:187, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:147, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
sortAndCombine:254, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:266, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:40, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
close:88, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
invoke:215, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

8.2 UnilateralSortMerger

這裏是第二次排序

在 BatchTask中,會先Sort, Merge輸入,而後纔會交由Reduce來具體完成過。sort & merge操做具體是在UnilateralSortMerger類中完成的。

getIterator:646, UnilateralSortMerger (org.apache.flink.runtime.operators.sort)
getInput:1110, BatchTask (org.apache.flink.runtime.operators)
prepare:95, ReduceDriver (org.apache.flink.runtime.operators)
run:474, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

UnilateralSortMerger是一個full fledged sorter,它實現了一個多路merge sort。其內部的邏輯被劃分到三個線程上(read, sort, spill),這三個線程彼此之間經過一系列blocking queues來構成了一個閉環。

其內存經過MemoryManager分配,因此這個組件不會超過給其分配的內存。

該類主要變量摘錄以下:

public class UnilateralSortMerger<E> implements Sorter<E> {
	// ------------------------------------------------------------------------
	//                                  Threads
	// ------------------------------------------------------------------------

	/** The thread that reads the input channels into buffers and passes them on to the merger. */
	private final ThreadBase<E> readThread;

	/** The thread that merges the buffer handed from the reading thread. */
	private final ThreadBase<E> sortThread;

	/** The thread that handles spilling to secondary storage. */
	private final ThreadBase<E> spillThread;
	
	// ------------------------------------------------------------------------
	//                                   Memory
	// ------------------------------------------------------------------------
	
	/** The memory segments used first for sorting and later for reading/pre-fetching
	 * during the external merge. */
	protected final List<MemorySegment> sortReadMemory;
	
	/** The memory segments used to stage data to be written. */
	protected final List<MemorySegment> writeMemory;
	
	/** The memory manager through which memory is allocated and released. */
	protected final MemoryManager memoryManager;
	
	// ------------------------------------------------------------------------
	//                            Miscellaneous Fields
	// ------------------------------------------------------------------------
	/**
	 * Collection of all currently open channels, to be closed and deleted during cleanup.
	 */
	private final HashSet<FileIOChannel> openChannels;
	
	/**
	 * The monitor which guards the iterator field.
	 */
	protected final Object iteratorLock = new Object();
	
	/**
	 * The iterator to be returned by the sort-merger. This variable is null, while receiving and merging is still in
	 * progress and it will be set once we have &lt; merge factor sorted sub-streams that will then be streamed sorted.
	 */
	protected volatile MutableObjectIterator<E> iterator; 	// 若是你們常常調試,就會發現driver中的input都是這個兄弟。

	private final Collection<InMemorySorter<?>> inMemorySorters;
}

8.2.1 三種線程

ReadingThread:這種線程持續讀取輸入,而後把數據放入到一個待排序的buffer中。The thread that consumes the input data and puts it into a buffer that will be sorted.

SortingThread : 這種線程對於上游填充好的buffer進行排序。The thread that sorts filled buffers.

SpillingThread:這種線程進行歸併操做。The thread that handles the spilling of intermediate results and sets up the merging. It also merges the channels until sufficiently few channels remain to perform the final streamed merge.

8.2.2 MutableObjectIterator

UnilateralSortMerger有一個特殊變量:

protected volatile MutableObjectIterator<E> iterator;

這個變量就是最終sort-merger的輸出。若是你們調試過算子,就會發現這個變量就是具體算子的輸入input類型。最終算子的輸入就是來自於此。

8.3 ReduceDriver

這裏是第三次排序,咱們能夠看出來reduce是怎麼和groupby一塊兒運做的。

  1. 針對 .groupBy(0),ReduceDriver就是單純獲取輸入的第一個數值 T value = input.next();
  2. 後續代碼中有嵌套的兩個while,分別是 :遍歷各類key,以及某一key中reduce。
  3. 遍歷 group keys的時候,把value賦於比較算子comparator(這個算子概念不是Flink算子,就是爲了說明邏輯概念) comparator.setReference(value); 由於groubBy只是指定按照第一個位置比較,沒有指定具體key數值,因此這個value就是key了。此處記爲while (1) ,代碼中有註解。
  4. 從輸入中讀取後續的數值value,若是下一個數值是同一個key,就reduce;若是下一個數值不是同一個key,就跳出循環。放棄比較,把reduce結果輸出。此處記爲 while (2)
  5. 跳出 while (2) 以後,代碼依然在 while (1) ,此時value是新值,因此繼續在 while (1)中運行 。把value繼續賦於比較算子 comparator.setReference(value);,因而進行新的key比較
public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
	@Override
	public void run() throws Exception {

		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();

		// cache references on the stack
		final MutableObjectIterator<T> input = this.input;
		final TypeSerializer<T> serializer = this.serializer;
		final TypeComparator<T> comparator = this.comparator;		
		final ReduceFunction<T> function = this.taskContext.getStub();		
		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);

		if (objectReuseEnabled) {
      ......
		} else {
      // 針對 `.groupBy(0)`,ReduceDriver就是單純獲取輸入的第一個數值 `T value = input.next();`
			T value = input.next();

      // while (1)
			// iterate over key groups
			while (this.running && value != null) {
				numRecordsIn.inc();
        // 把value賦於比較算子,這個value就是key了。
				comparator.setReference(value);
				T res = value;

        // while (2)
				// iterate within a key group,循環比較這個key
				while ((value = input.next()) != null) {
					numRecordsIn.inc();
					if (comparator.equalToReference(value)) {
						// same group, reduce,若是下一個數值是同一個key,就reduce
						res = function.reduce(res, value);
					} else {
						// new key group,若是下一個數值不是同一個key,就跳出循環,放棄比較。
						break;
					}
				}
        // 把reduce結果輸出
				output.collect(res);
			}
		}
	}  
}

0x09 參考

mapreduce裏的shuffle 裏的 sort merge 和combine

實戰錄 | Hadoop Mapreduce shuffle之Combine探討

Hadoop中MapReduce中combine、partition、shuffle的做用是什麼?在程序中怎麼運用?

Flink運行時之生成做業圖

mapreduce過程

相關文章
相關標籤/搜索