Spark RDD Transformation 簡單用例(一)

map(func

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] 

 

map(func) Return a new distributed dataset formed by passing each element of the source through a function func

將原RDD中的每個元素通過func函數映射爲一個新的元素造成一個新的RDD。es6

示例:apache

其中sc.parallelize第二個參數標識RDD的分區數量app

val rdd = sc.parallelize(1 to 9,2)
val rdd1=rdd.map(x=>x+1)
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.take(20)
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> val rdd1=rdd.map(x=>x+1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> rdd1.take(20)
res5: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10)

 

filter(func

/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/

def filter(f: T => Boolean): RDD[T]

filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true. 

原RDD中經過func函數結果爲true的元素轉換成一個新的RDD。less

val rdd = sc.parallelize(1 to 9,2)
val rdd1 = rdd.filter(_>=5)

 

scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd1 = rdd.filter(_>=5)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at filter at <console>:26

scala> rdd1.take(10)
res13: Array[Int] = Array(5, 6, 7, 8, 9)

 

flatMap(func)

/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

 

flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). 

和map相似,可是每個元素可能被映射爲0個或多個元素(func函數應該返回一個Seq,而不是單個的元素);實際上就是先進行map,而後再進行一次平滑(flat)處理。dom

val rdd = sc.parallelize(1 to 3,2)
val rdd1 = rdd.flatMap( _ to 5)
scala> val rdd = sc.parallelize(1 to 3,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd1 = rdd.flatMap( _ to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at flatMap at <console>:26

scala> rdd1.take(100)
res14: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 3, 4, 5)

 

mapPartitions(func

/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. 

和map相似,該函數和map函數相似,只不過映射函數的參數由RDD中的每個元素變成了RDD中每個分區的迭代器。若是在映射的過程當中須要頻繁建立額外的對象,使用mapPartitions要比map高效的過。ide

計算每個partition中元素個數函數

 

def countPartitionEle(it : Iterator[Int]) = {
    var result = List[Int]()
     var i = 0
     while(it.hasNext){
       i += 1
       it.next
     }
     result.::(i).iterator//::在列表開頭增長元素i,元素i必須用小括號包含,而後建立一個迭代器
}

val rdd = sc.parallelize(1 to 10, 3)
val rdd1 = rdd.mapPartitions(countPartitionEle(_))
rdd1.take(10)

 

 

scala> def countPartitionEle(it : Iterator[Int]) = {
     | var result = List[Int]()
     | var i = 0
     | while(it.hasNext){
     | i += 1
     | it.next
     | }
     | result.::(i).iterator
     | }
countPartitionEle: (it: Iterator[Int])Iterator[Int]

scala> val rdd = sc.parallelize(1 to 10, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd1 = rdd.mapPartitions(countPartitionEle(_))
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at mapPartitions at <console>:30

scala> rdd1.take(10)
res8: Array[Int] = Array(3, 3, 4)

 mapPartitionsWithIndex(func

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]this

mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. 

和mapPartitions相似,也是針對每一個分區處理,可是func函數須要兩個入參,第一個表示partition分區索引,第二個入參表示每一個分區的迭代器。lua

def func(index :Int, it : Iterator[Int]) = {
    var result = List[String]()
     var i = ""
     while(it.hasNext){
       i += it.next + ","
     }
     result.::(i.dropRight(1) + " at partition "+index+".").iterator
}

val rdd = sc.parallelize(1 to 10, 3)
val rdd1 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd1.take(3)
scala> def func(index :Int, it : Iterator[Int]) = {
     |     var result = List[String]()
     |      var i = ""
     |      while(it.hasNext){
     |        i += it.next + ","
     |      }
     |      result.::(i.dropRight(1) + " at partition "+index+".").iterator
     | }
func: (index: Int, it: Iterator[Int])Iterator[String]



scala> val rdd = sc.parallelize(1 to 10, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd1 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:28


scala> rdd1.take(3)
res2: Array[String] = Array(1,2,3 at partition 0. 4,5,6 at partition 1. 7,8,9,10 at partition 2.)

 

sample(withReplacement, fraction, seed

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
* @param seed seed for the random number generator
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]es5

sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. 

對原RDD進行採樣,其中withReplacement表示是否有放回的抽樣,fraction表示採樣大小是原RDD的百分比,seed表示隨機數生成器

scala> val rdd = sc.parallelize(1 to 10, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val rdd1 = rdd.sample(true,0.5,0)
rdd1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[4] at sample at <console>:26

scala> val rdd2 = rdd.sample(false,0.5,0)
rdd2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[5] at sample at <console>:26

scala> rdd1.collect
res3: Array[Int] = Array(2)

scala> rdd2.collect
res4: Array[Int] = Array(1, 2, 4, 5, 6, 9)                                      


scala> val rdd1 = rdd.sample(true,0.5,1)
rdd1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[7] at sample at <console>:26

scala> rdd1.collect
res6: Array[Int] = Array(1, 3, 7, 7, 8, 8, 9, 10)

 

union(otherDataset) 

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
def union(other: RDD[T]): RDD[T]

union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument. 

將兩個RDD合併成一個RDD,相同的元素可能出現屢次,可使用distinct去重。

val rdd1 = sc.parallelize(1 to 5,2)
val rdd2 = sc.parallelize(1 to 5,3)
val rdd3 = sc.parallelize(2 to 8,3)
val rdd = rdd1.union(rdd2).union(rdd3)
rdd.collect
rdd.distinct.collect
scala> val rdd1 = sc.parallelize(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(1 to 5,3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd3 = sc.parallelize(2 to 8,3)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24


scala> val rdd = rdd1.union(rdd2).union(rdd3)
rdd: org.apache.spark.rdd.RDD[Int] = UnionRDD[12] at union at <console>:30

scala> rdd.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 2, 3, 4, 5, 6, 7, 8)     

scala> rdd.distinct.collect
res8: Array[Int] = Array(8, 1, 2, 3, 4, 5, 6, 7)   

 

intersection(otherDataset

/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
*
* Note that this method performs a shuffle internally.
*/
def intersection(other: RDD[T]): RDD[T]

intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument. 

兩個RDD共同的元素組合一個新的RDD

val rdd1 = sc.parallelize(1 to 5,2)
val rdd2 = sc.parallelize(4 to 8,3)
val rdd = rdd1.intersection(rdd2)
rdd.collect
scala> val rdd1 = sc.parallelize(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(4 to 8,3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> val rdd = rdd1.intersection(rdd2)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at intersection at <console>:28


scala> rdd.collect
res9: Array[Int] = Array(4, 5)

  scala> rdd.partitions.length
  res10: Int = 3

 

/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did. Performs a hash partition across the cluster
*
* Note that this method performs a shuffle internally.
*
* @param numPartitions How many partitions to use in the resulting RDD
*/
def intersection(other: RDD[T], numPartitions: Int): RDD[T]

def intersection(other: RDD[T]): RDD[T],numPartitions表示結果RDD的分區數量

scala> val rdd = rdd1.intersection(rdd2,1)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at intersection at <console>:28

scala> rdd.partitions.length
res12: Int = 1

 

/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
*
* Note that this method performs a shuffle internally.
*
* @param partitioner Partitioner to use for the resulting RDD
*/
def intersection(
other: RDD[T],
partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

 

自定義分區

 自定義分區類必須繼承Partitioner,方法numPartitions設置分區數量,getPartition獲取分區索引。

class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    key.toString.toInt%numPartitions
  }
}

 

val rdd1 = sc.parallelize(1 to 15,2)
val rdd2 = sc.parallelize(5 to 25,2)
val rdd = rdd1.intersection(rdd2,new MyPartitioner(5))
rdd.collect
rdd.partitions.length
def func(index :Int, it : Iterator[Int]) = {
     var result = List[String]()
     var i = ""
     while(it.hasNext){
       i += it.next + ","
     }
     result.::(i.dropRight(1) + " at partition "+index+".").iterator
}

val rdd3 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd3.collect

 

scala> val rdd1 = sc.parallelize(1 to 15,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:27

scala> val rdd2 = sc.parallelize(5 to 25,2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at <console>:27

scala> val rdd = rdd1.intersection(rdd2,newMyPartitioner(5))
<console>:31: error: not found: value newMyPartitioner
       val rdd = rdd1.intersection(rdd2,newMyPartitioner(5))
                                        ^

scala> val rdd = rdd1.intersection(rdd2,new MyPartitioner(5))
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[61] at intersection at <console>:32

scala> rdd.collect
res25: Array[Int] = Array(15, 10, 5, 11, 6, 7, 12, 13, 8, 14, 9)


scala> rdd.partitions.length
res26: Int = 5

scala> def func(index :Int, it : Iterator[Int]) = {
     |     var result = List[String]()
     |      var i = ""
     |      while(it.hasNext){
     |        i += it.next + ","
     |      }
     |      result.::(i.dropRight(1) + " at partition "+index+".").iterator
     | }
func: (index: Int, it: Iterator[Int])Iterator[String]

scala> val rdd3 = rdd.mapPartitionsWithIndex((x,it) => func(x,it))
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[62] at mapPartitionsWithIndex at <console>:36

scala> rdd3.collect
res27: Array[String] = Array(15,10,5 at partition 0., 11,6 at partition 1., 7,12 at partition 2., 13,8 at partition 3., 14,9 at partition 4.)

 

distinct([numTasks]) 

distinct([numTasks]) Return a new dataset that contains the distinct elements of the source dataset.

使用原RDD中的元素組成一個沒有重複元素的RDD

/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

numPartitions表示結果RDD的分區數量

 

val a = Array(1,1,1,2,2,3,4,5)
val rdd = sc.parallelize(a,2)
rdd.collect
val rdd1 = rdd.distinct(1)
rdd1.collect
rdd1.partitions.length

 

scala> val a = Array(1,1,1,2,2,3,4,5)
a: Array[Int] = Array(1, 1, 1, 2, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(a)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at parallelize at <console>:26

scala> val rdd = sc.parallelize(a,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:26

scala> rdd.collect
res13: Array[Int] = Array(1, 1, 1, 2, 2, 3, 4, 5)                               

scala> val rdd1 = rdd.distinct(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at distinct at <console>:28

scala> rdd1.collect
res14: Array[Int] = Array(4, 1, 3, 5, 2)

scala> rdd1.partitions.length
res15: Int = 1

 

/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T]

distinct(numPartitions: Int),不一樣的是結果RDD中partition數量依賴父RDD。

scala> val rdd1 = rdd.distinct()
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[49] at distinct at <console>:28

scala> rdd1.partitions.length
res16: Int = 2

scala> rdd1.collect
res17: Array[Int] = Array(4, 2, 1, 3, 5)

 

keyBy(func)

/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: T => K): RDD[(K, T)]

使用func爲RDD每個元素建立一個key-value對元素

val rdd = sc.parallelize(1 to 9 ,2)
val rdd1 = rdd.keyBy(_%3)
rdd1.collect

 

scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at keyBy at <console>:26

scala> rdd1.collect
res0: Array[(Int, Int)] = Array((1,1), (2,2), (0,3), (1,4), (2,5), (0,6), (1,7), (2,8), (0,9))

 

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level. The ordering of elements
* within each group is not guaranteed, and may even differ each time the resulting RDD is
* evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupByKey(): RDD[(K, Iterable[V])]
val rdd = sc.parallelize(1 to 9 ,2)
val rdd1 = rdd.keyBy(_%3)
val rdd2 = rdd1.groupByKey()
rdd2.collect

 

scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at keyBy at <console>:26

scala> val rdd2 = rdd1.groupByKey()
rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:28

scala> rdd2.collect
res1: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7)))

 

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions. The ordering of elements within
* each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
同groupByKey(),只是指定了分區數量。
scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[6] at keyBy at <console>:26

scala> val rdd2 = rdd1.groupByKey(3)
rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[7] at groupByKey at <console>:28

scala> rdd2.partitions.length
res2: Int = 3

 

/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    key.toString.toInt%numPartitions
  }
}

val rdd = sc.parallelize(1 to 9 ,2)
val rdd1 = rdd.keyBy(_%3)
rdd1.collect
val rdd2 = rdd1.groupByKey(new MyPartitioner(2))
rdd2.collect
 
scala> class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
     |   override def numPartitions: Int = numParts
     |   override def getPartition(key: Any): Int = {
     |     key.toString.toInt%numPartitions
     |   }
     | }
defined class MyPartitioner

scala> val rdd = sc.parallelize(1 to 9 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val rdd1 = rdd.keyBy(_%3)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[9] at keyBy at <console>:26

scala> val rdd2 = rdd1.groupByKey(new MyPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[10] at groupByKey at <console>:29

scala> rdd2.collect
res3: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7)))

scala> rdd1.collect
res4: Array[(Int, Int)] = Array((1,1), (2,2), (0,3), (1,4), (2,5), (0,6), (1,7), (2,8), (0,9))

scala> rdd2.partitions.length
res5: Int = 2
 

groupBy(func)

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

def func(x:Int) = {x%3}
val rdd = sc.parallelize(1 to 10,2)
val rdd1 = rdd.groupBy(func(_))
rdd1.collect
scala> def func(x:Int) = {x%3}
func: (x: Int)Int

scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[64] at parallelize at <console>:27

scala> val rdd1 = rdd.groupBy(func(_))
rdd1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[66] at groupBy at <console>:31

scala> rdd1.collect
res29: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7, 10)))

 

/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](
f: T => K,
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

同groupBy[K](f: T => K),只是指定了分區數量。

def func(x:Int) = {x%3}
val rdd = sc.parallelize(1 to 10,2)
val rdd1 = rdd.groupBy(func(_),3)
rdd1.collect

 

scala> def func(x:Int) = {x%3}
func: (x: Int)Int

scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> val rdd1 = rdd.groupBy(func(_),3)
rdd1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[13] at groupBy at <console>:28

scala> rdd1.partitions.length
res6: Int = 3

scala> rdd1.collect
res7: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (1,CompactBuffer(1, 4, 7, 10)), (2,CompactBuffer(2, 5, 8)))

 

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])]

class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    key.toString.toInt%numPartitions
  }
}
def func(x:Int) = {x%3}
val rdd = sc.parallelize(1 to 10,2)
val rdd1 = rdd.groupBy(func(_),new MyPartitioner(3))
rdd1.collect
rdd1.partitions.length

 

scala> class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
     |   override def numPartitions: Int = numParts
     |   override def getPartition(key: Any): Int = {
     |     key.toString.toInt%numPartitions
     |   }
     | }
defined class MyPartitioner

scala> def func(x:Int) = {x%3}
func: (x: Int)Int

scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> val rdd1 = rdd.groupBy(func(_),new MyPartitioner(3))
rdd1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[16] at groupBy at <console>:29

scala> rdd1.collect
res8: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (1,CompactBuffer(1, 4, 7, 10)), (2,CompactBuffer(2, 5, 8)))

scala> rdd1.partitions.length
res9: Int = 3

 

reduceByKey(func, [numTasks])

reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. 
對每個key的全部value使用func函數進行聚合

/**

* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val words = Array("one", "two", "two", "three", "three", "three")
val rdd = sc.parallelize(words).map(word => (word, 1))
val rdd1 = rdd.reduceByKey(_ + _)
rdd1.collect

 

scala> val words = Array("one", "two", "two", "three", "three", "three")  
words: Array[String] = Array(one, two, two, three, three, three)

scala> val rdd = sc.parallelize(words).map(word => (word, 1))  
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:26

scala> val rdd1 = rdd.reduceByKey(_ + _) 
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:28

scala> rdd1.collect
res10: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

 

/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
同上,只是指定了分區數量
scala> val rdd2 = rdd.reduceByKey(_ + _,3) 
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:28

scala> rdd2.collect
res11: Array[(String, Int)] = Array((two,2), (one,1), (three,3))                

scala> rdd2.partitions.length
res12: Int = 3
 
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
同上,使用partitioner自定義分區
相關文章
相關標籤/搜索