package dayo1

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object MapAndPartitions {
  def main(args: Array[String]): Unit = {
    val cof = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(cof)

    // Create an RDD by parallelizing a local collection
    val arrayRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))

    // map processes the data one element at a time
    arrayRDD.map(element => element).foreach(println)

    // mapPartitions receives an iterator over an entire partition at once
    arrayRDD.mapPartitions(partition => {
      val result = new ArrayBuffer[Int]()
      partition.foreach(element => {
        result += element
      })
      result.iterator
    }).foreach(println)

    sc.stop()
  }

  /**
   * Both operators produce the same result here.
   *
   * mapPartitions is better suited to work that should be done in batches, such as inserting
   * data into a table: each partition only needs to open one database connection, which greatly
   * reduces connection overhead. Pseudocode:
   *
   * arrayRDD.mapPartitions(datas => {
   *   dbConnect = getDbConnect()   // obtain a database connection
   *   datas.foreach(data => {
   *     dbConnect.insert(data)     // insert each record
   *   })
   *   dbConnect.commit()           // commit the transaction
   *   dbConnect.close()            // close the connection
   * })
   */
}
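
/**
 * A minimal, runnable sketch of the batch-per-partition pattern described in the comment above,
 * assuming a hypothetical database API (getDbConnect/insert/commit/close, as in the pseudocode);
 * println calls stand in for the real database operations. Unlike the pseudocode, the function
 * passed to mapPartitions must return an Iterator, so the buffered elements are returned at the end.
 */
object MapPartitionsBatchInsert {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Three partitions, so the "connection" is opened three times instead of once per element.
    val arrayRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), numSlices = 3)

    val processed = arrayRDD.mapPartitions(elements => {
      // In real code: val dbConnect = getDbConnect()
      println("open connection (once per partition)")
      val buffered = elements.toList               // materialize so we can both "insert" and return
      buffered.foreach(e => println(s"insert $e")) // dbConnect.insert(e) in real code
      println("commit and close connection")       // dbConnect.commit(); dbConnect.close()
      buffered.iterator                            // mapPartitions must return an Iterator
    })

    println(s"processed ${processed.count()} records")
    sc.stop()
  }
}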