1. mapPartitions 簡介
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * The function is handed an iterator over one whole partition and must return an
 * iterator of that partition's output elements.
 *
 * @param f                     partition-level transformation; it is passed through
 *                              `sc.clean` before being wrapped in the new RDD
 * @param preservesPartitioning whether the input function preserves the partitioner, which
 *                              should be `false` unless this is a pair RDD and the input
 *                              function doesn't modify the keys
 * @return a `MapPartitionsRDD` whose partitions are `f` applied to this RDD's partitions
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedPartitionFunc = sc.clean(f)
  // The task context and partition index are ignored; only the element iterator is used.
  new MapPartitionsRDD[U, T](
    this,
    (_, _, partitionIter) => cleanedPartitionFunc(partitionIter),
    preservesPartitioning)
}
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 *
 * Implemented on top of `MapPartitionsRDD`: each partition's iterator is mapped
 * element by element, so `f` runs once per element.
 *
 * @param f element-level transformation; it is passed through `sc.clean` first
 * @return an RDD containing `f(x)` for every element `x` of this RDD
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanedMapper = sc.clean(f)
  new MapPartitionsRDD[U, T](
    this,
    (_, _, partitionIter) => partitionIter.map(cleanedMapper))
}
// Sample RDD with the integers 1..20 spread over two partitions.
val a = sc.parallelize(1 to 20, numSlices = 2)
/** Per-element function for the `map` example: returns its argument multiplied by three. */
def mapTerFunc(a: Int): Int = a * 3
// Element-wise version: mapTerFunc is invoked once for every element of `a`.
val mapResult = a.map(element => mapTerFunc(element))
3. mapPartitions 低效用法
val a = sc.parallelize(1 to 20, 2)

/**
 * Inefficient `mapPartitions` function: it drains the entire partition into a `List`
 * before returning anything, so the whole partition's results are held at once.
 *
 * Elements are prepended while looping (cheap for `List`), which builds the buffer
 * back-to-front; it is reversed at the end so each partition keeps its original order,
 * matching the output of the element-wise `map` version.
 *
 * @param iter iterator over one partition's elements; fully consumed by this call
 * @return iterator over the tripled elements, in input order
 */
def terFunc(iter: Iterator[Int]): Iterator[Int] = {
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next()
    res ::= cur * 3
  }
  res.reverse.iterator
}
// Two separate statements: written on one line without a separator, `println` would be
// parsed as a method call on the RDD and `result` would reference itself.
val result = a.mapPartitions(terFunc)
println(result.collect().mkString(","))
4. mapPartitions 高效用法
/**
 * Iterator wrapper that triples elements lazily: each call to `next()` pulls exactly one
 * element from the underlying iterator, so the partition is never buffered.
 *
 * @param iter underlying partition iterator; it is advanced as this iterator is consumed
 */
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {
  override def hasNext: Boolean = iter.hasNext

  // Declared with `()` to match `Iterator.next()`; overriding it with a parameterless
  // `def next` is deprecated in Scala 2.13 and rejected by Scala 3.
  override def next(): Int = {
    val cur = iter.next()
    cur * 3
  }
}
// Two separate statements: written on one line without a separator, `println` would be
// parsed as a method call on the RDD and `result` would reference itself.
val result = a.mapPartitions(partitionIter => new CustomIterator(partitionIter))
println(result.collect().mkString(","))
