union
def union(other: RDD[T]): RDD[T]
This function is straightforward: it merges the two RDDs without removing duplicates. (For a deduplicated union, see the sketch after the output below.)
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // default parallelism: 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(1 to 4, 2)
  var rdd2 = sc.makeRDD(4 to 8, 1)
  rdd1.union(rdd2).collect.foreach(println(_))
}
16/12/20 10:28:28 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:24, took 0.645257 s
1
2
3
4
4
5
6
7
8
16/12/20 10:28:28 INFO SparkContext: Invoking stop() from shutdown hook
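Since union keeps both copies of 4, a set-style union needs an explicit distinct(). A minimal sketch with the same data as above; distinct() is the standard RDD API, but the shuffle it triggers means the output order is not guaranteed:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  val rdd1 = sc.makeRDD(1 to 4, 2)
  val rdd2 = sc.makeRDD(4 to 8, 1)
  // union keeps both copies of 4; distinct() drops the duplicate
  rdd1.union(rdd2).distinct().collect.foreach(println(_))
}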
intersection
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
This function returns the intersection of the two RDDs, with duplicates removed.
The numPartitions parameter specifies the number of partitions of the returned RDD.
The partitioner parameter specifies the partitioning function (see the sketch after the output below).
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // default parallelism: 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(1 to 4, 2)
  var rdd2 = sc.makeRDD(3 to 8, 3)
  var rdd3 = rdd1.intersection(rdd2, 4)
  rdd3.collect.foreach(println(_))
  println("RDD partition size:" + rdd3.partitions.size)
}
16/12/20 10:35:08 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:25, took 1.333698 s
16/12/20 10:35:08 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.56.1:50350 in memory (size: 1667.0 B, free: 900.6 MB)
4
3
RDD partition size:4
16/12/20 10:35:08 INFO SparkContext: Invoking stop() from shutdown hook
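The partitioner overload behaves like the numPartitions form, except that you supply the Partitioner yourself. A minimal sketch using a plain HashPartitioner (which, as far as I know, is what the numPartitions overload uses internally):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  val rdd1 = sc.makeRDD(1 to 4, 2)
  val rdd2 = sc.makeRDD(3 to 8, 3)
  // same result as rdd1.intersection(rdd2, 4), but with an explicit partitioner
  val rdd3 = rdd1.intersection(rdd2, new HashPartitioner(4))
  rdd3.collect.foreach(println(_))                        // 3, 4 (order not guaranteed)
  println("RDD partition size:" + rdd3.partitions.size)   // 4
}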
subtract
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
This function is similar to intersection, but returns the elements that appear in this RDD and not in the other RDD, without removing duplicates.
The parameters have the same meaning as in intersection; a sketch of the partitioner overload follows the output below.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // default parallelism: 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(1 to 4, 2)
  var rdd2 = sc.makeRDD(3 to 8, 3)
  var rdd3 = rdd1.subtract(rdd2, 4)
  rdd3.collect.foreach(println(_))
  println("RDD partition size:" + rdd3.partitions.size)
}
16/12/20 10:37:02 INFO DAGScheduler: ResultStage 2 (collect at ShellTest.scala:24) finished in 0.124 s
16/12/20 10:37:02 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:24, took 1.173144 s
1
2
RDD partition size:4
16/12/20 10:37:02 INFO SparkContext: Invoking stop() from shutdown hook
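Two points are worth showing together for subtract: the partitioner overload mirrors intersection's, and, unlike intersection, duplicates in this RDD survive. A minimal sketch (HashPartitioner assumed, as above; the duplicated input value is my own illustration):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  // note the duplicated 1: subtract does not deduplicate this RDD
  val rdd1 = sc.makeRDD(Seq(1, 1, 2, 3, 4))
  val rdd2 = sc.makeRDD(3 to 8, 3)
  val rdd3 = rdd1.subtract(rdd2, new HashPartitioner(4))
  rdd3.collect.foreach(println(_))                        // 1, 1, 2 (order not guaranteed)
  println("RDD partition size:" + rdd3.partitions.size)   // 4
}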