Learning Spark: Spark RDD Operators

My homepage: zicesun.com

This post summarizes the usage of Spark RDD operators from the perspective of the source code.

Single-value transformation operators

map

  /** Return a new RDD by applying a function to all elements of this RDD. */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

The source contains a call to sc.clean(), whose job is to strip external references that cannot be serialized out of a closure. Scala supports closures: a closure keeps the outer objects it references inside itself, so it can be used on its own without worrying about leaving its original scope. In a distributed environment like Spark, however, this causes problems: if an external reference is not serializable, the closure cannot be shipped to the worker nodes correctly; other references may never be used at all and do not need to be sent to the workers. sc.clean() actually delegates to ClosureCleaner.clean(), which recursively traverses the references inside the closure, checks for ones that are not serializable, and removes the unused references.
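
A hypothetical sketch of why this matters (the Search class below is made up for illustration): referencing a field of a non-serializable enclosing object from inside an RDD operator drags the whole object into the closure, while copying the field into a local value first keeps the closure serializable.

import org.apache.spark.rdd.RDD

class Search(val query: String) {        // not Serializable
  // BAD: `query` is really `this.query`, so the whole Search instance is
  // captured by the closure and task serialization fails at runtime.
  def badFilter(rdd: RDD[String]): RDD[String] =
    rdd.filter(line => line.contains(query))

  // GOOD: copy the field into a local val; only the String is captured
  // and shipped to the workers.
  def goodFilter(rdd: RDD[String]): RDD[String] = {
    val q = query
    rdd.filter(line => line.contains(q))
  }
}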

map is a coarse-grained operation: for an RDD, it walks over each partition with an iterator, applies the function f you supply to that partition, and returns a new RDD. In other words, every element of the RDD has the same operation applied to it.

scala> val array = Array(1,2,3,4,5,6)
array: Array[Int] = Array(1, 2, 3, 4, 5, 6)
 scala> val rdd = sc.parallelize(array, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
 scala> val mapRdd = rdd.map(x => x * 2) 
mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
 scala> mapRdd.collect().foreach(println)
2
4
6
8
10
12

flatMap

flatMap is similar to map, but a single call of the supplied function may produce multiple output objects, instead of map's one-to-one transformation of each object into another object.

  /**
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
scala> val a = sc.parallelize(1 to 10, 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
 scala> a.flatMap(num => 1 to num).collect
res1: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


mapPartitions

mapPartitions is another implementation of map. map applies the input function to every element of the RDD, whereas mapPartitions applies the input function to each partition, treating the contents of a partition as a whole.

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }
scala> def myfunc[T](iter: Iterator[T]):Iterator[(T,T)]={ 
     | var res = List[(T,T)]()
     | var pre = iter.next
     | while(iter.hasNext){
     |   var cur = iter.next
     |   res .::= (pre, cur)
     |   pre = cur
     | }
     | res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
 scala> val a = sc.parallelize(1 to 9,3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
 scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

mapPartitionsWithIndex

mapPartitionsWithIndex is similar to mapPartitions, except that it also tracks the index of the original partition, so you know which partition each element came from. The method takes a function whose inputs are an integer index and an iterator.

  /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
   * of the original partition.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
  }
scala> val x = sc.parallelize(1 to 10, 3)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
 scala> def myFunc(index:Int, iter:Iterator[Int]):Iterator[String]={
     |   iter.toList.map(x => index + "," + x).iterator
     | }
myFunc: (index: Int, iter: Iterator[Int])Iterator[String]
 scala> x.mapPartitionsWithIndex(myFunc).collect
res1: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)

foreach

foreach applies a loop body to every input element; it can be used, for example, to print the elements of an RDD.

  /** Applies a function f to all elements of this RDD. */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }
scala> var x = sc.parallelize(List(1 to 9), 3)
x: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[5] at parallelize at <console>:24
 scala> x.foreach(print)
Range(1, 2, 3, 4, 5, 6, 7, 8, 9)

foreachPartition

foreachPartition works like mapPartitions: it applies a function to the data of each partition of the RDD through an iterator parameter. The difference is that the function it takes has no return value.

  /** Applies a function f to each partition of this RDD. */
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }
scala> val b = sc.parallelize(List(1,2,3,4,5,6), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
 scala> b.foreachPartition(x => println(x.reduce((a,b) => a +b)))
7
3
11

glom

glom is similar to collect. collect turns the whole RDD directly into an array, while glom packs the data of each partition into an array inside the resulting RDD: each returned array holds all the elements of one partition, so the result contains as many arrays as there are partitions.

  /** Return an RDD created by coalescing all elements within each partition into an array. */
  def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

In the example below, RDD a has three partitions, and glom turns it into an RDD made up of three arrays.

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
 scala> a.glom.collect
res5: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
 scala> a.glom
res6: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[4] at glom at <console>:26

union

union is equivalent to the ++ method: it forms the union of two RDDs without removing duplicates.

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T] = withScope {
    sc.union(this, other)
  }

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def ++(other: RDD[T]): RDD[T] = withScope {
    this.union(other)
  }
scala> val a = sc.parallelize(1 to 4,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
 scala> val b = sc.parallelize(2 to 5,1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
 scala> a.union(b).collect
res7: Array[Int] = Array(1, 2, 3, 4, 2, 3, 4, 5)


cartesian

cartesian computes the Cartesian product of the elements of two RDDs.

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
    new CartesianRDD(sc, this, other)
  }

scala> val a = sc.parallelize(1 to 4,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
 scala> val b = sc.parallelize(2 to 5,1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
 scala> a.cartesian(b).collect
res8: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5), (4,2), (4,3), (4,4), (4,5))



groupBy

groupBy has three overloads. It turns each element into a key-value pair using the supplied map function and then aggregates by key with groupByKey.

  /**
   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   */
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy[K](f, defaultPartitioner(this))
  }

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   */
  def groupBy[K](
      f: T => K,
      numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy(f, new HashPartitioner(numPartitions))
  }

  /**
   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   */
  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }

scala> val a = sc.parallelize(1 to 9,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
 scala> a.groupBy(x => {if(x % 2 == 0) "even" else "odd"}).collect
res9: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
 scala> def myfunc(a: Int):Int={
     | a % 2
     | }
myfunc: (a: Int)Int
 scala> a.groupBy(myfunc).collect
res10: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
 scala> a.groupBy(myfunc(_), 1).collect
res11: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))



filter

filter filters the input elements. Its argument is a function that returns a Boolean: if the function evaluates to true for an element, the element is kept; otherwise it is filtered out and does not appear in the result.

  /** Return a new RDD containing only the elements that satisfy a predicate. */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

scala> val a = sc.parallelize(List("we", "are", "from", "China", "not", "from", "America"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:24
 scala> val b = a.filter(x => x.length >= 4)
b: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at filter at <console>:25
 scala> b.collect.foreach(println)
from
China
from
America


distinct

distinct removes duplicate elements from the RDD, leaving only the unique ones.

  /** Return a new RDD containing the distinct elements in this RDD. */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

scala> val a = sc.parallelize(List("we", "are", "from", "China", "not", "from", "America"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24
 scala> val b = a.map(x => x.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:25
 scala> val c = b.distinct
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at distinct at <console>:25
 scala> c.foreach(println)
5
4
2
3
7


subtract

subtract computes the set difference A - B: every element of A that also appears in B is removed, and the remaining elements are returned.

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: RDD[T]): RDD[T] = withScope {
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
  }

  /** Return an RDD with the elements from `this` that are not in `other`. */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
    subtract(other, new HashPartitioner(numPartitions))
  }

  /** Return an RDD with the elements from `this` that are not in `other`. */
  def subtract(
      other: RDD[T],
      p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions: Int = p.numPartitions
        override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }

scala> val a = sc.parallelize(1 to 9, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
 scala> val b = sc.parallelize(2 to 5, 4)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
 scala> val c = a.subtract(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at subtract at <console>:27
 scala> c.collect
res14: Array[Int] = Array(6, 8, 1, 7, 9)


persist and cache

cache caches the RDD in memory so it can be reused by later computations. persist persists the RDD at a storage level given as a parameter; called without arguments it uses the default level, which keeps the data in memory only and is therefore equivalent to cache.
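
A minimal sketch of the relationship between the two (the data here is made up for illustration):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
val a = sc.parallelize(1 to 1000000, 4).cache()

// persist() lets you choose another storage level, e.g. spill to disk
// when the data does not fit in memory.
val b = sc.parallelize(1 to 1000000, 4).persist(StorageLevel.MEMORY_AND_DISK)

a.count()       // the first action materializes and caches the partitions
a.count()       // later actions read from the cache instead of recomputing

b.unpersist()   // release the storage when it is no longer needed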

sample

sample randomly samples the elements of the RDD and returns a new child RDD. Its parameters specify whether sampling is done with replacement, the fraction of the total to keep, and the random seed.

  /**
   * Return a sampled subset of this RDD.
   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD's size
   *   without replacement: probability that each element is chosen; fraction must be [0, 1]
   *   with replacement: expected number of times each element is chosen; fraction must be greater
   *   than or equal to 0
   * @param seed seed for the random number generator
   *
   * @note This is NOT guaranteed to provide exactly the fraction of the count
   * of the given [[RDD]].
   */
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = {
    require(fraction >= 0,
      s"Fraction must be nonnegative, but got ${fraction}")

    withScope {
      require(fraction >= 0.0, "Negative fraction value: " + fraction)
      if (withReplacement) {
        new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
      } else {
        new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
      }
    }
  }

scala> val a = sc.parallelize(1 to 100, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
 scala> val b = a.sample(false, 0.2, 0)
b: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[32] at sample at <console>:25
 scala> b.foreach(println)
5
19
20
26
27
29
30
57
40
61
45
68
73
50
75
79
81
85
89
99


Key-value transformation operators

groupByKey

Similar to groupBy, groupByKey gathers the values of each identical key into a sequence; either the default partitioner or a custom partitioner can be used.

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   * The ordering of elements within each group is not guaranteed, and may even differ
   * each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with into `numPartitions` partitions. The ordering of elements within
   * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
   *
   * @note This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
   * or `PairRDDFunctions.reduceByKey` will provide much better performance.
   *
   * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
   */
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }

scala> val a = sc.parallelize(List("mk", "zq", "xwc", "fig", "dcp", "snn"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24
 scala> val b = a.keyBy(x => x.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[34] at keyBy at <console>:25
 scala> b.groupByKey.collect
res17: Array[(Int, Iterable[String])] = Array((2,CompactBuffer(mk, zq)), (3,CompactBuffer(xwc, fig, dcp, snn)))



combineByKey

combineByKey efficiently merges the values of identical keys in a key-value RDD into a sequence. You can supply a custom partitioner and choose whether to aggregate on the map side.

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see `combineByKeyWithClassTag`
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  }

  /**
   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
   * This method is here for backward compatibility. It does not provide combiner
   * classtag information to the shuffle.
   *
   * @see `combineByKeyWithClassTag`
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  }

scala> val a = sc.parallelize(List("xwc", "fig","wc", "dcp", "zq", "znn", "mk", "zl", "hk", "lp"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[36] at parallelize at <console>:24
 scala> val b = sc.parallelize(List(1,2,2,3,2,1,2,2,2,3),2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24
 scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[38] at zip at <console>:27
 scala> val d = c.combineByKey(List(_), (x:List[String], y:String)=>y::x, (x:List[String], y:List[String])=>x::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[39] at combineByKey at <console>:25
 scala> d.collect
res18: Array[(Int, List[String])] = Array((2,List(zq, wc, fig, hk, zl, mk)), (1,List(xwc, znn)), (3,List(dcp, lp)))



The example above uses the three-argument overload. The first argument, createCombiner, turns an element V into an element of another type C; here List(_) wraps the input element in a List. The second argument, mergeValue, merges an element V into a C; here (x: List[String], y: String) => y :: x prepends the string y to the list x. The third argument, mergeCombiners, merges two C elements; here (x: List[String], y: List[String]) => x ::: y concatenates the two lists.

reduceByKey

reduceByKey uses a reduce function to aggregate the values of each key. Values are merged locally on the map side before being sent to the reducers. Under the hood it calls one of the combineByKey overloads.

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

scala> val a = sc.parallelize(List("dcp","fjg","snn","wc", "za"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
 scala> val b = a.map(x => (x.length,x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at map at <console>:25
 scala> b.reduceByKey((a, b) => a + b ).collect
res1: Array[(Int, String)] = Array((2,wcza), (3,dcpfjgsnn))


sortByKey

sortByKey sorts key-value pairs by key: string keys are sorted in dictionary order and numeric keys by magnitude. A parameter chooses between ascending and descending order.

scala> val a = sc.parallelize(List("dcp","fjg","snn","wc", "za"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
 scala> val b = sc.parallelize(1 to a.count.toInt,2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:26
 scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[6] at zip at <console>:27
 scala> c.sortByKey(true).collect
res2: Array[(String, Int)] = Array((dcp,1), (fjg,2), (snn,3), (wc,4), (za,5))


cogroup

cogroup groups together the values of each key from two RDDs: for every key it produces a pair of iterables, one containing that key's values from each input RDD.

scala> val a = sc.parallelize(List(1,2,2,3,1,3),2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val b = a.map(x => (x, "b"))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[11] at map at <console>:25

scala> val c = a.map(x => (x, "c"))
c: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[12] at map at <console>:25

scala> b.cogroup(c).collect
res3: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(CompactBuffer(b, b),CompactBuffer(c, c))), (1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b, b),CompactBuffer(c, c))))




scala> val a = sc.parallelize(List(1,2,2,2,1,3),1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> val b = a.map(x => (x, "b"))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[16] at map at <console>:25

scala> val c = a.map(x => (x, "c"))
c: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[17] at map at <console>:25

scala> b.cogroup(c).collect
res4: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b, b, b),CompactBuffer(c, c, c))))



join

join first cogroups the two RDDs, then takes the Cartesian product of the values under each key, and finally flattens the result with flatMapValues.

scala> val a= sc.parallelize(List("fjg","wc","xwc"),2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:24 

scala> val c = sc.parallelize(List("fig", "wc", "sbb", "zq","xwc","dcp"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> val d = c.keyBy(_.length)
d: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[23] at keyBy at <console>:25

scala> val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at keyBy at <console>:25

scala> b.join(d).collect
res6: Array[(Int, (String, String))] = Array((2,(wc,wc)), (2,(wc,zq)), (3,(fjg,fig)), (3,(fjg,sbb)), (3,(fjg,xwc)), (3,(fjg,dcp)), (3,(xwc,fig)), (3,(xwc,sbb)), (3,(xwc,xwc)), (3,(xwc,dcp)))


Action operators

collect

collect returns the elements of the RDD as an array.

  /**
   * Return an array that contains all of the elements in this RDD.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

scala> val a = sc.parallelize(List("a", "b", "c"),2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:24
 scala> a.collect
res7: Array[String] = Array(a, b, c)



reduce

reduce aggregates the elements with a two-argument function and returns a single result. The binary operation should be commutative and associative so that the result is correct when computed in parallel.

  /**
   * Reduces the elements of this RDD using the specified commutative and
   * associative binary operator.
   */
  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

scala> val a = sc.parallelize(1 to 10, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
 scala> a.reduce((a, b) => a + b)
res8: Int = 55


take

take returns the first n elements of the RDD. It scans one partition first and takes the results from that partition; if that partition does not yield n elements, it keeps scanning additional partitions.

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @note Due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        val left = num - buf.size
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * scaleUpFactor
          } else {
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }
      buf.toArray
    }
  }

scala> val a = sc.parallelize(1 to 10, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at parallelize at <console>:24
 scala> a.take(5)
res9: Array[Int] = Array(1, 2, 3, 4, 5)


top

top uses an implicit ordering to return the n largest elements.

  /**
   * Returns the top k (largest) elements from this RDD as defined by the specified
   * implicit Ordering[T] and maintains the ordering. This does the opposite of
   * [[takeOrdered]]. For example:
   * {{{
   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
   *   // returns Array(12)
   *
   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
   *   // returns Array(6, 5)
   * }}}
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @param num k, the number of top elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    takeOrdered(num)(ord.reverse)
  }
  /**
   * Returns the first k (smallest) elements from this RDD as defined by the specified
   * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
   * For example:
   * {{{
   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
   *   // returns Array(2)
   *
   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
   *   // returns Array(2, 3)
   * }}}
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @param num k, the number of elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      }
    }
  }

scala> val c = sc.parallelize(Array(1,2,3,5,3,8,7,97,32),2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
 scala> c.top(3)
res10: Array[Int] = Array(97, 32, 8)


count

count returns the number of elements in the RDD.

  /** Return the number of elements in the RDD. */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

scala> val c = sc.parallelize(Array(1,2,3,5,3,8,7,97,32),2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24
 scala> c.count
res11: Long = 9



takeSample

takeSample returns a fixed-size sampled subset of the RDD as an array, and additionally shuffles the order of the returned elements randomly.

  /**
   * Return a fixed-size sampled subset of this RDD in an array
   *
   * @param withReplacement whether sampling is done with replacement
   * @param num size of the returned sample
   * @param seed seed for the random number generator
   * @return sample of specified size in an array
   *
   * @note this method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def takeSample(
      withReplacement: Boolean,
      num: Int,
      seed: Long = Utils.random.nextLong): Array[T] = withScope {
    val numStDev = 10.0

    require(num >= 0, "Negative number of elements requested")
    require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
      "Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")

    if (num == 0) {
      new Array[T](0)
    } else {
      val initialCount = this.count()
      if (initialCount == 0) {
        new Array[T](0)
      } else {
        val rand = new Random(seed)
        if (!withReplacement && num >= initialCount) {
          Utils.randomizeInPlace(this.collect(), rand)
        } else {
          val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
            withReplacement)
          var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

          // If the first sample didn't turn out large enough, keep trying to take samples;
          // this shouldn't happen often because we use a big multiplier for the initial size
          var numIters = 0
          while (samples.length < num) {
            logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
            samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
            numIters += 1
          }
          Utils.randomizeInPlace(samples, rand).take(num)
        }
      }
    }
  }

scala> val c = sc.parallelize(Array(1,2,3,5,3,8,7,97,32),2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
 scala> c.takeSample(true,3, 1)
res14: Array[Int] = Array(1, 3, 7)


saveAsTextFile

saveAsTextFile writes the RDD out as text files, one element per line.
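
A small sketch (the output directory below is a placeholder); Spark writes one part file per partition under the given directory:

val words = sc.parallelize(List("we", "are", "from", "China"), 2)

// Each element becomes one line; the directory will contain
// part-00000 and part-00001, one file per partition.
words.saveAsTextFile("hdfs:///tmp/words-output")

// The files can be read back with sc.textFile.
val reloaded = sc.textFile("hdfs:///tmp/words-output")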

countByKey

Similar to count, but countByKey counts the number of values for each key and returns the result as a Map.

  /**
   * Count the number of elements for each key, collecting the results to a local Map.
   *
   * @note This method should only be used if the resulting map is expected to be small, as
   * the whole thing is loaded into the driver's memory.
   * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
   * returns an RDD[T, Long] instead of a map.
   */
  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }

scala> val c = sc.parallelize(List("fig", "wc", "sbb", "zq","xwc","dcp"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[36] at parallelize at <console>:24
 scala> val d = c.keyBy(_.length)
d: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[37] at keyBy at <console>:25
 scala> d.countByKey
res15: scala.collection.Map[Int,Long] = Map(2 -> 2, 3 -> 4)



aggregate

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * given combine functions and a neutral "zero value". This function can return a different result
   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
   * allowed to modify and return their first argument instead of creating a new U to avoid memory
   * allocation.
   *
   * @param zeroValue the initial value for the accumulated result of each partition for the
   *                  `seqOp` operator, and also the initial value for the combine results from
   *                  different partitions for the `combOp` operator - this will typically be the
   *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
   * @param seqOp an operator used to accumulate results within a partition
   * @param combOp an associative operator used to combine results from different partitions
   */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

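aggregate folds the elements of each partition with seqOp, starting from zeroValue, and then merges the per-partition results with combOp, so the result type U can differ from the element type T. A small usage sketch (not from the original post) that computes the sum and the count of the elements in one pass:

val nums = sc.parallelize(1 to 9, 3)

val (sum, count) = nums.aggregate((0, 0))(
  (acc, n) => (acc._1 + n, acc._2 + 1),   // seqOp: fold an element into the partition accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // combOp: merge two partition accumulators
)
// sum == 45, count == 9, so the mean is sum.toDouble / count == 5.0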

fold

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function
   * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
   * allocation; however, it should not modify t2.
   *
   * This behaves somewhat differently from fold operations implemented for non-distributed
   * collections in functional languages like Scala. This fold operation may be applied to
   * partitions individually, and then fold those results into the final result, rather than
   * apply the fold to each element sequentially in some defined ordering. For functions
   * that are not commutative, the result may differ from that of a fold applied to a
   * non-distributed collection.
   *
   * @param zeroValue the initial value for the accumulated result of each partition for the `op`
   *                  operator, and also the initial value for the combine results from different
   *                  partitions for the `op` operator - this will typically be the neutral
   *                  element (e.g. `Nil` for list concatenation or `0` for summation)
   * @param op an operator used to both accumulate results within a partition and combine results
   *           from different partitions
   */
  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }
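
fold works like reduce but starts from a zero value; because zeroValue is folded into each partition and again when merging the partition results, it should be the neutral element of op. A small usage sketch (not from the original post):

val nums = sc.parallelize(1 to 9, 3)

// 0 is the neutral element of +, so the result is simply the sum.
val total = nums.fold(0)(_ + _)     // total == 45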
