1. mapPartitions 簡介
/**
 * Builds a new RDD by running `f` once per partition of this RDD. `f` receives an
 * iterator over the partition's elements and returns an iterator of output elements.
 *
 * @param f function applied to each partition's iterator
 * @param preservesPartitioning whether `f` keeps the partitioner intact; leave it `false`
 *                              unless this is a pair RDD and `f` does not modify the keys
 * @return the resulting RDD of `U`
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  // Pass the user function through sc.clean before it is captured by the new RDD.
  val perPartition = sc.clean(f)
  // The TaskContext and Int arguments are ignored; only the element iterator is used.
  val compute = (_: TaskContext, _: Int, elems: Iterator[T]) => perPartition(elems)
  new MapPartitionsRDD(this, compute, preservesPartitioning)
}
/**
 * Builds a new RDD by applying `f` to every element of this RDD.
 *
 * @param f function applied to each element
 * @return the RDD of transformed elements
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  // Pass the user function through sc.clean before it is captured by the new RDD.
  val perElement = sc.clean(f)
  // Implemented per partition: each partition's iterator is mapped element by element.
  new MapPartitionsRDD[U, T](
    this,
    (_, _, elems) => elems.map(perElement))
}
// Source data set: the integers 1 through 20, requested as 2 slices.
val a = sc.parallelize(1 to 20, numSlices = 2)
/** Returns the given integer multiplied by 3. */
def mapTerFunc(a: Int): Int = 3 * a
// Triple every element with the element-wise `map`, then print the collected
// values as one comma-separated line.
val mapResult = a.map(n => mapTerFunc(n))
println {
  mapResult.collect().mkString(",")
}
3. mapPartitions 低效用法
3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
// Source data set: the integers 1 through 20, requested as 2 slices.
val a = sc.parallelize(1 to 20, 2)

/**
 * Triples every element of a partition, buffering all results in a List first.
 *
 * This is the deliberately inefficient form: the whole partition is materialised in
 * memory before any element is handed back. Each result is prepended to the list, so
 * the returned iterator yields the elements in reverse input order.
 *
 * @param iter iterator over one partition's elements
 * @return iterator over the tripled elements, in reverse input order
 */
def terFunc(iter: Iterator[Int]): Iterator[Int] =
  iter
    .foldLeft(List.empty[Int])((acc, cur) => (cur * 3) :: acc)
    .iterator

// Apply terFunc once per partition and print the collected values as one
// comma-separated line. The two statements must stay on separate lines.
val result = a.mapPartitions(terFunc)
println(result.collect().mkString(","))
30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33
4. mapPartitions 高效用法
/**
 * Iterator adapter that triples each element of the wrapped iterator on demand.
 *
 * Nothing is buffered: every call to `next()` pulls exactly one element from the
 * wrapped iterator, so elements come out in their original order.
 *
 * @param iter the iterator whose elements are tripled
 */
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {

  /** True while the wrapped iterator still has elements. */
  def hasNext: Boolean = iter.hasNext

  /**
   * Pulls the next element from the wrapped iterator and returns it multiplied by 3.
   * Declared with `()` to match `Iterator.next()`, as it advances the wrapped iterator.
   */
  def next(): Int = iter.next() * 3
}
// Efficient usage: wrap each partition's iterator in a CustomIterator instead of
// building an intermediate collection, then print the collected values as one
// comma-separated line. The two statements must stay on separate lines.
val result = a.mapPartitions(v => new CustomIterator(v))
println(result.collect().mkString(","))
3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
本文分享自微信公衆號 - 浪尖聊大數據(bigdatatip)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。