spark map和mapPartitions的區別

package dayo1

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object MapAndPartitions {
  def main(args: Array[String]): Unit = {
    val cof = new SparkConf ().setAppName ( this.getClass.getSimpleName ).setMaster ( "local[*]" )
    val sc = new SparkContext ( cof )

    //建立RDD(並列化方法)
    val arrayRDD = sc.parallelize ( Array ( 1, 2, 3, 4, 5, 6, 7, 8, 9 ) )

    //map數據每次處理一行數據
    arrayRDD.map ( elements => elements ).foreach ( println )

    arrayRDD.mapPartitions(tp=>{
      val result=new ArrayBuffer[Int]()
      tp.foreach(tp=>{
        result+=tp
      })
      result.iterator
    }
    ).foreach(println)

    sc.stop ()
  }

  /**
    * 兩個函數最終處理獲得的結果是同樣的
    *
    * mapPartitions比較適合須要分批處理數據的狀況,好比將數據插入某個表,每批數據只須要開啓一次數據庫鏈接,大大減小了鏈接開支,僞代碼以下:
    *
    * 複製代碼
    *     arrayRDD.mapPartitions(datas=>{
    * dbConnect = getDbConnect() //獲取數據庫鏈接
    *       datas.foreach(data=>{
    *         dbConnect.insert(data) //循環插入數據
    * })
    *       dbConnect.commit() //提交數據庫事務
    *       dbConnect.close() //關閉數據庫鏈接
    * })
    * 複製代碼
    */
}
相關文章
相關標籤/搜索