Spark RDD Transformation 簡單用例(三)

cache和persist

將RDD數據進行存儲,persist(newLevel: StorageLevel)設置了存儲級別,cache()和persist()是相同的,存儲級別爲MEMORY_ONLY。由於RDD的transformation是lazy的,只有action算子纔會觸發transformain真正的執行,若是一個rdd須要進行屢次的action算子操做,最好可以使用cache或persist將rdd緩存至內存中,這樣除第一次action會觸發transformation操做,後面的action算子都不會再次觸發transformation操做。apache

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1) /*複製份數,默認爲1*/ extends Externalizable 
    
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
*/
def persist(newLevel: StorageLevel): this.type

/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type
 

mapValues(func)

元素是key-value對的RDD的每個元素的value通過func映射(key不變)構建一個新的RDD
/**

* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)]
val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
val rdd1 = rdd.mapValues(x=>1L)
rdd1.foreachPartition(it=>{
     while(it.hasNext){
       println(it.next)
     }
   println("================")
 }
)

 

scala> val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd1 = rdd.mapValues(x=>1L)
rdd1: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[2] at mapValues at <console>:26

scala> rdd1.foreachPartition(it=>{
     | while(it.hasNext){
     | println(it.next)
     | }
     | println("================")
     | }
     | )
(1,1)
(1,1)
================
(1,1)
(2,1)
================
(2,1)
(2,1)
================

以上就是將(1,1),(1,2),(1,3),(2,1),(2,2),(2,3)中的value從新賦值爲1,變爲(1,1),(1,1),(1,1),(2,1),(2,1),(2,1)。下面使用reduceByKey計算每個key出現的次數。數組

scala> val rdd1 = rdd.mapValues(x=>1L).reduceByKey(_ + _)
rdd1: org.apache.spark.rdd.RDD[(Int, Long)] = ShuffledRDD[4] at reduceByKey at <console>:26

scala> rdd1.collect.toMap
res4: scala.collection.immutable.Map[Int,Long] = Map(1 -> 3, 2 -> 3)

其實以上操做就是action算子countByKey()的實現。 緩存

collect(f: PartialFunction[T, U])

/**
* Return an RDD that contains all matching values by applying `f`.
*/
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

 PartialFunction[T,U]是個什麼東東呢?看一下PartialFunction的apply函數,即須要定義一個f(x)函數,入參類型爲A,結果輸出類型爲Bapp

/** Converts ordinary function to partial one
 *  @since   2.10
 */
def apply[A, B](f: A => B): PartialFunction[A, B] = { case x => f(x) }

 

val f : PartialFunction[Int,String] = {case 0 => "Sunday"
    case 1 => "Monday"
    case 2 => "Tuesday"
    case 3 => "Wednesday"
    case 4 => "Thursday"
    case 5 => "Friday"
    case 6 => "Saturday"
    case _ => "Unknown"
}

val rdd = sc.parallelize(0 to 9)
rdd.collect(f).collect

res3: Array[String] = Array(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Unknown, Unknown, Unknown)

 

glom()

將每一個partition中的元素合併成一個數組
/**

* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]]
scala> val rdd = sc.parallelize(1 to 9,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd.glom
res7: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[10] at glom at <console>:27

scala> rdd.glom.collect
res8: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))

 

subtract(other: RDD[T])

返回other中不存在的元素組成新的RDD,分區屬性若是沒有指定,則和原RDD保持一致。
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be &lt;= us.
*/
def subtract(other: RDD[T]): RDD[T]
val rdd1 = sc.parallelize(1 to 10,2)
val rdd2 = sc.parallelize(5 to 20,3)
val rdd = rdd1.subtract(rdd2)
rdd.collect
rdd.partitions.length

 

scala> val rdd1 = sc.parallelize(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 20,3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd = rdd1.subtract(rdd2)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at subtract at <console>:28

scala> rdd.collect
res0: Array[Int] = Array(2, 4, 1, 3)                                            

scala> rdd.partitions.length
res2: Int = 2

 

指定分區數量ide

/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
scala> val rdd = rdd1.subtract(rdd2,5)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at subtract at <console>:28

scala> rdd.partitions.length
res3: Int = 5

 

自定義分區屬性partitioner函數

 

/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
def subtract(
other: RDD[T],
p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    key.toString.toInt%numPartitions
  }
}

val rdd = rdd1.subtract(rdd2,new MyPartitioner(3))

rdd.foreachPartition(
 x=>{
   while(x.hasNext){
     println(x.next)
   }
   println("============")
 }
 )

 

scala> class MyPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
     |   override def numPartitions: Int = numParts
     |   override def getPartition(key: Any): Int = {
     |     key.toString.toInt%numPartitions
     |   }
     | }
defined class MyPartitioner

scala> val rdd = rdd1.subtract(rdd2,new MyPartitioner(3))
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at subtract at <console>:29

scala> rdd.foreachPartition(
     |  x=>{
     |    while(x.hasNext){
     |      println(x.next)
     |    }
     |    println("============")
     |  }
     |  )
3
============
1
4
============
2
============

zip

兩個RDD對應位置(按順序)組成key-value對建立新的RDD,兩個RDD的元素在每一個分區中數量必須相同,partition數量必須相同。
/**

* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
* a map on the other).
*/
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
val rdd1 = sc.parallelize(1 to 5,2)
val rdd2 = sc.parallelize(List("one","two","three","four","five"),2)
val rdd = rdd1.zip(rdd2)
rdd.collect
scala> val rdd1 = sc.parallelize(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List("one","two","three","four","five"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> val rdd = rdd1.zip(rdd2)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[16] at zip at <console>:28

scala> rdd.collect
res5: Array[(Int, String)] = Array((1,one), (2,two), (3,three), (4,four), (5,five))

 

combineByKey

/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level. This method is here for backward compatibility. It
* does not provide combiner classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]

var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd.combineByKey(
  (v : Int) => v + "$",   
  (c : String, v : Int) => c + "@" + v,  
  (c1 : String, c2 : String) => c1 + "||" + c2
).collect
scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> rdd.combineByKey(
     |   (v : Int) => v + "$",   
     |   (c : String, v : Int) => c + "@" + v,  
     |   (c1 : String, c2 : String) => c1 + "||" + c2
     | ).collect
res20: Array[(String, String)] = Array((B,1$@2), (A,1$@2@3), (C,1$))

 

沒看明白啊!沒看明白啊!沒看明白啊!沒看明白啊!沒看明白啊!this

flatMapValues

與flatMap相似,只是value通過函數f映射後獲得1個或多個元素與key組成新的key-value,而後建立新的RDD。
/**

* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]

var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd.flatMapValues(x => { x to 3}).collect

 

scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24


scala> rdd.flatMapValues(x => { x to 3})
res24: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at flatMapValues at <console>:27

scala> rdd.flatMapValues(x => { x to 3}).collect
res25: Array[(String, Int)] = Array((A,1), (A,2), (A,3), (A,2), (A,3), (A,3), (B,1), (B,2), (B,3), (B,2), (B,3), (C,1), (C,2), (C,3))

 

foldByKey

對每個key的value進行聚合運算,其中zeroValue會與每個key組成一個key-value對參與運算。
/**

* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd.foldByKey(0)(_+_)
rdd.foldByKey(0)(_+_).collect
rdd.foldByKey(1)(_+_)
rdd.foldByKey(1)(_+_).collect
scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at parallelize at <console>:24

scala> rdd.foldByKey(0)(_+_)
res26: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[36] at foldByKey at <console>:27

scala> rdd.foldByKey(0)(_+_).collect
res27: Array[(String, Int)] = Array((B,3), (A,6), (C,1))

scala> rdd.foldByKey(1)(_+_)
res28: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[38] at foldByKey at <console>:27

scala> rdd.foldByKey(1)(_+_).collect
res29: Array[(String, Int)] = Array((B,4), (A,7), (C,2))

 

keys

返回key-value的全部key.
/**

* Return an RDD with the keys of each tuple.
*/
def keys: RDD[K] = self.map(_._1)
scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24

scala> rdd.keys.collect
res32: Array[String] = Array(A, A, A, B, B, C)

values

返回key-value的全部value.

/**
* Return an RDD with the values of each tuple.
*/
def values: RDD[V] = self.map(_._2)
scala> var rdd = sc.parallelize(Array(("A",1),("A",2),("A",3),("B",1),("B",2),("C",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[44] at parallelize at <console>:24

scala> rdd.values
res33: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[45] at values at <console>:27

scala> rdd.values.collect
res34: Array[Int] = Array(1, 2, 3, 1, 2, 1)
相關文章
相關標籤/搜索