Spark Operators: Basic RDD Transformations (6) – zip, zipPartitions

zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

The zip function combines two RDDs into a Key/Value-style RDD. It requires the two RDDs to have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // default parallelism of 12; makeRDD below overrides it with 2 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(1 to 5, 2)
  val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  val rdd3 = rdd1.zip(rdd2)
  println("RDD partitions size:" + rdd3.partitions.size)
  rdd3.collect.foreach(println(_))
}

 

16/12/20 11:30:42 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:25, took 0.680351 s
(1,A)
(2,B)
(3,C)
(4,D)
(5,E)

16/12/20 11:30:42 INFO SparkContext: Invoking stop() from shutdown hook
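To make the constraint concrete, here is a hedged sketch reusing sc from the example above (exception types and messages vary across Spark versions, so it simply prints whatever is thrown): zipping RDDs with different partition counts fails, and so does zipping RDDs whose per-partition element counts differ even when the partition counts match.

val a = sc.makeRDD(1 to 6, 2)
val b = sc.makeRDD(Array('A', 'B', 'C'), 3) // 3 partitions vs. 2: partition-count mismatch
val c = sc.makeRDD(Array('A', 'B', 'C'), 2) // 2 partitions, but 3 elements vs. 6
try {
  a.zip(b).collect()
} catch {
  case e: Exception => println("unequal partition counts: " + e.getMessage)
}
try {
  a.zip(c).collect()
} catch {
  case e: Exception => println("unequal element counts: " + e.getMessage)
}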

 

zipPartitions

The zipPartitions function combines multiple RDDs partition by partition into a new RDD. The RDDs being combined must have the same number of partitions, but there is no requirement on the number of elements within each partition.

This function has several overloads, which fall into three groups:

  • One RDD as the parameter

def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

The only difference between these two is the preservesPartitioning parameter, which indicates whether to preserve the parent RDD's partitioner, as the sketch below illustrates.
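A minimal sketch of the effect, assuming (as in current Spark versions) that the zipped RDD inherits the first parent's partitioner when the flag is true; the flag is only meaningful when f keeps keys in place:

import org.apache.spark.HashPartitioner

val pairs = sc.makeRDD(1 to 4).map(i => (i, i)).partitionBy(new HashPartitioner(2))
val other = sc.makeRDD(Array('A', 'B', 'C', 'D'), 2)
// Iterator.zip pairs elements until the shorter iterator is exhausted
val zipFn = (it1: Iterator[(Int, Int)], it2: Iterator[Char]) => it1.zip(it2)
println(pairs.zipPartitions(other, preservesPartitioning = true)(zipFn).partitioner)  // Some(HashPartitioner)
println(pairs.zipPartitions(other, preservesPartitioning = false)(zipFn).partitioner) // None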

The mapping function f takes the two RDDs' partition iterators as its parameters.

def main(args: Array[String]): Unit = {
  // default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(1 to 10, 2)
  val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  val rdd3 = rdd1.zipPartitions(rdd2) {
    (rdd1Iter, rdd2Iter) => {
      var result = List[String]()
      // stop when either iterator runs out; ::= prepends, so the order
      // within each partition is reversed in the output
      while (rdd1Iter.hasNext && rdd2Iter.hasNext) {
        result ::= (rdd1Iter.next() + "_" + rdd2Iter.next())
      }
      result.iterator
    }
  }
  rdd3.collect.foreach(println(_))
}

16/12/20 15:02:55 INFO DAGScheduler: ResultStage 0 (collect at ShellTest.scala:31) finished in 0.239 s
16/12/20 15:02:55 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:31, took 0.699322 s
2_B
1_A
8_E
7_D
6_C

16/12/20 15:02:55 INFO SparkContext: Invoking stop() from shutdown hook
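Since element counts per partition may differ, f is also free to pad the shorter iterator rather than truncate. A small sketch reusing rdd1 and rdd2 from above; the pad values 0 and '?' are arbitrary placeholders:

val padded = rdd1.zipPartitions(rdd2) {
  (rdd1Iter, rdd2Iter) =>
    // Iterator.zipAll pads whichever side runs out with the given defaults
    rdd1Iter.zipAll(rdd2Iter, 0, '?').map { case (i, c) => i + "_" + c }
}
padded.collect.foreach(println(_))
// partition 0: 1_A, 2_B, 3_?, 4_?, 5_?   partition 1: 6_C, 7_D, 8_E, 9_?, 10_?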

 

  • Two RDDs as parameters

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

Usage is the same as above, except that this overload takes two RDDs as parameters, so the mapping function f receives three iterators: one for this RDD and one for each argument.

def main(args: Array[String]): Unit = {
  // default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  val rdd1 = sc.makeRDD(1 to 10, 2)
  val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  val rdd3 = sc.makeRDD(Array('a', 'b', 'c', 'd', 'e'), 2)
  val rdd4 = rdd1.zipPartitions(rdd2, rdd3) {
    (rdd1Iter, rdd2Iter, rdd3Iter) => {
      var result = List[String]()
      while (rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
        result ::= (rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
      }
      result.iterator
    }
  }
  rdd4.collect.foreach(println(_))
}

 

16/12/20 15:06:21 INFO DAGScheduler: ResultStage 0 (collect at ShellTest.scala:32) finished in 0.260 s
16/12/20 15:06:21 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:32, took 0.816710 s
2_B_b
1_A_a
8_E_e
7_D_d
6_C_c

16/12/20 15:06:21 INFO SparkContext: Invoking stop() from shutdown hook

  • Three RDDs as parameters

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

 

Usage is the same as above; this overload simply adds one more RDD, and the mapping function f receives four iterators.
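For completeness, a sketch of this variant in the style of the examples above; rdd4 and its contents are made up for illustration:

val rdd1 = sc.makeRDD(1 to 10, 2)
val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
val rdd3 = sc.makeRDD(Array('a', 'b', 'c', 'd', 'e'), 2)
val rdd4 = sc.makeRDD(Array("v", "w", "x", "y", "z"), 2)
val rdd5 = rdd1.zipPartitions(rdd2, rdd3, rdd4) {
  (it1, it2, it3, it4) => {
    var result = List[String]()
    while (it1.hasNext && it2.hasNext && it3.hasNext && it4.hasNext) {
      result ::= (it1.next() + "_" + it2.next() + "_" + it3.next() + "_" + it4.next())
    }
    result.iterator
  }
}
rdd5.collect.foreach(println(_))
// e.g. partition 0 yields 2_B_b_w then 1_A_a_v (::= prepends, reversing order)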
