一。基本介紹函數
rdd.aggregateByKey(3, seqFunc, combFunc) 其中第一個函數是初始值 ui
3表明每次分完組以後的每一個組的初始值。spa
seqFunc表明combine的聚合邏輯rest
每個mapTask的結果的聚合成爲combinecode
combFunc reduce端大聚合的邏輯blog
ps:aggregateByKey默認分組it
二。代碼spark
from pyspark import SparkConf,SparkContext from __builtin__ import str conf = SparkConf().setMaster("local").setAppName("AggregateByKey") sc = SparkContext(conf = conf) rdd = sc.parallelize([(1,1),(1,2),(2,1),(2,3),(2,4),(1,7)],2) def f(index,items): print "partitionId:%d" %index for val in items: print val return items rdd.mapPartitionsWithIndex(f, False).count() def seqFunc(a,b): print "seqFunc:%s,%s" %(a,b) return max(a,b) #取最大值 def combFunc(a,b): print "combFunc:%s,%s" %(a ,b) return a + b #累加起來 ''' aggregateByKey這個算子內部確定有分組 ''' aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc) rest = aggregateRDD.collectAsMap() for k,v in rest.items(): print k,v sc.stop()
三。詳細邏輯io
PS:ast
seqFunc函數 combine篇。
3是每一個分組的最大值,因此把3傳進來,在combine函數中也就是seqFunc中第一次調用 3表明a,b即1,max(a,b)即3 第二次再調用則max(3.1)中的最大值3即輸入值,2即b值 因此結果則爲(1,3)
底下相似。combine函數調用的次數與分組內的數據個數一致。
combFunc函數 reduce聚合
在reduce端大聚合,拉完數據後也是先分組,而後再調用combFunc函數
四。結果
持續更新中。。。。,歡迎你們關注個人公衆號LHWorld.