==> mapPartitionsWithIndex網絡
---> 定義: def mapPartitionsWithIndex[U](f:(Int, Iterator[T]) => Iterator[U], preserversPartitioning: Boolean = false)
ide
---> 做用: 對 RDD 每一個分區進行操做,帶有分區號函數
---> 示例:輸出分區號和內容spa
// 建立一個RDD val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9)) // 建立一個函數,做爲 f 的值 def func(index:Int, iter:Iterator[Int]):Iterator[String] = { iter.toList.map(x=>"[PartID: " + index + ", value= " + x + "]").iterator } // 調用 rdd1.mapPartitionsWithIndex(func).colect // 結果 res15: Array[String] = Array([PartitionID: 0,value=1], [PartitionID: 0,value=2], [PartitionID: 0,value=3], [PartitionID: 0,value=4], [PartitionID: 1,value=5], [PartitionID: 1,value=6], [PartitionID: 1,value=7], [PartitionID: 1,value=8], [PartitionID: 1,value=9])
==> aggregatescala
---> 定義:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U, T) => U, combOp: (U, U) => U): Uorm
---- (zeroValue: U) 初始值
server
---- seqOp:(U, T) => U 局部操做it
---- combOp:(U, U) => U 全局操做io
---> 做用:先對局部進行操做,再對全局進行操做class
---> 示例:
// 求兩個分區最大值的和,初始值爲0 val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9)) rdd1.aggregate(0)(math.max(_,_), _+_) // 結果爲:res16: Int = 13
==> aggregateByKey
---> 定義:
---> 做用:對 key-value 格式 的數據進行 aggregate 操做
---> 示例:
// 準備一個 key-value 格式的 RDD val parRDD = sc.parallelize(List(("cat", 2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)), 2) // 計算每一個分區中的動物最多的個數求和 parRDD.aggregateByKey(0)(math.max(_, _), _+_) // 結果爲: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) // 計算每種動物的總數量 parRDD.aggregateByKey(0)(_+_, _+_).collect // 方法一 parRDD.reduceByKey(_+_).collect
==> coalesce 與 repartition
---> 做用:將 RDD 中的分區進行重分區
---> 區別: coalesce 默認不會進行 shuffle(false)
repartition 會進行 shuffle(true), 會將數據真正經過網絡進行重分區
---> 示例:
// 定義一個 RDD val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8), 2) // 顯示分區中的分區號和分區號中的內容 def func(index:Int, iter:Iterator[Int]):Iterator[String] = { iter.toList.map(x=>"[PartID: " + index + ", value= " + x + "]").iterator } // 查看 rdd 中的分區狀況 rdd.mapPartitionsWithIndex(func).collect // 結果爲: Array[String] = Array( // [PartID: 0, value= 1], [PartID: 0, value= 2], [PartID: 0, value= 3], [PartID: 0, value= 4], // [PartID: 1, value= 5], [PartID: 1, value= 6], [PartID: 1, value= 7], [PartID: 1, value= 8]) // 使用 repartition 將分區數改成3 val rdd2 = rdd1.repartition(3) val rdd3 = rdd1.coalesce(3, true) // 查看rdd2 與rdd3 的分區狀況 rdd2.mapPartitionsWithIndex(func).collect rdd3.mapPartitionsWithIndex(func).collect // 結果爲:Array[String] = Array( // [PartID: 0, value= 3], [PartID: 0, value= 6], // [PartID: 1, value= 1], [PartID: 1, value= 4], [PartID: 1, value= 7], // [PartID: 2, value= 2], [PartID: 2, value= 5])