## Overloads taking one RDD
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
## Overloads taking two RDDs
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
## Overloads taking three RDDs
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
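cogroup is comparable to a full outer join in SQL: it returns the records of both the left and the right RDD, grouped by key. When a key has no match on one side, that side's collection is empty.
The numPartitions parameter sets the number of partitions of the result.
The partitioner parameter sets the partitioner (the function that assigns keys to partitions).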
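To make the two optional parameters concrete, here is a minimal sketch. It assumes a local Spark installation; the object name `CogroupPartitioning`, the helper `partitionCounts`, and the sample data are made up for illustration. `HashPartitioner` is Spark's built-in hash-based partitioner.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object CogroupPartitioning {
  // Hypothetical helper: returns the partition counts of two cogroup results.
  def partitionCounts(sc: SparkContext): (Int, Int) = {
    val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
    val rdd2 = sc.makeRDD(Array((1, "X"), (4, "Y")))

    // numPartitions overload: the result is split into 4 partitions.
    val byCount = rdd1.cogroup(rdd2, 4)
    // partitioner overload: HashPartitioner(3) assigns keys to 3 partitions.
    val byPartitioner = rdd1.cogroup(rdd2, new HashPartitioner(3))

    (byCount.getNumPartitions, byPartitioner.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("cogroup-partitioning"))
    try println(partitionCounts(sc)) // (4,3)
    finally sc.stop()
  }
}
```

When neither parameter is given, Spark picks a default partitioner, which is why the examples in this post set `spark.default.parallelism`.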
## Example with one RDD argument
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism: 12 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "A"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  // Print each key with its values from rdd1 and rdd2
  rdd1.cogroup(rdd2).foreach(println(_))
}
(1,(CompactBuffer(A),CompactBuffer(A, B)))
(2,(CompactBuffer(B),CompactBuffer(D)))
(3,(CompactBuffer(C, C),CompactBuffer(B, E)))
(4,(CompactBuffer(),CompactBuffer(A)))
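The output shows how cogroup differs from a SQL full outer join: it emits one record per key with the values grouped, not one row per matching pair. The sketch below, which assumes a local Spark installation, expands each cogroup entry into rows and compares them with Spark's built-in `fullOuterJoin`. The object `CogroupVsFullOuterJoin` and the helpers `expand` and `rows` are hypothetical names introduced here.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CogroupVsFullOuterJoin {
  // Expand one cogroup entry into join rows; an empty side becomes None.
  def expand(vs: Iterable[String],
             ws: Iterable[String]): Iterable[(Option[String], Option[String])] = {
    val left: Iterable[Option[String]] =
      if (vs.isEmpty) Seq(None) else vs.map(v => Option(v))
    val right: Iterable[Option[String]] =
      if (ws.isEmpty) Seq(None) else ws.map(w => Option(w))
    for (l <- left; r <- right) yield (l, r)
  }

  // Returns the sorted rows produced via cogroup and via fullOuterJoin.
  def rows(sc: SparkContext): (List[String], List[String]) = {
    val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (3, "C")))
    val rdd2 = sc.makeRDD(Array((1, "A"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))

    val viaCogroup = rdd1.cogroup(rdd2).flatMapValues { case (vs, ws) => expand(vs, ws) }
    val builtIn = rdd1.fullOuterJoin(rdd2)

    // Sort the string form of each row so the comparison ignores task order.
    (viaCogroup.collect().map(_.toString).sorted.toList,
     builtIn.collect().map(_.toString).sorted.toList)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("cogroup-vs-full-outer-join"))
    try {
      val (fromCogroup, fromJoin) = rows(sc)
      fromCogroup.foreach(println)
      println(fromCogroup == fromJoin) // true
    } finally sc.stop()
  }
}
```

Key 4 exists only in `rdd2`, so it appears as `(4,(None,Some(A)))` in both results.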
## Example with two RDD arguments
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism: 12 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  val rdd3 = sc.makeRDD(Array((5, "a"), (3, "b"), (4, "c"), (2, "d"), (3, "e"), (1, "f")))
  // Print each key with its values from rdd1, rdd2 and rdd3
  rdd1.cogroup(rdd2, rdd3).foreach(println(_))
}
(1,(CompactBuffer(A),CompactBuffer(1, B),CompactBuffer(f)))
(2,(CompactBuffer(B),CompactBuffer(D),CompactBuffer(d)))
(3,(CompactBuffer(C),CompactBuffer(B, E),CompactBuffer(b, e)))
(4,(CompactBuffer(),CompactBuffer(A),CompactBuffer(c)))
(5,(CompactBuffer(),CompactBuffer(),CompactBuffer(a)))
## Example with three RDD arguments: omitted, same pattern as above
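For readers who want to see the three-RDD case anyway, a minimal sketch follows, assuming a local Spark installation. `rdd1` to `rdd3` reuse the data from the two-RDD example; `rdd4`, the object name `CogroupThree`, and the helper `grouped` are hypothetical additions. Values are sorted and the result is collected so the printed order does not depend on task scheduling.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CogroupThree {
  // Hypothetical helper: cogroup rdd1 with three other RDDs, sorted by key.
  def grouped(sc: SparkContext): List[String] = {
    val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
    val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
    val rdd3 = sc.makeRDD(Array((5, "a"), (3, "b"), (4, "c"), (2, "d"), (3, "e"), (1, "f")))
    val rdd4 = sc.makeRDD(Array((1, "x"), (6, "y"))) // made-up data for illustration

    rdd1.cogroup(rdd2, rdd3, rdd4)
      // Turn each Iterable into a sorted List for stable, readable output.
      .mapValues { case (a, b, c, d) =>
        (a.toList.sorted, b.toList.sorted, c.toList.sorted, d.toList.sorted)
      }
      .collect()
      .sortBy(_._1)
      .map(_.toString)
      .toList
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("cogroup-three"))
    try grouped(sc).foreach(println)
    finally sc.stop()
    // (1,(List(A),List(1, B),List(f),List(x)))
    // (2,(List(B),List(D),List(d),List()))
    // (3,(List(C),List(B, E),List(b, e),List()))
    // (4,(List(),List(A),List(c),List()))
    // (5,(List(),List(),List(a),List()))
    // (6,(List(),List(),List(),List(y)))
  }
}
```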
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
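join is comparable to an inner join in SQL: it returns only the records whose key K matches in both RDDs. join works on exactly two RDDs at a time; to join more RDDs, chain several join calls.
The numPartitions parameter sets the number of partitions of the result.
The partitioner parameter sets the partitioner (the function that assigns keys to partitions).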
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism: 12 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
  val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
  // Print one (key, (left, right)) pair for every match; key 4 has no match in rdd1 and is dropped
  rdd1.join(rdd2).foreach(println(_))
}
(1,(A,1))
(1,(A,B))
(2,(B,D))
(3,(C,B))
(3,(C,E))
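Joining more than two RDDs by chaining join calls, as mentioned earlier, could look like the sketch below. It assumes a local Spark installation; `rdd3`, the object name `ChainedJoin`, and the helper `joinedRows` are illustrative additions. Each extra join nests the value tuple one level deeper, so the sketch flattens it with a `map`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ChainedJoin {
  // Hypothetical helper: inner-join three RDDs and flatten the nested values.
  def joinedRows(sc: SparkContext): List[(Int, String, String, String)] = {
    val rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C")))
    val rdd2 = sc.makeRDD(Array((1, "1"), (3, "B"), (4, "A"), (2, "D"), (3, "E"), (1, "B")))
    val rdd3 = sc.makeRDD(Array((5, "a"), (3, "b"), (4, "c"), (2, "d"), (3, "e"), (1, "f")))

    rdd1.join(rdd2)   // RDD[(Int, (String, String))]
      .join(rdd3)     // RDD[(Int, ((String, String), String))]
      .map { case (k, ((v1, v2), v3)) => (k, v1, v2, v3) }
      .collect()
      .sorted         // sort on the driver for a stable order
      .toList
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("chained-join"))
    try joinedRows(sc).foreach(println)
    finally sc.stop()
    // (1,A,1,f)
    // (1,A,B,f)
    // (2,B,D,d)
    // (3,C,B,b)
    // (3,C,B,e)
    // (3,C,E,b)
    // (3,C,E,e)
  }
}
```

Only keys 1, 2 and 3 appear in all three RDDs, so keys 4 and 5 are dropped.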