My homepage: zicesun.com
This post summarizes how Spark RDD operators work, from the perspective of the source code.
/** * Return a new RDD by applying a function to all elements of this RDD. */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
The source contains a call to sc.clean(). Its purpose is to strip out external references in the closure that cannot be serialized. Scala supports closures: a closure stores the outer objects it references inside itself, so it can be used on its own without worrying about escaping its original scope. In a distributed setting like Spark, however, this becomes a problem: if an external reference is not serializable, the closure cannot be shipped correctly to the worker nodes; other captured references may never actually be used and do not need to be sent to the workers at all. sc.clean in fact calls ClosureCleaner.clean(), which recursively traverses the references inside the closure, checks for ones that are not serializable, and removes the unused references.
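A minimal sketch of the capture problem the cleaner guards against (the class and method names here are made up for illustration): referencing a constructor parameter directly inside a lambda captures the enclosing instance, and with it any non-serializable fields, whereas copying the value into a local first keeps the closure small and serializable.
class Multiplier(factor: Int) {
  val lock = new Object()   // java.lang.Object is not serializable

  def scale(rdd: org.apache.spark.rdd.RDD[Int]): org.apache.spark.rdd.RDD[Int] = {
    // Using `factor` directly in the lambda would capture `this` (and therefore `lock`).
    val f = factor            // copy the needed value into a local
    rdd.map(x => x * f)       // the closure now only captures the Int `f`
  }
}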
map is a coarse-grained operation: for an RDD it walks each partition with an iterator and applies the function f to the partition's elements, returning a new RDD. In effect, the same operation is applied to every element of the RDD.
scala> val array = Array(1,2,3,4,5,6)
array: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> val rdd = sc.parallelize(array, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val mapRdd = rdd.map(x => x * 2)
mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> mapRdd.collect().foreach(println)
2
4
6
8
10
12
flatMap is similar to map, but each input element may produce multiple output elements, instead of being transformed into exactly one element as with map.
/** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
scala> val a = sc.parallelize(1 to 10, 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> a.flatMap(num => 1 to num).collect
res1: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
mapPartitions is another flavor of map. Whereas map applies the input function to each element of the RDD, mapPartitions applies it to each partition, that is, the contents of a partition are handed to the function as a whole.
/** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
scala> def myfunc[T](iter: Iterator[T]):Iterator[(T,T)]={
| var res = List[(T,T)]()
| var pre = iter.next
| while(iter.hasNext){
| var cur = iter.next
| res .::= (pre, cur)
| pre = cur
| }
| res.iterator
| }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
scala> val a = sc.parallelize(1 to 9,3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
mapPartitionsWithIndex is similar to mapPartitions, except that it also tracks the index of the original partition, so you can tell which partition each element belongs to. Its argument is a function taking an integer index and an iterator.
/** * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
scala> val x = sc.parallelize(1 to 10, 3)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> def myFunc(index:Int, iter:Iterator[Int]):Iterator[String]={
| iter.toList.map(x => index + "," + x).iterator
| }
myFunc: (index: Int, iter: Iterator[Int])Iterator[String]
scala> x.mapPartitionsWithIndex(myFunc).collect
res1: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)
foreach loops over every element of the RDD and applies the given function to it; it is typically used to produce output, such as printing the elements.
/** * Applies a function f to all elements of this RDD. */
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
scala> var x = sc.parallelize(List(1 to 9), 3)
x: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> x.foreach(print)
Range(1, 2, 3, 4, 5, 6, 7, 8, 9)
foreachPartition works like mapPartitions: the function receives an iterator over each partition's data. The difference is the return value: foreachPartition's function returns nothing (it is an action run for its side effects), whereas mapPartitions produces a new RDD.
/** * Applies a function f to each partition of this RDD. */
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
scala> val b = sc.parallelize(List(1,2,3,4,5,6), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> b.foreachPartition(x => println(x.reduce((a,b) => a +b)))
7
3
11
glom is similar to collect. collect turns the whole RDD directly into an array, whereas glom packs each partition's data into an array inside the resulting RDD: every returned array holds all the elements of one partition, so the result contains as many arrays as there are partitions.
/** * Return an RDD created by coalescing all elements within each partition into an array. */
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
In the example below, RDD a has three partitions, and glom turns a into an RDD made up of three arrays.
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> a.glom.collect
res5: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
scala> a.glom
res6: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[4] at glom at <console>:26
union is equivalent to the ++ method: it takes the union of two RDDs, and duplicates are not removed in the process.
/** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}
/** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */
def ++(other: RDD[T]): RDD[T] = withScope {
this.union(other)
}
scala> val a = sc.parallelize(1 to 4,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val b = sc.parallelize(2 to 5,1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> a.union(b).collect
res7: Array[Int] = Array(1, 2, 3, 4, 2, 3, 4, 5)
cartesian computes the Cartesian product of the elements of two RDDs.
/** * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of * elements (a, b) where a is in `this` and b is in `other`. */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
}
scala> val a = sc.parallelize(1 to 4,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val b = sc.parallelize(2 to 5,1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> a.cartesian(b).collect
res8: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5), (4,2), (4,3), (4,4), (4,5))
groupBy has three overloads. It maps each element to a key with the supplied function, forming key-value pairs, and then aggregates them by key using groupByKey.
/** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements * mapping to that key. The ordering of elements within each group is not guaranteed, and * may even differ each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}
/** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. The ordering of elements within each group is not guaranteed, and * may even differ each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. */
def groupBy[K](
f: T => K,
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy(f, new HashPartitioner(numPartitions))
}
/** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements * mapping to that key. The ordering of elements within each group is not guaranteed, and * may even differ each time the resulting RDD is evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. */
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])] = withScope {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
scala> val a = sc.parallelize(1 to 9,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> a.groupBy(x => {if(x % 2 == 0) "even" else "odd"}).collect
res9: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
scala> def myfunc(a: Int):Int={
| a % 2
| }
myfunc: (a: Int)Int
scala> a.groupBy(myfunc).collect
res10: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
scala> a.groupBy(myfunc(_), 1).collect
res11: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
filter filters the input elements. Its argument is a function returning a Boolean: elements for which the function evaluates to true pass through; the rest are filtered out and do not appear in the result.
/** * Return a new RDD containing only the elements that satisfy a predicate. */
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
scala> val a = sc.parallelize(List("we", "are", "from", "China", "not", "from", "America"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> val b = a.filter(x => x.length >= 4)
b: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at filter at <console>:25
scala> b.collect.foreach(println)
from
China
from
America
distinct removes duplicate elements from the RDD, keeping only the unique ones.
/** * Return a new RDD containing the distinct elements in this RDD. */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
scala> val a = sc.parallelize(List("we", "are", "from", "China", "not", "from", "America"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> val b = a.map(x => x.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:25
scala> val c = b.distinct
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at distinct at <console>:25
scala> c.foreach(println)
5
4
2
3
7
subtract computes the set difference A - B: every element of A that also appears in B is removed, and the remaining elements are returned.
/** * Return an RDD with the elements from `this` that are not in `other`. * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */
def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
/** * Return an RDD with the elements from `this` that are not in `other`. */
def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
subtract(other, new HashPartitioner(numPartitions))
}
/** * Return an RDD with the elements from `this` that are not in `other`. */
def subtract(
other: RDD[T],
p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
if (partitioner == Some(p)) {
// Our partitioner knows how to handle T (which, since we have a partitioner, is
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
val p2 = new Partitioner() {
override def numPartitions: Int = p.numPartitions
override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
}
// Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
// anyway, and when calling .keys, will not have a partitioner set, even though
// the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
// partitioned by the right/real keys (e.g. p).
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
} else {
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
}
}
scala> val a = sc.parallelize(1 to 9, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> val b = sc.parallelize(2 to 5, 4)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> val c = a.subtract(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at subtract at <console>:27
scala> c.collect
res14: Array[Int] = Array(6, 8, 1, 7, 9)
cache caches the RDD in memory so that it can be reused by later computations. persist persists the RDD at a storage level given by its argument; without an argument it uses the default level, which keeps the data in memory only and is therefore equivalent to cache.
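A brief sketch of how the two are typically used (the RDD here is just a placeholder):
import org.apache.spark.storage.StorageLevel

val squares = sc.parallelize(1 to 1000000, 4).map(x => x * x)
squares.cache()                                   // same as persist(StorageLevel.MEMORY_ONLY)
// squares.persist(StorageLevel.MEMORY_AND_DISK)  // explicit level: spill to disk if memory is short
squares.count()     // the first action materializes and caches the partitions
squares.sum()       // later actions reuse the cached data
squares.unpersist() // release the cached blocks when no longer needed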
sample randomly samples the elements of the RDD to obtain a new sub-RDD. Its parameters specify whether to sample with replacement, the fraction of the data to keep, and the random seed.
/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *  without replacement: probability that each element is chosen; fraction must be [0, 1]
 *  with replacement: expected number of times each element is chosen; fraction must be greater
 *  than or equal to 0
 * @param seed seed for the random number generator
 *
 * @note This is NOT guaranteed to provide exactly the fraction of the count
 * of the given [[RDD]].
 */
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")
withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
scala> val a = sc.parallelize(1 to 100, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
scala> val b = a.sample(false, 0.2, 0)
b: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[32] at sample at <console>:25
scala> b.foreach(println)
5
19
20
26
27
29
30
57
40
61
45
68
73
50
75
79
81
85
89
99
groupByKey is similar to groupBy: it gathers all values with the same key into a sequence, using either the default partitioner or a custom one.
/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with into `numPartitions` partitions. The ordering of elements within
 * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
 */
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}
scala> val a = sc.parallelize(List("mk", "zq", "xwc", "fig", "dcp", "snn"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val b = a.keyBy(x => x.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[34] at keyBy at <console>:25
scala> b.groupByKey.collect
res17: Array[(Int, Iterable[String])] = Array((2,CompactBuffer(mk, zq)), (3,CompactBuffer(xwc, fig, dcp, snn)))
combineByKey efficiently merges the values that share the same key in a key-value RDD into sequences. The caller can supply a custom partitioner and decide whether to combine on the map side.
/** * Generic function to combine the elements for each key using a custom set of aggregation * functions. This method is here for backward compatibility. It does not provide combiner * classtag information to the shuffle. * * @see `combineByKeyWithClassTag` */
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}
/** * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD. * This method is here for backward compatibility. It does not provide combiner * classtag information to the shuffle. * * @see `combineByKeyWithClassTag` */
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}
scala> val a = sc.parallelize(List("xwc", "fig","wc", "dcp", "zq", "znn", "mk", "zl", "hk", "lp"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[36] at parallelize at <console>:24
scala> val b = sc.parallelize(List(1,2,2,3,2,1,2,2,2,3),2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24
scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[38] at zip at <console>:27
scala> val d = c.combineByKey(List(_), (x:List[String], y:String)=>y::x, (x:List[String], y:List[String])=>x::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[39] at combineByKey at <console>:25
scala> d.collect
res18: Array[(Int, List[String])] = Array((2,List(zq, wc, fig, hk, zl, mk)), (1,List(xwc, znn)), (3,List(dcp, lp)))
The example above uses the three-argument overload. The first argument, createCombiner, turns an element V into an element of another type C; here List(_) wraps the input element in a List. The second argument, mergeValue, merges an element V into a C; here (x: List[String], y: String) => y :: x prepends the string y to the list x. The third argument, mergeCombiners, merges two C values; here (x: List[String], y: List[String]) => x ::: y concatenates the two lists.
reduceByKey aggregates the values of each key with a reduce function; the merge is also performed locally on each mapper before the results are sent to a reducer. Under the hood it delegates to an overload of combineByKeyWithClassTag.
/** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
/** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}
/** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
scala> val a = sc.parallelize(List("dcp","fjg","snn","wc", "za"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val b = a.map(x => (x.length,x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at map at <console>:25
scala> b.reduceByKey((a, b) => a + b ).collect
res1: Array[(Int, String)] = Array((2,wcza), (3,dcpfjgsnn))
sortByKey sorts key-value pairs by key: strings are ordered lexicographically and numbers by numeric value, and a parameter selects ascending or descending order, as shown below.
scala> val a = sc.parallelize(List("dcp","fjg","snn","wc", "za"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val b = sc.parallelize(1 to a.count.toInt,2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:26
scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[6] at zip at <console>:27
scala> c.sortByKey(true).collect
res2: Array[(String, Int)] = Array((dcp,1), (fjg,2), (snn,3), (wc,4), (za,5))
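To sort in descending order, the same RDD can be sorted with the flag set to false (a small sketch; only the overall key ordering is guaranteed):
c.sortByKey(false).collect
// expected: Array((za,5), (wc,4), (snn,3), (fjg,2), (dcp,1))
The next example shows cogroup, which groups the values of two key-value RDDs by key and returns, for every key, a pair of iterables, one per input RDD.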
scala> val a = sc.parallelize(List(1,2,2,3,1,3),2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val b = a.map(x => (x, "b"))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[11] at map at <console>:25
scala> val c = a.map(x => (x, "c"))
c: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[12] at map at <console>:25
scala> b.cogroup(c).collect
res3: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(CompactBuffer(b, b),CompactBuffer(c, c))), (1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b, b),CompactBuffer(c, c))))
scala> val a = sc.parallelize(List(1,2,2,2,1,3),1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> val b = a.map(x => (x, "b"))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[16] at map at <console>:25
scala> val c = a.map(x => (x, "c"))
c: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[17] at map at <console>:25
scala> b.cogroup(c).collect
res4: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b, b, b),CompactBuffer(c, c, c))))
join first cogroups the two RDDs, then takes the Cartesian product of the values under each key, and finally applies flatMapValues to the result.
scala> val a= sc.parallelize(List("fjg","wc","xwc"),2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> val c = sc.parallelize(List("fig", "wc", "sbb", "zq","xwc","dcp"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> val d = c.keyBy(_.length)
d: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[23] at keyBy at <console>:25
scala> val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at keyBy at <console>:25
scala> b.join(d).collect
res6: Array[(Int, (String, String))] = Array((2,(wc,wc)), (2,(wc,zq)), (3,(fjg,fig)), (3,(fjg,sbb)), (3,(fjg,xwc)), (3,(fjg,dcp)), (3,(xwc,fig)), (3,(xwc,sbb)), (3,(xwc,xwc)), (3,(xwc,dcp)))
collect returns the elements of the RDD as an array.
/** * Return an array that contains all of the elements in this RDD. * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. */
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
scala> val a = sc.parallelize(List("a", "b", "c"),2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> a.collect
res7: Array[String] = Array(a, b, c)
reduce aggregates the elements with a two-argument function and returns a single result. The binary operation must be commutative and associative so that the parallel computation yields a correct result.
/** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
scala> val a = sc.parallelize(1 to 10, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> a.reduce((a, b) => a + b)
res8: Int = 55
take returns the first n elements of the RDD. It first scans one partition and uses that partition's result to estimate whether n elements have been found; if not, it continues scanning further partitions.
/**
 * Take the first num elements of the RDD. It works by first scanning one partition, and use the
 * results from that partition to estimate the number of additional partitions needed to satisfy
 * the limit.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 *
 * @note Due to complications in the internal implementation, this method will raise
 * an exception if called on an RDD of `Nothing` or `Null`.
 */
def take(num: Int): Array[T] = withScope {
val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
if (num == 0) {
new Array[T](0)
} else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1L
val left = num - buf.size
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate
// it by 50%. We also cap the estimation in the end.
if (buf.isEmpty) {
numPartsToTry = partsScanned * scaleUpFactor
} else {
// As left > 0, numPartsToTry is always >= 1
numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
}
}
val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += p.size
}
buf.toArray
}
}
scala> val a = sc.parallelize(1 to 10, 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at parallelize at <console>:24
scala> a.take(5)
res9: Array[Int] = Array(1, 2, 3, 4, 5)
top uses an implicit ordering to return the largest n elements.
/**
 * Returns the top k (largest) elements from this RDD as defined by the specified
 * implicit Ordering[T] and maintains the ordering. This does the opposite of
 * [[takeOrdered]]. For example:
 * {{{
 *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
 *   // returns Array(12)
 *
 *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
 *   // returns Array(6, 5)
 * }}}
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 *
 * @param num k, the number of top elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
 */
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
takeOrdered(num)(ord.reverse)
}
/**
 * Returns the first k (smallest) elements from this RDD as defined by the specified
 * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
 * For example:
 * {{{
 *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
 *   // returns Array(2)
 *
 *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
 *   // returns Array(2, 3)
 * }}}
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 *
 * @param num k, the number of elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
 */
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
if (num == 0) {
Array.empty
} else {
val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= collectionUtils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}
}
scala> val c = sc.parallelize(Array(1,2,3,5,3,8,7,97,32),2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
scala> c.top(3)
res10: Array[Int] = Array(97, 32, 8)
count returns the number of elements in the RDD.
/** * Return the number of elements in the RDD. */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
scala> val c = sc.parallelize(Array(1,2,3,5,3,8,7,97,32),2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> c.count
res11: Long = 9
takeSample returns a sampled subset of fixed size as an array; it also shuffles the order of the returned elements.
/**
 * Return a fixed-size sampled subset of this RDD in an array
 *
 * @param withReplacement whether sampling is done with replacement
 * @param num size of the returned sample
 * @param seed seed for the random number generator
 * @return sample of specified size in an array
 *
 * @note this method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] = withScope {
val numStDev = 10.0
require(num >= 0, "Negative number of elements requested")
require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
"Cannot support a sample size > Int.MaxValue - " +
s"$numStDev * math.sqrt(Int.MaxValue)")
if (num == 0) {
new Array[T](0)
} else {
val initialCount = this.count()
if (initialCount == 0) {
new Array[T](0)
} else {
val rand = new Random(seed)
if (!withReplacement && num >= initialCount) {
Utils.randomizeInPlace(this.collect(), rand)
} else {
val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
withReplacement)
var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
// If the first sample didn't turn out large enough, keep trying to take samples;
// this shouldn't happen often because we use a big multiplier for the initial size
var numIters = 0
while (samples.length < num) {
logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
numIters += 1
}
Utils.randomizeInPlace(samples, rand).take(num)
}
}
}
}
scala> val c = sc.parallelize(Array(1,2,3,5,3,8,7,97,32),2)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> c.takeSample(true,3, 1)
res14: Array[Int] = Array(1, 3, 7)
saveAsTextFile stores the RDD as text files, writing one element per line.
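A minimal sketch of its use (the output path is just a placeholder):
val words = sc.parallelize(List("we", "are", "from", "China"), 2)
words.saveAsTextFile("/tmp/words-output")   // one part file per partition, one element per line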
countByKey is similar to count, but it counts the number of values for each key and returns the result as a Map.
/** * Count the number of elements for each key, collecting the results to a local Map. * * @note This method should only be used if the resulting map is expected to be small, as * the whole thing is loaded into the driver's memory. * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */
def countByKey(): Map[K, Long] = self.withScope {
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
scala> val c = sc.parallelize(List("fig", "wc", "sbb", "zq","xwc","dcp"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[36] at parallelize at <console>:24
scala> val d = c.keyBy(_.length)
d: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[37] at keyBy at <console>:25
scala> d.countByKey
res15: scala.collection.Map[Int,Long] = Map(2 -> 2, 3 -> 4)
/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using
 * given combine functions and a neutral "zero value". This function can return a different result
 * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
 * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
 * allowed to modify and return their first argument instead of creating a new U to avoid memory
 * allocation.
 *
 * @param zeroValue the initial value for the accumulated result of each partition for the
 *                  `seqOp` operator, and also the initial value for the combine results from
 *                  different partitions for the `combOp` operator - this will typically be the
 *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
 * @param seqOp an operator used to accumulate results within a partition
 * @param combOp an associative operator used to combine results from different partitions
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
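As a usage sketch for aggregate (the variable names are made up), the snippet below computes the sum and the count of an RDD in a single pass:
val nums = sc.parallelize(1 to 9, 3)
// the zero value (0, 0) is the running (sum, count) for each partition and for the final merge
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),        // seqOp: fold one element into the accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2))      // combOp: merge two partition accumulators
// sum = 45, count = 9, so the mean is sum.toDouble / count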
/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using a
 * given associative function and a neutral "zero value". The function
 * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
 * allocation; however, it should not modify t2.
 *
 * This behaves somewhat differently from fold operations implemented for non-distributed
 * collections in functional languages like Scala. This fold operation may be applied to
 * partitions individually, and then fold those results into the final result, rather than
 * apply the fold to each element sequentially in some defined ordering. For functions
 * that are not commutative, the result may differ from that of a fold applied to a
 * non-distributed collection.
 *
 * @param zeroValue the initial value for the accumulated result of each partition for the `op`
 *                  operator, and also the initial value for the combine results from different
 *                  partitions for the `op` operator - this will typically be the neutral
 *                  element (e.g. `Nil` for list concatenation or `0` for summation)
 * @param op an operator used to both accumulate results within a partition and combine results
 *           from different partitions
 */
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
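A similarly small sketch for fold, which behaves like reduce with a zero value that is applied within each partition and again when the partition results are merged (so the zero value should be the neutral element of the operation):
val nums = sc.parallelize(1 to 9, 3)
nums.fold(0)(_ + _)   // 45; the zero value 0 is the neutral element for addition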