Spark算子:RDD鍵值轉換操做(4)–cogroup、join

cogroup

##參數爲1個RDD函數

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]ui

def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]spa

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]it

 

##參數爲2個RDDspark

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]io

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]ast

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]test

 

##參數爲3個RDDforeach

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]d3

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

 

cogroup至關於SQL中的全外關聯full outer join,返回左右RDD中的記錄,關聯不上的爲空。

參數numPartitions用於指定結果的分區數。

參數partitioner用於指定分區函數。

##參數爲1個RDD的例子

def main(args: Array[String]): Unit = {
  //默認分區12個
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "A"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  rdd1.cogroup(rdd2).foreach(println(_))
}

16/12/20 16:54:05 INFO Executor: Finished task 1.0 in stage 2.0 (TID 25). 1553 bytes result sent to driver
(1,(CompactBuffer(A),CompactBuffer(A, B)))
(2,(CompactBuffer(B),CompactBuffer(D)))

(3,(CompactBuffer(C, C),CompactBuffer(B, E)))
(4,(CompactBuffer(),CompactBuffer(A)))
16/12/20 16:54:05 INFO Executor: Finished task 4.0 in stage 2.0 (TID 28). 1553 bytes result sent to driver

 

##參數爲2個RDD的例子

def main(args: Array[String]): Unit = {
  //默認分區12個
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  val rdd3 = sc.makeRDD(Array((5, "a"), (3, "b"), (4, "c"), (2, "d"), (3, "e"), (1, "f")))
  rdd1.cogroup(rdd2,rdd3).foreach(println(_))
}

16/12/20 16:58:38 INFO Executor: Finished task 1.0 in stage 3.0 (TID 37). 1553 bytes result sent to driver
(1,(CompactBuffer(A),CompactBuffer(1, B),CompactBuffer(f)))
(2,(CompactBuffer(B),CompactBuffer(D),CompactBuffer(d)))
(3,(CompactBuffer(C),CompactBuffer(B, E),CompactBuffer(b, e)))
(4,(CompactBuffer(),CompactBuffer(A),CompactBuffer(c)))
(5,(CompactBuffer(),CompactBuffer(),CompactBuffer(a)))

16/12/20 16:58:38 INFO TaskSetManager: Starting task 7.0 in stage 3.0 (TID 43, localhost, partition 7, PROCESS_LOCAL, 5237 bytes)

 

##參數爲3個RDD示例略,同上。

join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

 

join至關於SQL中的內關聯join,只返回兩個RDD根據K能夠關聯上的結果,join只能用於兩個RDD之間的關聯,若是要多個RDD關聯,多關聯幾回便可。

參數numPartitions用於指定結果的分區數

參數partitioner用於指定分區函數

def main(args: Array[String]): Unit = {
  //默認分區12個
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  rdd1.join(rdd2).foreach(println(_))
}

(1,(A,1))
(1,(A,B))

(2,(B,D)) (3,(C,B)) (3,(C,E))

相關文章
相關標籤/搜索