mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]函數
函數做用同mapPartitions,不過提供了分區的索引(代碼中partid)。
.net
val rdd = sc.parallelize(1 to 8,3)
rdd.mapPartitionsWithIndex{
(partid,iter)=>{
var part_map = scala.collection.mutable.Map[String,List[Int]]()
var part_name = "part_" + partid
part_map(part_name) = List[Int]()
while(iter.hasNext){
part_map(part_name) :+= iter.next()//:+= 列表尾部追加元素
}
part_map.iterator
}
}.collectscala
OUTPUT blog
res0: Array[(String, List[Int])] = Array((part_0,List(1, 2)), (part_1,List(3, 4, 5)), (part_2,List(6, 7, 8)))索引