Spark RDD學習: aggregate函數

最近在作項目的時候遇到了Spark RDD裏面的一個aggregate函數,以爲它的用法挺有意思的,在此記錄一下。分佈式

Spark 文檔中對 aggregate的函數定義以下:函數

def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) 
=> U)(implicit arg0: ClassTag[U]): U

註釋:this

Aggregate the elements of each partition, and then the results for 
all the partitions, using given combine functions and a neutral 
"zero value". 
This function can return a different result type, U, 
than the type of this RDD, T. 
Thus, we need one operation for merging a T into an U 
and one operation for merging two U's, as in 
Scala.TraversableOnce. Both of these functions are allowed to 
modify and return their first argument instead of creating a new U 
to avoid memory allocation.

aggregate函數首先對每一個分區裏面的元素進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。這個操做返回的類型不須要和RDD中元素類型一致,因此在使用 aggregate()時,須要提供咱們期待的返回類型的初始值,而後經過一個函數把RDD中的元素累加起來??放入累加器?。考慮到每一個節點是在本地進行累加的,最終還須要提供第二個函數來將累加器兩兩合併。code

其中seqOp操做會聚合各分區中的元素,而後combOp操做會把全部分區的聚合結果再次聚合,兩個操做的初始值都是zeroValue. seqOp的操做是遍歷分區中的全部元素(T),第一個T跟zeroValue作操做,結果再做爲與第二個T作操做的zeroValue,直到遍歷完整個分區。combOp操做是把各分區聚合的結果,再聚合。aggregate函數返回一個跟RDD不一樣類型的值。所以,須要一個操做seqOp來把分區中的元素T合併成一個U,另一個操做combOp把全部U聚合。ci

下面舉一個利用aggreated求平均數的例子:element

val rdd = List(1,2,3,4)
val input = sc.parallelize(rdd)
val result = input.aggregate((0,0))(
(acc,value) => (acc._1 + value, acc._2 + 1),
(acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
result: (Int, Int) = (10, 4)
val avg = result._1 / result._2
avg: Int = 2.5

程序的詳細過程大概以下:文檔

  1. 首先定義一個初始值 (0, 0),即咱們期待的返回類型的初始值。input

  2. (acc,value) => (acc._1 + value, acc._2 + 1)value是函數定義裏面的T,這裏是List裏面的元素。因此acc._1 + value, acc._2 + 1的過程以下:it

    1. 0+1, 0+1io

    2. 1+2, 1+1

    3. 3+3, 2+1

    4. 6+4, 3+1

  3. 結果爲 (10,4)。在實際Spark執行中是分佈式計算,可能會把List分紅多個分區,假如3個,p1(1,2), p2(3), p3(4),通過計算各分區的的結果 (3,2), (3,1), (4,1),這樣,執行 (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 就是 (3+3+4,2+1+1)(10,4),而後再計算平均值。

相關文章
相關標籤/搜索