A Thorough Walkthrough of aggregate vs treeAggregate, Based on the Source Code

aggregate

This article explains two common aggregation operations: aggregate vs treeAggregate.

Let's start with aggregate. The method's definition is as follows:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}

Parameter definitions:

First, there is a parameter of type U called zeroValue, followed by two function parameters. The first, seqOp: (U, T) => U, combines a value of type U with a value of type T into a value of type U; the second, combOp: (U, U) => U, merges two values of type U into one. The method returns a value of type U.

What the parameters do:

zeroValue is the given initial value; it is used by both seqOp and combOp.

seqOp runs on the executors, operating on each partition, and uses the initial value zeroValue.

combOp runs on the driver and also uses the initial value.
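To make the types concrete, here is a minimal usage sketch, assuming a live SparkContext sc (e.g. in spark-shell). It computes an average by accumulating a (sum, count) pair, so U = (Int, Int) while the element type T = Int:

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)

val (sum, count) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: fold one element into the U accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge two partial accumulators
)
val avg = sum.toDouble / count            // 3.0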

A quick tour of the source:

Snippet 1:

val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)

 

This is the function executed for each RDD partition. Each partition is ultimately an iterator, and we call that iterator's aggregate with the arguments we supplied. Note that the third argument of Iterator's aggregate is not used, i.e. cleanCombOp does nothing at this stage.
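That combOp is ignored here is easy to verify in plain Scala, with no Spark at all: at least in Scala 2.11/2.12 (the versions Spark 2.x builds against), TraversableOnce.aggregate is implemented as foldLeft(z)(seqop), so for a single iterator the combOp argument is dead code:

val result = Iterator(1, 2, 3).aggregate(10)(
  (acc, x) => acc + x,                               // seqOp, applied via foldLeft
  (_, _) => sys.error("combOp is never invoked")     // combOp, ignored for one iterator
)
// result == 16, and no exception is thrown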

Snippet 2:

val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)

This code runs combOp on the driver: as each task completes, its result is merged into jobResult.
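A plain-Scala simulation of this driver-side merge (not a Spark API, and with hypothetical per-partition results) shows where zeroValue gets used one extra time: jobResult starts from a clone of zeroValue, and each finished task's result is folded in with combOp:

val zeroValue = 1
val taskResults = Seq(8, 15, 19)                        // hypothetical results of three partitions
val jobResult = taskResults.foldLeft(zeroValue)(_ + _)  // combOp is _ + _ here
// jobResult == 43: this driver-side fold is why aggregate applies zeroValue once more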

The detailed execution logic is not the focus of this article; 浪尖 will analyze it step by step in future posts.

From the above, we can summarize aggregate's execution structure in the following diagram:

 

[Figure: aggregate execution structure]

This kind of aggregation has a flaw: every partition's seqOp result must be sent back to the driver, which then applies combOp to those returned results and the initial value. With large data volumes this easily causes an out-of-memory error on the driver. Hence the optimized variant, treeAggregate.

treeAggregate

The treeAggregate implementation is as follows:

def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}

Parameter definitions:

As with aggregate, there is a parameter of type U called zeroValue and two function parameters: seqOp: (U, T) => U, which combines a value of type U with a value of type T into a value of type U, and combOp: (U, U) => U, which merges two values of type U into one; the method returns a value of type U. In addition, treeAggregate takes a depth parameter (default 2) that controls the depth of the aggregation tree.

What the parameters do:

zeroValue is the given initial value; in treeAggregate it is effectively applied only through seqOp, since the first layer's Iterator.aggregate ignores combOp.

seqOp runs on the executors, operating on each partition, and uses the initial value zeroValue.

combOp runs on both the executors and the driver, and does not use the initial value.
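A usage sketch, again assuming a live SparkContext sc: the call looks exactly like aggregate, apart from the optional depth parameter:

val bigRdd = sc.parallelize(1 to 1000000, 100)

val total = bigRdd.treeAggregate(0L)((acc, x) => acc + x, _ + _)              // default depth = 2
val total3 = bigRdd.treeAggregate(0L)((acc, x) => acc + x, _ + _, depth = 3)  // deeper tree for very many partitions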

A quick tour of the source:

Snippet 1:

val aggregatePartition =
 (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))

This is the first layer of tasks, executed on the executors; it applies seqOp to the source data and the initial value zeroValue within each partition.
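A plain-Scala analogue of this first layer (hypothetical data split into three "partitions") makes the shape of the intermediate result obvious: an RDD[T] with p partitions collapses to one U value per partition, via seqOp only:

val parts = Seq(Seq(1, 2, 4), Seq(5, 8, 9), Seq(7, 2))
val zeroValue = 0
val partiallyAggregated = parts.map(_.foldLeft(zeroValue)(_ + _))
// Seq(7, 22, 9): one partially aggregated value per partition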

Snippet 2:

var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
 numPartitions /= scale
 val curNumPartitions = numPartitions
 partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
   (i, iter) => iter.map((i % curNumPartitions, _))
 }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}

After the first layer finishes, combOp is applied level by level, shrinking the number of partitions at each level. This runs on the executors and never uses the initial value.
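To see how many extra levels the loop actually adds, here is a standalone simulation of its arithmetic (no Spark needed), assuming 100 partitions and the default depth = 2:

var numPartitions = 100
val depth = 2
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)  // 10
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
  numPartitions /= scale
  println(s"one more executor-side combOp level, now $numPartitions partitions")
}
// Prints once (100 -> 10); the loop then stops because 10 <= 10 + ceil(10/10)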

Snippet 3:

partiallyAggregated.reduce(cleanCombOp)

After the preliminary aggregation on the executors, the remaining partial results are merged once more with combOp via a final reduce.
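Continuing the hypothetical trace: after the tree levels, only a handful of partially aggregated values remain, so this last step stays small and, notably, involves no zeroValue:

val remaining = Seq(29, 9)                 // hypothetical U values left after the loop
val finalResult = remaining.reduce(_ + _)  // combOp once more; no initial value involved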

From the above, we can summarize treeAggregate's execution structure in the following diagram:

[Figure: treeAggregate execution structure]

aggregate vs treeAggregate

1. aggregate and treeAggregate serve the same purpose; the difference in the final result is that treeAggregate's combOp never incorporates the initial value zeroValue.

2. treeAggregate runs n more tasks than aggregate, where n can be computed from the number of partitions and the depth (one extra stage per iteration of the while loop above).

3. treeAggregate lowers aggregate's risk of an out-of-memory error on the driver.

 

Here is a concrete example:

def seq(a: Int, b: Int): Int = {
  println("seq:" + a + ":" + b)
  a + b
}

def comb(a: Int, b: Int): Int = {
  println("comb:" + a + ":" + b)
  a + b
}

val res = sc.parallelize(List(1, 2, 4, 5, 8, 9, 7, 2), 3)
res.aggregate(1)(seq, comb)
res.treeAggregate(1)(seq, comb)


The aggregate result should be: 1+2+4+5+8+9+7+2 + 3×1 (zeroValue applied once per partition by seqOp) + 1 (zeroValue applied once more by combOp on the driver) = 42.

The treeAggregate result should be: 1+2+4+5+8+9+7+2 + 3×1 = 41, since its combOp never touches zeroValue.
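A quick sanity check of that arithmetic in plain Scala:

val data = List(1, 2, 4, 5, 8, 9, 7, 2)
data.sum + 3 * 1 + 1  // 42: aggregate, zeroValue in each of the 3 partitions plus once on the driver
data.sum + 3 * 1      // 41: treeAggregate, zeroValue in the 3 partitions only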

 
