Spark算子:RDD基本轉換操做(5)–mapPartitions、mapPartitionsWithIndex

mapPartitions

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]數據庫

該函數和map函數相似,只不過映射函數的參數由RDD中的每個元素變成了RDD中每個分區的迭代器。若是在映射的過程當中須要頻繁建立額外的對象,使用mapPartitions要比map高效的過。函數

好比,將RDD中的全部數據經過JDBC鏈接寫入數據庫,若是使用map函數,可能要爲每個元素都建立一個connection,這樣開銷很大,若是使用mapPartitions,那麼只須要針對每個分區創建一個connection。spa

參數preservesPartitioning表示是否保留父RDD的partitioner分區信息。scala

def main(args: Array[String]): Unit = {
  //默認分區12個
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(1 to 10, 2)
  val func = (iter : Iterator[(Int)]) => {
    var result = List()
    var i = 0;
    while(iter.hasNext){
      i += iter.next
    }
    result.::(i).iterator
  }
  rdd1.mapPartitions{func}.collect.foreach(println(_))
}

16/12/20 11:17:14 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:30, took 0.793098 s
15
40

16/12/20 11:17:14 INFO SparkContext: Invoking stop() from shutdown hook對象

 

mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]索引

函數做用同mapPartitions,不過提供了兩個參數,第一個參數爲分區的索引。ci

def main(args: Array[String]): Unit = {
  //默認分區12個
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(1 to 10, 2)
  val func = (partIdx : Int,iter : Iterator[(Int)]) => {
    var part_map = scala.collection.mutable.Map[String,List[(Int)]]()
    while(iter.hasNext){
      var part_name = "part_" + partIdx;
      var elem = iter.next()
      if(part_map.contains(part_name)) {
        var elems = part_map(part_name)
        elems ::= elem
        part_map(part_name) = elems
      } else {
        part_map(part_name) = List[(Int)]{elem}
      }
    }
    part_map.iterator
  }
  rdd1.mapPartitionsWithIndex{func}.collect.foreach(println(_))
}

16/12/20 11:11:54 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:37, took 0.748727 s
(part_0,List(5, 4, 3, 2, 1))
(part_1,List(10, 9, 8, 7, 6))

16/12/20 11:11:54 INFO SparkContext: Invoking stop() from shutdown hookit

相關文章
相關標籤/搜索