Spark Operators: Basic RDD Transformations (4) – union, intersection, subtract

union

def union(other: RDD[T]): RDD[T]

This function is straightforward: it merges the two RDDs into one, without removing duplicates.

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(1 to 4, 2)
  val rdd2 = sc.makeRDD(4 to 8, 1)
  rdd1.union(rdd2).collect.foreach(println)
}

 

16/12/20 10:28:28 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:24, took 0.645257 s
1
2
3
4
4
5
6
7
8

16/12/20 10:28:28 INFO SparkContext: Invoking stop() from shutdown hook
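
Note that union does not deduplicate: 4 appears twice in the output. The result simply concatenates the parents' partitions, so rdd1.union(rdd2) has 2 + 1 = 3 partitions here. A minimal sketch, reusing rdd1 and rdd2 from the example above (++ is an alias for union; chaining distinct() gives a true set union):

val setUnion = (rdd1 ++ rdd2).distinct()
setUnion.collect.foreach(println)           // 1 to 8, each once, order not guaranteed
println(rdd1.union(rdd2).partitions.size)   // 3 = 2 + 1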

intersection

def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

This function returns the intersection of the two RDDs, with duplicates removed.
The numPartitions parameter specifies the number of partitions of the returned RDD.
The partitioner parameter specifies a custom partitioner.

def main(args: Array[String]): Unit = {
  // default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(1 to 4, 2)
  val rdd2 = sc.makeRDD(3 to 8, 3)
  val rdd3 = rdd1.intersection(rdd2, 4)
  rdd3.collect.foreach(println)
  println("RDD partition size:" + rdd3.partitions.size)
}

 

16/12/20 10:35:08 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:25, took 1.333698 s
16/12/20 10:35:08 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.56.1:50350 in memory (size: 1667.0 B, free: 900.6 MB)
4
3
RDD partition size:4

16/12/20 10:35:08 INFO SparkContext: Invoking stop() from shutdown hook
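
The deduplication is a side effect of how intersection works internally: each element becomes a key, the two RDDs are cogrouped, and only keys present on both sides are kept, which is also why the operation shuffles. A rough sketch of the equivalent, reusing rdd1 and rdd2 from the example above (not Spark's exact source):

val viaCogroup = rdd1.map(v => (v, null)).cogroup(rdd2.map(v => (v, null)))
  .filter { case (_, (left, right)) => left.nonEmpty && right.nonEmpty }
  .keys
viaCogroup.collect.foreach(println)   // 3 and 4, each once: cogroup collapses duplicate keys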

 

subtract

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

This function is similar to intersection, but it returns the elements that appear in this RDD and do not appear in otherRDD, without removing duplicates.
The parameters have the same meaning as in intersection.

def main(args: Array[String]): Unit = {
  // default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(1 to 4, 2)
  val rdd2 = sc.makeRDD(3 to 8, 3)
  val rdd3 = rdd1.subtract(rdd2, 4)
  rdd3.collect.foreach(println)
  println("RDD partition size:" + rdd3.partitions.size)
}

16/12/20 10:37:02 INFO DAGScheduler: ResultStage 2 (collect at ShellTest.scala:24) finished in 0.124 s
16/12/20 10:37:02 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:24, took 1.173144 s
1
2
RDD partition size:4

16/12/20 10:37:02 INFO SparkContext: Invoking stop() from shutdown hook
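
Unlike intersection, subtract keeps duplicates: every occurrence in this RDD whose value does not appear in otherRDD survives. A minimal sketch with hypothetical inputs:

val left  = sc.makeRDD(Seq(1, 1, 2, 3), 2)
val right = sc.makeRDD(Seq(3, 4), 1)
left.subtract(right).collect.foreach(println)   // 1, 1, 2: the duplicate 1 is kept

The partitioner overload behaves the same way, e.g. left.subtract(right, new org.apache.spark.HashPartitioner(4)).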
