GroupBy and reduce are common operators in the big-data world, yet many readers probably don't understand the machinery behind them. This article digs into the source code to explain how GroupBy and reduce work in Flink, and what they actually do behind the scenes.
The motivation for this investigation was a few questions that came to mind:
To make things easier to follow, let's first summarize how Flink handles a GroupBy + Reduce operation:
With that, the earlier questions are largely answered.
MapReduce is a programming model for parallel computation over large data sets. The concepts "Map" and "Reduce" are its core ideas, borrowed from functional and vector programming languages.
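As a quick illustration of the two ideas, here is a toy sketch in plain Java streams (not actual MapReduce code): "map" turns each word into a countable unit, and "reduce" folds those units per key into a total.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// A toy sketch of the Map/Reduce idea using plain Java streams:
// "map" each word to a (word, 1) pair, then "reduce" the 1s per key into a count.
public class MapReduceIdea {
    public static void main(String[] args) {
        List<String> words = List.of("aaa", "bbb", "aaa");

        // toMap's merge function (Integer::sum) plays the role of "reduce"
        Map<String, Integer> counts = words.stream()
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum));

        System.out.println(counts); // {aaa=2, bbb=1} (map order may vary)
    }
}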
Flink and Spark, the engines we use today, both descend from MapReduce, so it is worth going back to the source and seeing how MapReduce divides up its stages.

Breaking MapReduce down, it consists of the following major phases:
Combine deserves special attention. In MapReduce there are many map tasks and few reduce tasks. Because the reduce side handles a large volume of data, we pre-aggregate each map task's own output during the map phase, which relieves pressure on the reduce side.

Combine can be understood as a reduce on the map side: it merges the output of a single map task. A combine applies to one map task, whereas a reduce merges the outputs of many map tasks.
The key/value pairs produced by the map function become the input to the combine function; after the combine function processes them, they are sent on to the reduce function. This reduces both the amount of data written to disk and the volume of key/value pairs transferred over the network. On the map side, the user-defined Combiner class runs on the same node that executes the map task, effectively performing a reduce over the map function's output.

Cluster bandwidth is usually limited, and large volumes of intermediate data quickly become a performance bottleneck, so heavy data transfer between map tasks and reduce tasks should be avoided. The point of the Combine mechanism is to make the map-side output more compact, so that less data is written to local disk and shipped to the reduce side.
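A toy plain-Java simulation of that effect (this is not Hadoop's actual Combiner API, just the idea): pre-aggregating inside the map task shrinks five shuffled records down to two.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A toy illustration of map-side combining: the map task emits (word, 1)
// pairs, and a combine collapses them before anything is shuffled.
public class CombineSketch {
    public static void main(String[] args) {
        List<String> mapOutputKeys = List.of("aaa", "bbb", "aaa", "aaa", "bbb");

        // Without combine: 5 key/value pairs would cross the network.
        // With combine: pre-aggregate inside the map task first.
        Map<String, Integer> combined = new HashMap<>();
        for (String word : mapOutputKeys) {
            combined.merge(word, 1, Integer::sum);
        }

        // Only 2 pairs ({aaa=3, bbb=2}) are written to disk and shuffled.
        System.out.println(combined);
    }
}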
Partition splits each map node's results and routes them to different reducers by key; MapReduce's HashPartitioner does this classification for us, and we can also define our own.

You can think of it as sorting things into categories. Imagine a farm where cattle, sheep, chickens, ducks and geese are all mixed together during the day; at night the cattle return to the cowshed, the sheep to the sheepfold, and the chickens to the coop. Partitioning classifies our data in exactly this way; the only difference is that in a program, the partitioner does the classifying for us.
After the mapper runs, we know its output is a key/value pair like this: the key is "aaa" and the value is the number 1. The map side only does the +1; the result sets are merged in the reduce tasks. Suppose the job has 3 reduce tasks: which reduce task should the current "aaa" go to? That has to be decided immediately.

MapReduce provides the Partitioner interface for exactly this purpose: based on the key (or value) and the number of reduce tasks, it decides which reduce task a given output pair should ultimately be handled by. The default is to hash the key and take it modulo the number of reduce tasks. This default modulo scheme simply spreads load evenly across the reducers; if you have your own requirements, you can write a custom Partitioner and set it on the job.

In our example, suppose "aaa" comes back from the Partitioner as 0; that pair should then be handled by the first reducer.
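A minimal sketch of that default rule (the class and method names here are made up for illustration): hash the key, mask off the sign bit, and take the result modulo the number of reduce tasks.

// A sketch of the default hash-partitioning rule described above.
public class PartitionSketch {
    static int partitionFor(String key, int numReduceTasks) {
        // mask the sign bit so the result is non-negative, then take the modulo
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // With 3 reduce tasks, "aaa" always lands on the same reducer,
        // no matter which map task emits it.
        System.out.println(partitionFor("aaa", 3));
        System.out.println(partitionFor("aaa", 3)); // same result every time
    }
}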
Shuffle is the process between map and reduce, covering the combine and partition on both ends. It is rather hard to grasp because we can neither see nor touch it: it belongs to the MapReduce framework itself, and we never program against it directly.

Roughly, Shuffle is about getting the map tasks' output transferred to the reduce side effectively. Put another way, Shuffle describes the journey of data from map task output to reduce task input.

Simply put, before a reduce task executes, its job is to keep pulling the final results of every map task in the current job, continuously merge the data pulled from those different places, and finally produce one file that serves as the reduce task's input.
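That merge is essentially a k-way merge of sorted runs. Here is a minimal plain-Java sketch, where each inner list stands in for one map task's sorted output:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// A minimal k-way merge sketch: the reduce side merges several sorted
// map outputs into a single sorted stream.
public class MergeSketch {
    // Holder pairing a run's current element with the rest of that run.
    static class Head {
        String value;
        Iterator<String> rest;
        Head(String value, Iterator<String> rest) { this.value = value; this.rest = rest; }
    }

    public static void main(String[] args) {
        List<List<String>> mapOutputs = List.of(
                List.of("aaa", "ccc"),
                List.of("bbb", "ccc"),
                List.of("aaa", "ddd"));

        // The queue always exposes the smallest head element across all runs.
        PriorityQueue<Head> queue = new PriorityQueue<>(Comparator.comparing((Head h) -> h.value));
        for (List<String> run : mapOutputs) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) queue.add(new Head(it.next(), it));
        }

        List<String> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            Head h = queue.poll();
            merged.add(h.value);
            if (h.rest.hasNext()) queue.add(new Head(h.rest.next(), h.rest));
        }

        // [aaa, aaa, bbb, ccc, ccc, ddd] -- one sorted input for the reduce task.
        System.out.println(merged);
    }
}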
We take a Flink word-count program as the sample; the gist is as follows:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExampleReduce {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // build a data set from strings
        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        // split the lines, group by key, and count occurrences of each key
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });

        // print the result
        wordCounts.print();
    }

    // splits each line into (word, 1) pairs
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
The output is:

(hear,1)
(ho!,1)
(them.,1)
(I,2)
(Stand,,1)
(Who's,2)
(there?,2)
(think,1)
First, let's start digging from the basic Flink Java API.

One thing to note: GroupBy has no corresponding Operator. GroupBy only produces an intermediate, auxiliary step in a DataSet transformation.

The base class behind the GroupBy functionality is Grouping, which is merely an intermediate step of a DataSet transformation. Its main members are:
// Grouping is an intermediate step for a transformation on a grouped DataSet.
public abstract class Grouping<T> {
    protected final DataSet<T> inputDataSet;
    protected final Keys<T> keys;
    protected Partitioner<?> customPartitioner;
}
Grouping itself exposes no business-facing API; the concrete APIs all live in its subclasses, such as UnsortedGrouping.

Our code corresponds to the UnsortedGrouping class. It offers many business APIs, for example: sum, max, min, reduce, aggregate, reduceGroup, combineGroup, and so on.
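As an aside, for a simple count like ours the hand-written ReduceFunction can be swapped for one of those built-ins. A minimal sketch that reuses text and LineSplitter from the example above (the rest of the program is unchanged):

// equivalent word count using the built-in sum aggregation instead of a manual reduce
DataSet<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .groupBy(0)   // group on the word (field 0)
        .sum(1);      // sum the counts (field 1)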
Back to our example. The groupBy call does the following:

.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() { ... })

and the return value is a ReduceOperator. This matches what we said earlier: groupBy is only an intermediate step, and only reduce returns an Operator.

// groupBy lives on DataSet and returns an UnsortedGrouping
public UnsortedGrouping<T> groupBy(int... fields) {
    return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}

public class UnsortedGrouping<T> extends Grouping<T> {
    // reduce returns a ReduceOperator
    public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
        return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName());
    }
}
For the business logic, reduce is the operator that carries the real meaning.

From the call chain above and the definition of ReduceOperator, we can see that calling .groupBy(0).reduce() produces a ReduceOperator, with the UnsortedGrouping stored in the ReduceOperator's grouper member as an auxiliary.
public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {

    private final ReduceFunction<IN> function;
    private final Grouping<IN> grouper; // the UnsortedGrouping is stored here and used later during reduce

    public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
        this.function = function;
        this.grouper = input; // the UnsortedGrouping is stored here
        this.hint = CombineHint.OPTIMIZER_CHOOSES; // used later during optimization
    }
}
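Since hint defaults to CombineHint.OPTIMIZER_CHOOSES, the caller can also pick a combine strategy explicitly. Below is a sketch, assuming the DataSet API's setCombineHint is available in your Flink version; SumReducer is a hypothetical stand-in for the anonymous ReduceFunction in our example.

import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;

// hypothetical variation of our example: force a hash-based combine
// instead of letting the optimizer choose (HASH tends to suit inputs
// with few distinct keys, SORT suits inputs with many)
DataSet<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .groupBy(0)
        .reduce(new SumReducer())
        .setCombineHint(CombineHint.HASH);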
Let's now follow the stages of Flink program execution and see what the system does.

The first step of program execution: when the program runs, an execution plan is first generated from the result of the Java API calls.
public JobClient executeAsync(String jobName) throws Exception {
    final Plan plan = createProgramPlan(jobName);
}
The important function here is translateToDataFlow, because it converts operators from the batch Java API module's operators package into the core module's operators package.

For our sample program, while generating the graph, translateToDataFlow produces a SingleInputOperator for later use at runtime. An abridged version of the code follows.
protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
    ......
    // the keys are extracted from the UnsortedGrouping
    else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
        // reduce with field positions
        ReduceOperatorBase<IN, ReduceFunction<IN>> po =
                new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
        po.setCustomPartitioner(grouper.getCustomPartitioner());
        po.setInput(input);
        po.setParallelism(getParallelism()); // no change in parallelism
        // translateToDataFlow produces a SingleInputOperator for later use at runtime
        return po;
    }
}
The execution plan finally generated from our code is shown below. We can see that it basically matches our expectation: a simple path from input to output, with Reduce as the only meaningful operator in between.
GenericDataSourceBase ——> FlatMapOperatorBase ——> ReduceOperatorBase ——> GenericDataSinkBase
Concretely, in the code it looks like this:
plan = {Plan@1296}
 sinks = {ArrayList@1309}  size = 1
  0 = {GenericDataSinkBase@1313} "collect()"
   formatWrapper = {UserCodeObjectWrapper@1315}
   input = {ReduceOperatorBase@1316} "ReduceOperatorBase - Reduce at main(WordCountExampleReduceCsv.java:25)"
    hint = {ReduceOperatorBase$CombineHint@1325} "OPTIMIZER_CHOOSES"
    customPartitioner = null
    input = {FlatMapOperatorBase@1326} "FlatMapOperatorBase - FlatMap at main(WordCountExampleReduceCsv.java:23)"
     input = {GenericDataSourceBase@1339} "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)"
The second step of program execution: Flink keeps optimizing the Plan, producing an Optimized Plan. The core code lives in the PlanTranslator.compilePlan function, which is where the Optimized Plan is obtained.

This compilation step makes no decisions or assumptions of its own; how the job will ultimately execute has already been fully determined by the optimizer, and compilation is just a deterministic mapping on top of that. So we will concentrate on how the plan gets optimized.
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
    Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
    OptimizedPlan optimizedPlan = optimizer.compile(plan);
    JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
    return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}
Internally, the plan's accept method is called to traverse it. accept is invoked on each sink in turn; each sink gets a preVisit first, followed by a postVisit.

There are a few points worth noting during optimization:

In GraphCreatingVisitor.preVisit, when the Operator turns out to be of type ReduceOperatorBase, a ReduceNode is created.
else if (c instanceof ReduceOperatorBase) {
    n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
}
ReduceNode is the optimizer's representation of the Reduce Operator.
public class ReduceNode extends SingleInputNode {
    private final List<OperatorDescriptorSingle> possibleProperties;
    private ReduceNode preReduceUtilityNode;
}
When the ReduceNode is created, the hint mentioned earlier determines combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
public ReduceNode(ReduceOperatorBase<?, ?> operator) {
    DriverStrategy combinerStrategy;
    switch (operator.getCombineHint()) {
        case OPTIMIZER_CHOOSES:
            combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
            break;
        // other cases elided
    }
}
The optimized execution plan is shown below. We can see that by now the parallelism has been set, and the reduce has been split into two stages: a SORTED_PARTIAL_REDUCE and a SORTED_REDUCE.
Data Source ——> FlatMap ——> Reduce(SORTED_PARTIAL_REDUCE) ——> Reduce(SORTED_REDUCE) ——> Data Sink
Concretely, in the code it looks like this:
optimizedPlan = {OptimizedPlan@1506}
 allNodes = {HashSet@1510}  size = 5
  0 = {SourcePlanNode@1512} "Data Source "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
    parallelism = 4
  1 = {SingleInputPlanNode@1513} "FlatMap "FlatMap at main(WordCountExampleReduceCsv.java:23)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
    parallelism = 4
  2 = {SingleInputPlanNode@1514} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
    parallelism = 4
  3 = {SinkPlanNode@1515} "Data Sink "collect()" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
    parallelism = 4
  4 = {SingleInputPlanNode@1516} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_PARTIAL_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
    parallelism = 4
The third step of program execution: building the JobGraph. LocalExecutor.execute generates the JobGraph. After optimization, the Optimized Plan is turned into a JobGraph, the data structure that is submitted to the JobManager.

The main optimization here is chaining several eligible nodes together into one node, which cuts the serialization/deserialization/transfer cost of moving data between nodes.

The JobGraph is the only representation of a job that Flink's dataflow engine recognizes, and it is exactly this shared abstraction that unifies stream processing and batch processing at runtime.
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    final JobGraph jobGraph = getJobGraph(pipeline, configuration);
}
We can see that this step forms an Operator Chain:
CHAIN DataSource -> FlatMap -> Combine (Reduce)
So Reduce(SORTED_PARTIAL_REDUCE) has been merged together with its upstream.

Printed from the program, it looks like this:
jobGraph = {JobGraph@1739} "JobGraph(jobId: 30421d78d7eedee6be2c5de39d416eb7)"
 taskVertices = {LinkedHashMap@1742}  size = 3
  {JobVertexID@1762} "e2c43ec0df647ea6735b2421fb7330fb" -> {InputOutputFormatVertex@1763} "CHAIN DataSource (at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCountExampleReduceCsv.java:23)) -> Combine (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.DataSourceTask)"
  {JobVertexID@1764} "2de11f497e827e48dda1d63b458dead7" -> {JobVertex@1765} "Reduce (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.BatchTask)"
  {JobVertexID@1766} "2bee17f2c86aa1e9439e3dedea58007b" -> {InputOutputFormatVertex@1767} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
Once the job is submitted, the program runs for real. Three sorts are actually involved along the way.

Here is the first sort.

When a batch of data has been processed, ChainedFlatMapDriver's close function is called to send the data downstream.
public void close() {
    this.outputCollector.close();
}
The Operator Chain then reaches ChainedReduceCombineDriver.close:
public void close() {
    // send the final batch
    try {
        switch (strategy) {
            case SORTED_PARTIAL_REDUCE:
                sortAndCombine(); // this is our path
                break;
            case HASHED_PARTIAL_REDUCE:
                reduceFacade.emit();
                break;
        }
    } catch (Exception ex2) {
        throw new ExceptionInChainedStubException(taskName, ex2);
    }

    outputCollector.close();
    dispose(false);
}
sortAndCombine sorts first, then combines, and finally keeps emitting the data.
private void sortAndCombine() throws Exception {
    final InMemorySorter<T> sorter = this.sorter;

    if (!sorter.isEmpty()) {
        sortAlgo.sort(sorter); // sort first

        final TypeSerializer<T> serializer = this.serializer;
        final TypeComparator<T> comparator = this.comparator;
        final ReduceFunction<T> function = this.reducer;
        final Collector<T> output = this.outputCollector;
        final MutableObjectIterator<T> input = sorter.getIterator();

        if (objectReuseEnabled) {
            ......
        } else {
            T value = input.next();

            // this is the combine
            // iterate over key groups
            while (running && value != null) {
                comparator.setReference(value);
                T res = value;

                // iterate within a key group
                while ((value = input.next()) != null) {
                    if (comparator.equalToReference(value)) {
                        // same group, reduce
                        res = function.reduce(res, value);
                    } else {
                        // new key group
                        break;
                    }
                }
                output.collect(res); // emit the data
            }
        }
    }
}
Which downstream instance finally receives the data is decided by OutputEmitter.selectChannel. There are several selection strategies: hash partitioning, broadcasting, round-robin, and custom partition functions. Here PARTITION_HASH is used.

Every task sends its partial counts for the same word to the same downstream ReduceDriver. This guarantees that the downstream Reducer can never produce a wrong total.
public final int selectChannel(SerializationDelegate<T> record) {
    switch (strategy) {
        ...
        case PARTITION_HASH:
            return hashPartitionDefault(record.getInstance(), numberOfChannels);
        ...
    }
}

private int hashPartitionDefault(T record, int numberOfChannels) {
    int hash = this.comparator.hash(record);
    return MathUtils.murmurHash(hash) % numberOfChannels;
}
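To see why this guarantee holds, here is a toy plain-Java version of the routing rule (it keeps only the hash-and-modulo step and leaves out Flink's additional murmur hashing; the class name is made up): identical keys always select the same channel, no matter which task emits them.

// A toy illustration of stable channel selection.
public class ChannelSketch {
    static int selectChannel(String key, int numberOfChannels) {
        // deterministic: the same key always yields the same channel
        return (key.hashCode() & Integer.MAX_VALUE) % numberOfChannels;
    }

    public static void main(String[] args) {
        for (int task = 0; task < 3; task++) {
            // Identical keys from different tasks map to one channel.
            System.out.println("task " + task + " -> channel " + selectChannel("aaa", 4));
        }
    }
}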
The call stack:
hash:50, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hash:30, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hashPartitionDefault:187, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:147, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
sortAndCombine:254, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:266, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:40, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
close:88, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
invoke:215, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
Here is the second sort.

In BatchTask, the input is first sorted and merged, and only then handed over to Reduce to finish the job. The sort & merge work is done in the UnilateralSortMerger class.
getIterator:646, UnilateralSortMerger (org.apache.flink.runtime.operators.sort)
getInput:1110, BatchTask (org.apache.flink.runtime.operators)
prepare:95, ReduceDriver (org.apache.flink.runtime.operators)
run:474, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
UnilateralSortMerger is a full-fledged sorter implementing a multi-way merge sort. Its internal logic is divided across three threads (read, sort, spill), which form a closed loop with one another through a series of blocking queues.

Its memory is allocated through the MemoryManager, so the component will not exceed the memory assigned to it.

The class's main fields are excerpted below:
public class UnilateralSortMerger<E> implements Sorter<E> {

    // ------------------------------------------------------------------------
    //                                  Threads
    // ------------------------------------------------------------------------

    /** The thread that reads the input channels into buffers and passes them on to the merger. */
    private final ThreadBase<E> readThread;

    /** The thread that merges the buffer handed from the reading thread. */
    private final ThreadBase<E> sortThread;

    /** The thread that handles spilling to secondary storage. */
    private final ThreadBase<E> spillThread;

    // ------------------------------------------------------------------------
    //                                   Memory
    // ------------------------------------------------------------------------

    /** The memory segments used first for sorting and later for reading/pre-fetching
     * during the external merge. */
    protected final List<MemorySegment> sortReadMemory;

    /** The memory segments used to stage data to be written. */
    protected final List<MemorySegment> writeMemory;

    /** The memory manager through which memory is allocated and released. */
    protected final MemoryManager memoryManager;

    // ------------------------------------------------------------------------
    //                            Miscellaneous Fields
    // ------------------------------------------------------------------------

    /**
     * Collection of all currently open channels, to be closed and deleted during cleanup.
     */
    private final HashSet<FileIOChannel> openChannels;

    /**
     * The monitor which guards the iterator field.
     */
    protected final Object iteratorLock = new Object();

    /**
     * The iterator to be returned by the sort-merger. This variable is null, while receiving and merging is still in
     * progress and it will be set once we have < merge factor sorted sub-streams that will then be streamed sorted.
     */
    // if you debug operators often, you will find the driver's input is always this field
    protected volatile MutableObjectIterator<E> iterator;

    private final Collection<InMemorySorter<?>> inMemorySorters;
}
ReadingThread: continuously consumes the input data and puts it into a buffer that will later be sorted.

SortingThread: sorts the buffers filled by the reading thread.

SpillingThread: handles the spilling of intermediate results to secondary storage and sets up the merging; it keeps merging channels until few enough remain to perform the final streamed merge.
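To make the closed loop of blocking queues concrete, here is a toy two-stage sketch of the read -> sort handoff (spilling to disk is omitted, and every name here is made up; this is not Flink source):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A toy read -> sort -> emit pipeline: threads exchange buffers over
// blocking queues, loosely mirroring UnilateralSortMerger's thread layout.
public class SortPipelineSketch {
    private static final List<Integer> POISON = List.of(); // end-of-input marker

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<List<Integer>> toSort = new ArrayBlockingQueue<>(2);
        BlockingQueue<List<Integer>> sorted = new ArrayBlockingQueue<>(2);

        Thread reader = new Thread(() -> {
            try {
                // "read" the input and hand over fixed-size buffers
                toSort.put(new ArrayList<>(List.of(3, 1, 2)));
                toSort.put(new ArrayList<>(List.of(6, 5, 4)));
                toSort.put(POISON); // signal end of input
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        Thread sorter = new Thread(() -> {
            try {
                while (true) {
                    List<Integer> buffer = toSort.take();
                    if (buffer == POISON) { sorted.put(POISON); return; }
                    Collections.sort(buffer); // sort one filled buffer
                    sorted.put(buffer);
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        reader.start();
        sorter.start();

        // The consumer (standing in for the merge step) drains the sorted runs.
        List<Integer> run;
        while ((run = sorted.take()) != POISON) {
            System.out.println("sorted run: " + run);
        }
    }
}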
UnilateralSortMerger has one special field:

protected volatile MutableObjectIterator<E> iterator;

This field is the final output of the sort-merger. If you have ever debugged an operator, you will recognize it as the concrete operator's input type: the operator's input ultimately comes from here.
Here is the third sort, and it shows how reduce works together with groupBy.

For .groupBy(0), ReduceDriver simply fetches the first record of the input (T value = input.next();) and hands it to the comparator (comparator.setReference(value);). Because groupBy only says "compare on position 0" and never names a concrete key value, that first value itself becomes the key. We mark the outer loop over key groups as while (1) and the inner loop within one key group as while (2); the code below is annotated accordingly. When while (2) encounters a record with a different key it breaks out; execution is then still inside while (1), with value holding the new record, so while (1) continues: value is handed to the comparator again (comparator.setReference(value);), and the comparison starts over for the new key.

public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
    @Override
    public void run() throws Exception {
        final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();

        // cache references on the stack
        final MutableObjectIterator<T> input = this.input;
        final TypeSerializer<T> serializer = this.serializer;
        final TypeComparator<T> comparator = this.comparator;
        final ReduceFunction<T> function = this.taskContext.getStub();
        final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);

        if (objectReuseEnabled) {
            ......
        } else {
            // for `.groupBy(0)`, ReduceDriver simply fetches the first record of the input
            T value = input.next();

            // while (1): iterate over key groups
            while (this.running && value != null) {
                numRecordsIn.inc();
                // hand the value to the comparator; this value is now the key
                comparator.setReference(value);
                T res = value;

                // while (2): iterate within a key group, comparing against this key
                while ((value = input.next()) != null) {
                    numRecordsIn.inc();
                    if (comparator.equalToReference(value)) {
                        // same group: the next record has the same key, so reduce it in
                        res = function.reduce(res, value);
                    } else {
                        // new key group: the next record has a different key, so stop comparing
                        break;
                    }
                }

                // emit the reduce result
                output.collect(res);
            }
        }
    }
}
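To see the two loops in isolation, here is a distilled plain-Java version of the same key-group iteration (the identical pattern also appears in sortAndCombine above), run over an already-sorted input; it prints (aaa,2) and then (bbb,1).

import java.util.Iterator;
import java.util.List;

// A distilled, plain-Java version of ReduceDriver's two nested loops,
// run over already-sorted (word, count) pairs.
public class KeyGroupSketch {
    record Pair(String key, int count) {}

    public static void main(String[] args) {
        // Sorted input, as produced by the upstream sort-merge.
        Iterator<Pair> input = List.of(
                new Pair("aaa", 1), new Pair("aaa", 1), new Pair("bbb", 1)).iterator();

        Pair value = input.hasNext() ? input.next() : null;
        while (value != null) {                       // while (1): over key groups
            String key = value.key();
            int sum = value.count();
            value = null;
            while (input.hasNext()) {                 // while (2): within one key group
                Pair next = input.next();
                if (next.key().equals(key)) {
                    sum += next.count();              // same group: reduce
                } else {
                    value = next;                     // new key group: carry it over
                    break;
                }
            }
            System.out.println("(" + key + "," + sum + ")"); // emit per key group
        }
    }
}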