[源碼解析] GroupReduce,GroupCombine 和 Flink SQL group by

[源碼解析] GroupReduce,GroupCombine和Flink SQL group by

0x00 摘要

本文從源碼和實例入手,爲你們解析 Flink 中 GroupReduce 和 GroupCombine 的用途。也涉及到了 Flink SQL group by 的內部實現。html

0x01 原因

在前文[源碼解析] Flink的Groupby和reduce究竟作了什麼中,咱們剖析了Group和reduce都作了些什麼,也對combine有了一些瞭解。可是總感受意猶未盡,由於在Flink還提出了若干新算子,好比GroupReduce和GroupCombine。這幾個算子不搞定,總以爲如鯁在喉,但沒有找到一個良好的例子來進行剖析說明。java

本文是筆者在探究Flink SQL UDF問題的一個副產品。起初是爲了調試一段sql代碼,結果發現Flink自己給出了一個GroupReduce和GroupCombine使用的完美例子。因而就拿出來和你們共享,一塊兒分析看看究竟如何使用這兩個算子。mysql

請注意:這個例子是Flink SQL,因此本文中將涉及Flink SQL goup by內部實現的知識。sql

0x02 概念

Flink官方對於這兩個算子的使用說明以下:apache

2.1 GroupReduce

GroupReduce算子應用在一個已經分組了的DataSet上,其會對每一個分組都調用到用戶定義的group-reduce函數。它與Reduce的區別在於用戶定義的函數會當即得到整個組。api

Flink將在組的全部元素上使用Iterable調用用戶自定義函數,而且能夠返回任意數量的結果元素。數據結構

2.2 GroupCombine

GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被歸納爲容許將輸入類型 I 組合到任意輸出類型O。與此相對的是,GroupReduce中的組合步驟僅容許從輸入類型 I 到輸出類型 I 的組合。這是由於GroupReduceFunction的 "reduce步驟" 指望本身的輸入類型爲 I。app

在一些應用中,咱們指望在執行附加變換(例如,減少數據大小)以前將DataSet組合成中間格式。這能夠經過CombineGroup轉換能以很是低的成本實現。ide

注意:分組數據集上的GroupCombine在內存中使用貪婪策略執行,該策略可能不會一次處理全部數據,而是以多個步驟處理。它也能夠在各個分區上執行,而無需像GroupReduce轉換那樣進行數據交換。這可能會致使輸出的是部分結果,因此GroupCombine是不能替代GroupReduce操做的,儘管它們的操做內容可能看起來都同樣。函數

2.3 例子

是否是有點暈?仍是直接讓代碼來講話吧。如下官方示例演示瞭如何將CombineGroup和GroupReduce轉換用於WordCount實現。即經過combine操做先對單詞數目進行初步排序,而後經過reduceGroup對combine產生的結果進行最終排序。由於combine進行了初步排序,因此在算子之間傳輸的數據量就少多了

DataSet<String> input = [..] // The words received as input

// 這裏經過combine操做先對單詞數目進行初步排序,其優點在於用戶定義的combine函數只調用一次,由於runtime已經把輸入數據一次性都提供給了自定義函數。  
DataSet<Tuple2<String, Integer>> combinedWords = input
  .groupBy(0) // group identical words
  .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {

    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
        String key = null;
        int count = 0;

        for (String word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});

// 這裏對combine的結果進行第二次排序,其優點在於用戶定義的reduce函數只調用一次,由於runtime已經把輸入數據一次性都提供給了自定義函數。  
DataSet<Tuple2<String, Integer>> output = combinedWords
  .groupBy(0)                              // group by words again
  .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange

    public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
        String key = null;
        int count = 0;

        for (Tuple2<String, Integer> word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});

看到這裏,有的兄弟已經明白了,這和mapPartition很相似啊,都是runtime作了大量工做。爲了讓你們這兩個算子的使用情形有深入的認識,咱們再經過一個sql的例子,向你們展現Flink內部是怎麼應用這兩個算子的,也能看出來他們的強大之處

0x03 代碼

下面代碼主要參考自 flink 使用問題彙總。咱們能夠看到這裏經過groupby進行了聚合操做。其中collect方法,相似於mysql的group_concat。

public class UdfExample {
    public static class MapToString extends ScalarFunction {

        public String eval(Map<String, Integer> map) {
            if(map==null || map.size()==0) {
                return "";
            }
            StringBuffer sb=new StringBuffer();
            for(Map.Entry<String, Integer> entity : map.entrySet()) {
                sb.append(entity.getKey()+",");
            }
            String result=sb.toString();
            return result.substring(0, result.length()-1);
        }
    }

    public static void main(String[] args) throws Exception {
        MemSourceBatchOp src = new MemSourceBatchOp(new Object[][]{
                new Object[]{"1", "a", 1L},
                new Object[]{"2", "b33", 2L},
                new Object[]{"2", "CCC", 2L},
                new Object[]{"2", "xyz", 2L},
                new Object[]{"1", "u", 1L}
        }, new String[]{"f0", "f1", "f2"});

        BatchTableEnvironment environment = MLEnvironmentFactory.getDefault().getBatchTableEnvironment();
        Table table = environment.fromDataSet(src.getDataSet());
        environment.registerTable("myTable", table);
        BatchOperator.registerFunction("MapToString",  new MapToString());
        BatchOperator.sqlQuery("select f0, mapToString(collect(f1)) as type from myTable group by f0").print();
    }
}

程序輸出是

f0|type
--|----
1|a,u
2|CCC,b33,xyz

這個SQL語句的重點是group by。這個是程序猿常用的操做。可是你們有沒有想過這個group by在真實運行起來時候是怎麼操做的呢?針對大數據環境有沒有作了什麼優化呢?其實,Flink正是使用了GroupReduce和GroupCombine來實現而且優化了group by的功能。優化之處在於:

  • GroupReduce和GroupCombine的函數調用次數要遠低於正常的reduce算子,若是reduce操做中涉及到頻繁建立額外的對象或者外部資源操做,則會至關省時間。
  • 由於combine進行了初步排序,因此在算子之間傳輸的數據量就少多了。

SQL生成Flink的過程十分錯綜複雜,因此咱們只能找最關鍵處。其是在 DataSetAggregate.translateToPlan 完成的。咱們能夠看到,對於SQL語句 「select f0, mapToString(collect(f1)) as type from myTable group by f0」,Flink系統把它翻譯成以下階段,即

  • pre-aggregation :排序 + combine;
  • final aggregation :排序 + reduce;

從以前的文章咱們能夠知道,groupBy這個其實不是一個算子,它只是排序過程當中的一個輔助步驟而已,因此咱們重點仍是要看combineGroup和reduceGroup。這偏偏是咱們想要的完美例子。

input ----> (groupBy + combineGroup) ----> (groupBy + reduceGroup) ----> output

SQL生成的Scala代碼以下,其中 combineGroup在後續中將生成GroupCombineOperator,reduceGroup將生成GroupReduceOperator。

override def translateToPlan(
      tableEnv: BatchTableEnvImpl,
      queryConfig: BatchQueryConfig): DataSet[Row] = {

    if (grouping.length > 0) {
      // grouped aggregation
      ...... 
      if (preAgg.isDefined) { // 咱們的例子是在這裏
        inputDS          
          // pre-aggregation
          .groupBy(grouping: _*)
          .combineGroup(preAgg.get) // 將生成GroupCombineOperator算子
          .returns(preAggType.get)
          .name(aggOpName)
          // final aggregation
          .groupBy(grouping.indices: _*) //將生成GroupReduceOperator算子。
          .reduceGroup(finalAgg.right.get)
          .returns(rowTypeInfo)
          .name(aggOpName)
      } else {
        ......
      }
    }
    else {
      ......
    }
  }
}

// 程序變量打印以下
this = {DataSetAggregate@5207} "Aggregate(groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))"
 cluster = {RelOptCluster@5220}

0x05 JobGraph

LocalExecutor.execute中會生成JobGraph。JobGraph是提交給 JobManager 的數據結構,是惟一被Flink的數據流引擎所識別的表述做業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。

在生成JobGraph時候,系統獲得以下JobVertex。

jobGraph = {JobGraph@5652} "JobGraph(jobId: 6aae8b5e5ad32f588136bef26f8b65f6)"
 taskVertices = {LinkedHashMap@5655}  size = 4

{JobVertexID@5677} "c625209bb7fb9a098807551840aeaa99" -> {InputOutputFormatVertex@5678} "CHAIN DataSource (at initializeDataSource(MemSourceBatchOp.java:98) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (select: (f0, f1)) (org.apache.flink.runtime.operators.DataSourceTask)"

{JobVertexID@5679} "b56ace4acd7a2f69ea110a9f262ff80a" -> {JobVertex@5680} "CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map (Map at linkFrom(MapBatchOp.java:35)) (org.apache.flink.runtime.operators.BatchTask)"
 
{JobVertexID@5681} "3f5e2a0f700421d80ce85e02a6d9db73" -> {InputOutputFormatVertex@5682} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
 
{JobVertexID@5683} "ad29dc5b2e0a39ad2cd1d164b6f859f7" -> {JobVertex@5684} "GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) (org.apache.flink.runtime.operators.BatchTask)"

咱們能夠看到,在JobGraph中就生成了對應的兩個算子。其中這裏的FlatMap就是用戶的UDF函數MapToString的映射生成。

GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))  
  
CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map

0x06 Runtime

最後,讓咱們看看runtime會如何處理這兩個算子。

6.1 ChainedFlatMapDriver

首先,Flink會在ChainedFlatMapDriver.collect中對record進行處理,這是從Table中提取數據所必須經歷的,與後續的group by關係不大。

@Override
public void collect(IT record) {
   try {
      this.numRecordsIn.inc();
      this.mapper.flatMap(record, this.outputCollector);
   } catch (Exception ex) {
      throw new ExceptionInChainedStubException(this.taskName, ex);
   }
}

// 這裏可以看出來,咱們獲取了第一列記錄
record = {Row@9317} "1,a,1"
 fields = {Object[3]@9330} 
this.taskName = "FlatMap (select: (f0, f1))"

// 程序堆棧打印以下
collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

6.2 GroupReduceCombineDriver

其次,GroupReduceCombineDriver.run()中會進行combine操做。

  1. 會經過this.sorter.write(value)把數據寫到排序緩衝區。
  2. 會經過sortAndCombineAndRetryWrite(value)進行實際的排序,合併。

由於是系統實現,因此Combine的用戶自定義函數就是由Table API提供的,好比org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate

@Override
public void run() throws Exception {
   final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
   final TypeSerializer<IN> serializer = this.serializer;

   if (objectReuseEnabled) {
    .....
   }
   else {
      IN value;
      while (running && (value = in.next()) != null) {
         // try writing to the sorter first
         if (this.sorter.write(value)) {
            continue;
         }

         // do the actual sorting, combining, and data writing
         sortAndCombineAndRetryWrite(value);
      }
   }

   // sort, combine, and send the final batch
   if (running) {
      sortAndCombine();
   }
}

// 程序變量以下
value = {Row@9494} "1,a"
 fields = {Object[2]@9503}

sortAndCombine是具體排序/合併的過程。

  1. 排序是經過 org.apache.flink.runtime.operators.sort.QuickSort 完成的。
  2. 合併是經過 org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate 完成的。
  3. 給下游是由 org.apache.flink.table.runtime.aggregate.DataSetPreAggFunction.combine 調用 out.collect(output) 完成的。
private void sortAndCombine() throws Exception {
   final InMemorySorter<IN> sorter = this.sorter;
   // 這裏進行實際的排序
   this.sortAlgo.sort(sorter);
   final GroupCombineFunction<IN, OUT> combiner = this.combiner;
   final Collector<OUT> output = this.output;

   // iterate over key groups
   if (objectReuseEnabled) {
			......		
   } else {
      final NonReusingKeyGroupedIterator<IN> keyIter = 
            new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
      // 這裏是歸併操做
      while (this.running && keyIter.nextKey()) {
         // combiner.combiner 是用戶定義操做,runtime把某key對應的數據一次性傳給它
         combiner.combine(keyIter.getValues(), output);
      }
   }
}

具體調用棧以下:

accumulate:57, CollectAggFunction (org.apache.flink.table.functions.aggfunctions)
accumulate:-1, DataSetAggregatePrepareMapHelper$5
combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)
sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

6.3 GroupReduceDriver & ChainedFlatMapDriver

這兩個放在一塊兒,是由於他們組成了Operator Chain。

GroupReduceDriver.run中完成了reduce。具體reduce 操做是在 org.apache.flink.table.runtime.aggregate.DataSetFinalAggFunction.reduce 完成的,而後在其中直接發送給下游 out.collect(output)

@Override
public void run() throws Exception {
   // cache references on the stack
   final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
 
   if (objectReuseEnabled) {
       ......	
   }
   else {
      final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.comparator);
      // run stub implementation
      while (this.running && iter.nextKey()) {
         // stub.reduce 是用戶定義操做,runtime把某key對應的數據一次性傳給它
         stub.reduce(iter.getValues(), output);
      }
   }
}

從前文咱們能夠,這裏已經配置成了Operator Chain,因此out.collect(output)會調用到CountingCollector。CountingCollector的成員變量collector已經配置成了ChainedFlatMapDriver。

public void collect(OUT record) {
   this.numRecordsOut.inc();
   this.collector.collect(record);
}

this.collector = {ChainedFlatMapDriver@9643} 
 mapper = {FlatMapRunner@9610} 
 config = {TaskConfig@9655} 
 taskName = "FlatMap (select: (f0, mapToString($f1) AS type))"

因而程序就調用到了 ChainedFlatMapDriver.collect

public void collect(IT record) {
   try {
      this.numRecordsIn.inc();
      this.mapper.flatMap(record, this.outputCollector);
   } catch (Exception ex) {
      throw new ExceptionInChainedStubException(this.taskName, ex);
   }
}

最終調用棧如以下:

eval:21, UdfExample$MapToString (com.alibaba.alink)
flatMap:-1, DataSetCalcRule$14
flatMap:52, FlatMapRunner (org.apache.flink.table.runtime)
flatMap:31, FlatMapRunner (org.apache.flink.table.runtime)
collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
reduce:80, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)
run:131, GroupReduceDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

0x07 總結

由此咱們能夠看到:

  • GroupReduce,GroupCombine和mapPartition十分相似,都是從系統層面對算子進行優化,把循環操做放到用戶自定義函數來處理。
  • 對於group by這個SQL語句,Flink將其翻譯成 GroupReduce + GroupCombine,採用兩階段優化的方式來完成了對大數據下的處理。

0x08 參考

flink 使用問題彙總

相關文章
相關標籤/搜索