aggregate
This article explains two common aggregation operations: aggregate vs treeAggregate.
First, aggregate. The full signature and body of this method are as follows:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
Parameter definitions:
First, there is a U-typed parameter named zeroValue, followed by two function parameters: seqOp: (U, T) => U combines a U value and a T value into a U value, and combOp: (U, U) => U combines two U values into one U value. The method returns a value of type U.
Parameter roles:
zeroValue is the given initial value; it is used by both the seqOp and combOp functions.
seqOp runs on the Executor side against each partition and uses the initial value zeroValue.
combOp runs on the Driver side and also uses the initial value.
Source walkthrough:
Snippet 1:
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
This is the function executed against each RDD partition: each partition is ultimately an Iterator, and that iterator's aggregate method is invoked with the parameters we supplied. Note that Iterator's aggregate does not actually use its third argument; in other words, combOp is never called at this stage.
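The point that a sequential iterator's aggregate never calls combOp can be checked with a small standalone sketch. The helper `iteratorAggregate` below is hypothetical (not Spark code); it mirrors what a sequential iterator does, which is simply a fold with seqOp:

```scala
// Hypothetical helper mirroring what aggregate does on a sequential
// (non-parallel) iterator: only seqOp is ever applied.
def iteratorAggregate[T, U](it: Iterator[T], zero: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U =
  it.foldLeft(zero)(seqOp) // combOp is never invoked here

// Passing a combOp that would throw shows it is unused:
val r = iteratorAggregate(Iterator(1, 2, 3), 10)(_ + _, (_, _) => sys.error("combOp called"))
println(r) // 16
```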
Snippet 2:
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
This code executes the combOp operation on the Driver side.
The detailed execution flow is not the focus of this article; the author (浪尖) will walk through it step by step in later posts.
From the above we can summarize the execution structure of aggregate (diagram omitted here).
This aggregation has a drawback: the per-partition results of all the seqOp computations must all be returned to the Driver, and only then are combOp and the initial value applied to those returned results. With a large amount of data this can easily cause an out-of-memory error on the Driver side, which is why the optimized function treeAggregate exists.
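The two-phase flow described above can be sketched without Spark, using plain Scala collections to stand in for partitions. `simulateAggregate` below is an illustrative helper, not part of any API; note that zeroValue participates in both phases:

```scala
// Sketch of aggregate's flow: seqOp per "partition" (Executor side),
// then combOp over all partial results on the "Driver", which starts
// from zeroValue again.
def simulateAggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  val partials = partitions.map(_.foldLeft(zeroValue)(seqOp)) // Executor side
  partials.foldLeft(zeroValue)(combOp)                        // Driver side
}

val total = simulateAggregate(Seq(Seq(1, 2), Seq(3), Seq(4, 5)), 0)(_ + _, _ + _)
println(total) // 15
```

With a nonzero zeroValue the result would include it once per partition (from seqOp) plus once more on the Driver (from combOp), which is exactly the behavior explored in the example at the end of the article.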
treeAggregate
The full source of the treeAggregate function is as follows:
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.
    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}
Parameter definitions:
As before, there is a U-typed parameter named zeroValue, followed by two function parameters: seqOp: (U, T) => U combines a U value and a T value into a U value, and combOp: (U, U) => U combines two U values into one U value. The method returns a value of type U.
Parameter roles:
zeroValue is the given initial value; it is used by both the seqOp and combOp functions.
seqOp runs on the Executor side against each partition and uses the initial value zeroValue.
combOp runs on both the Executor side and the Driver side, and does not use the initial value.
Source walkthrough:
Snippet 1:
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
The first layer of tasks runs on the Executor side; it mainly applies seqOp to the source data together with the initial value zeroValue.
Snippet 2:
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
    (i, iter) => iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
After the first layer of tasks completes, combOp is applied level by level while the number of partitions is gradually reduced. This stage runs on the Executor side and does not use the initial value.
Snippet 3:
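The level-reduction schedule can be illustrated on its own. The hypothetical helper `treeLevels` below reproduces the scale computation and the while-loop condition from the source, returning the successive partition counts for given inputs:

```scala
// Sketch of treeAggregate's level schedule: given an initial partition
// count and a depth, compute the scale factor and the partition count
// at each tree level, following the while-loop condition in the source.
def treeLevels(initial: Int, depth: Int): List[Int] = {
  var numPartitions = initial
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  var levels = List(numPartitions)
  // Stop adding levels when an extra level would not shrink the work
  while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
    numPartitions /= scale
    levels = levels :+ numPartitions
  }
  levels
}

println(treeLevels(100, 2)) // List(100, 10): scale = 10, one intermediate level
```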
partiallyAggregated.reduce(cleanCombOp)
After the preliminary aggregation on the Executor side, a final reduce using combOp is run over the partially aggregated results.
From the above we can summarize the execution structure of treeAggregate (diagram omitted here).
aggregate VS treeAggregate
1. aggregate and treeAggregate serve the same purpose; the difference in the final result is that treeAggregate does not use the initial value zeroValue when executing combOp.
2. treeAggregate executes n more tasks than aggregate, where n can be computed from the partition count and depth.
3. treeAggregate reduces the risk of aggregate's Driver-side out-of-memory errors.
Here is an example:
def seq(a: Int, b: Int): Int = {
  println("seq:" + a + ":" + b)
  a + b
}
def comb(a: Int, b: Int): Int = {
  println("comb:" + a + ":" + b)
  a + b
}
val res = sc.parallelize(List(1, 2, 4, 5, 8, 9, 7, 2), 3)
res.aggregate(1)(seq, comb)
res.treeAggregate(1)(seq, comb)
The aggregate result should be: 1+2+4+5+8+9+7+2 + 3*1 + 1 = 42 (zeroValue is added once per partition by seqOp, then once more on the Driver by combOp).
The treeAggregate result should be: 1+2+4+5+8+9+7+2 + 3*1 = 41 (zeroValue is added once per partition only; combOp does not reuse it).
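The arithmetic above can be verified in pure Scala, with no cluster needed. The 3-way split below matches how parallelize would slice this 8-element list, though for addition the exact split does not change the totals:

```scala
// Pure-Scala check of the 42 vs 41 example above.
val parts = Seq(Seq(1, 2), Seq(4, 5, 8), Seq(9, 7, 2)) // 3 "partitions"
val zeroValue = 1
val partials = parts.map(_.foldLeft(zeroValue)(_ + _))    // seqOp per partition
val aggregateResult = partials.foldLeft(zeroValue)(_ + _) // Driver combOp reuses zeroValue
val treeAggregateResult = partials.reduce(_ + _)          // combOp without zeroValue
println(aggregateResult)     // 42
println(treeAggregateResult) // 41
```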