Spark的RDD編程實戰案例

             Spark的RDD編程實戰案例java

                                     做者:尹正傑apache

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。編程

 

 

  RDD體現了裝飾者設計模式,將數據處理的邏輯進行封裝,接下來讓咱們一塊兒來體驗一下吧。設計模式

 

一.RDD概述數組

1>.什麼是RDD緩存

  RDD全稱爲"Resilient Distributed Dataset",叫作彈性分佈式數據集,是Spark中最基本的數據抽象。

  代碼中是一個抽象類,它表明一個不可變、可分區、裏面的元素可並行計算的集合。

2>.RDD的屬性網絡

Internally, each RDD is characterized by five main properties:
    A list of partitions:
        一組分區(Partition),即數據集的基本組成單位;
    A function for computing each split:
        一個計算每一個分區的函數,換句話說,是計算數據放在哪一個分區中;
    A list of dependencies on other RDDs:
        RDD之間的依賴關係;
    Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):
        一個Partitioner,即RDD的分片函數;
    Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file):
        一個列表,存儲存取每一個Partition的優先位置(preferred location),即數據所存儲的節點;

3>.RDD的特色 dom

  RDD表示只讀的分區的數據集,對RDD進行改動,只能經過RDD的轉換操做,由一個RDD獲得一個新的RDD,新的RDD包含了從其餘RDD衍生所必需的信息。

  RDDs之間存在依賴,RDD的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化RDD來切斷血緣關係。
  RDD有如下幾個顯著特色:
    分區       RDD邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個compute函數獲得每一個分區的數據;
      若是RDD是經過已有的文件系統構建,則compute函數是讀取指定文件系統中的數據,若是RDD是經過其餘RDD轉換而來,則compute函數是執行轉換邏輯將其餘RDD的數據進行轉換;
    只讀       RDD是隻讀的,要想改變RDD中的數據,只能在現有的RDD基礎上建立新的RDD;       由一個RDD轉換到另外一個RDD,能夠經過豐富的操做算子實現,再也不像MapReduce那樣只能寫map和reduce了;       RDD的操做算子(Operate)包括兩類:
        transformations(轉換算子):
          它是用來將RDD進行轉化,構建RDD的血緣關係;
        actions(行動算子):
          它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中;
    依賴       RDDs經過操做算子進行轉換,轉換獲得的新RDD包含了從其餘RDDs衍生所必需的信息,RDDs之間維護着這種血緣關係,也稱之爲依賴。以下所示,依賴包括兩種:
        窄依賴:
          RDDs之間分區是一一對應的;
        寬依賴:
          下游RDD的每一個分區與上游RDD(也稱之爲父RDD)的每一個分區都有關,是多對多的關係。
    緩存       若是在應用程序中屢次使用同一個RDD,能夠將該RDD緩存起來,該RDD只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該RDD的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用;    

    CheckPoint       雖然RDD的血緣關係自然地能夠實現容錯,當RDD的某個分區數據失敗或丟失,能夠經過血緣關係重建。
      可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。
      爲此,RDD支持checkpoint將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於checkpoint後的RDD不須要知道它的父RDDs了,它能夠從checkpoint處拿到數據。

 

二.RDD的建立分佈式

1>.編程模型ide

  在Spark中,RDD被表示爲對象,經過對象上的方法調用來對RDD進行轉換。

  通過一系列的transformations定義RDD以後,就能夠調用actions觸發RDD的計算,action能夠是嚮應用程序返回結果(count, collect等),或者是向存儲系統保存數據(saveAsTextFile等)。

  在Spark中,只有遇到action,纔會執行RDD的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。
  要使用Spark,開發者須要編寫一個Driver程序,它被提交到集羣以調度運行Worker,

2>.RDD的建立 

package com.yinzhengjie.bigdata.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CreateRDD {

  def main(args: Array[String]): Unit = {

    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //建立Spark上下文對象
    val sc = new SparkContext(config)

    /**
      *   RDD的建立:
      *     在Spark中建立RDD的建立方式能夠分爲三種:
      *       (1)從集合(內存)中建立RDD;
      *             從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD
      *       (2)從外部存儲建立RDD;
      *             包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等
      *       (3)從其餘RDD建立。
      */


    //使用SparkContext對象的parallelize方法能夠在內存中建立RDD
    val arrayRDD:RDD[String] = sc.parallelize(Array("yinzhengjie","JasonYin2020"))
    arrayRDD.collect().foreach(println)

    //使用SparkContext對象的makeRDD方法也能夠在內存中建立RDD,其底層實現就是parallelize方法
    val listRDD:RDD[Int] = sc.makeRDD(List(100,200,300))
    listRDD.collect().foreach(println)

    /**
      *   使用SparkContext對象的textFile方法從外部存儲中建立RDD
      *
      *   舒適提示:
      *     默認狀況下能夠讀取項目路徑,也能夠讀取其它路徑,好比HDFS,HBase對應的路徑等
      *     默認從文件中讀取的數據都是字符串類型
      */
    val fileRDD:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\spark\\data")
    fileRDD.collect().foreach(println)
  }
}
CreateRDD.scala文件內容(RDD的建立)
3>.RDD的分區
package com.yinzhengjie.bigdata.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDPartition {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //建立Spark上下文對象
    val sc = new SparkContext(config)

    /**
      *   使用SparkContext對象的makeRDD函數簽名以下:
      *     def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism) : RDD[T] = withScope
      *
      *   舒適提示:
      *     seq:
      *       傳入一個序列集合類型,好比List,Array
      *     Int = defaultParallelism:
      *       指定分區數的並行度,傳入一個整形,不傳也能夠,即便用defaultParallelism,該值默認是您的操做系統對應的總核心數。
      *
      */
    val listRDD:RDD[String] = sc.makeRDD(List("yinzhengjie","JasonYin2020"),6)  //使用6個自定義分區

    //將RDD的數據保存到文件中
    listRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output")


    /**
      *   使用SparkContext對象的textFile函數簽名以下:
      *     def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope
      *
      *   舒適提示:
      *     path:
      *       指定文件的路徑,能夠是本地路徑,也能夠是hdfs,hbase等路徑
      *     minPartitions:
      *       指定最小的分區數據,可是不必定是這個分區數,取決於Hadoop讀取文件時分片規則。
      *
      */
    val fileRDD:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\spark\\data",2)  //自定義2個分區(但實際上可能比2要大,這取決於Hadoop的分片機制)

    //保存文件時建議不要和源文件在同一個目錄,不然可能會出錯喲~
    fileRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output2")
  }
}
RDDPartition.scala文件內容(RDD的分區)

 

三.RDD經常使用的算子(Operate)

  RDD的操做算子(Operate)包括兩類,即轉換算子(transformations operate)和actions(行動算子)。
    
  transformations(轉換算子):
    它是用來將RDD進行轉化,構建RDD的血緣關係。         
  actions(行動算子):     它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中。

  舒適提示:
    轉換算子只是對業務邏輯的封裝並無真正執行代碼,而行動算子就會真正觸發代碼的執行操做。換句話說,行動算子就是用來觸發RDD計算操做的,一旦使用了行動算子,那麼在行動算子以前的轉換算子會被觸發執行。

1>.Value類型

package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(20 to 30)
    //遍歷listRDD
    listRDD.collect().foreach(println)

    //使用map算子(Operate),將listRDD的全部元素乘以5獲得新的RDD
    val mapRDD:RDD[Int] = listRDD.map(x => x * 5)   //該行可簡寫爲"val mapRDD:RDD[Int] = listRDD.map(_ * 5)"

    //遍歷mapRDD
    mapRDD.collect().foreach(println)
  }
}
map(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(20 to 30)

    //遍歷listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用mapPartitions算子(Operate),將listRDD的全部元素乘以5獲得新的RDD
      *
      *    mapPartitionsk能夠對一個RDD中全部的分區進行遍歷,假設有N個元素,有M個分區,那麼map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理全部分區。
      *
      *    map()和mapPartition()的區別以下:
      *       map():
      *         每次處理一條數據。
      *       mapPartition():
      *         每次處理一個分區的數據,這個分區的數據處理完後,原RDD中分區的數據才能釋放,可能致使OOM。
      *     開發指導:
      *       當內存空間較大的時候建議使用mapPartition(),以提升處理效率。
      *
      *    舒適提示:
      *       mapPartitions效率優於map算子(Operate),減小了發送執行器(Executor)執行交互次數(mapPartitions的Operate是基於分區爲單位發送一次任務調度到Executor,而map的Operate是每處理一條數據就發送一次任務調度給Executor)
      *       若是分區的數據比執行器(Executor)的內存大,則使用mapPartitions可能會出現內存溢出(OOM),好比一個分區有12G數據,但Executor僅有10G大小,就會出現OOM現象。
      *       綜上所述,到底使用map仍是mapPartitions算子(Operate)根據實際狀況而定。
      */
    val mapPartitionsRDD:RDD[Int] = listRDD.mapPartitions(datas => {
      datas.map(_ * 5)
    })

    //遍歷mapRDD
    mapPartitionsRDD.collect().foreach(println)

  }
}
mapPartitions(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object mapPartitionsWithIndexOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD並指定分區數爲3
    val listRDD:RDD[Int] = sc.makeRDD(20 to 30,3)
    //遍歷listRDD
    listRDD.collect().foreach(println)

    //使用mapPartitionsWithIndex算子(Operate),將listRDD的全部元素跟所在分區造成一個元組組成一個新的RDD
    val tupleRDD:RDD[(Int,String)] = listRDD.mapPartitionsWithIndex{
      case (numPartition,datas) => {
        datas.map((_,"分區編號: " + numPartition))
      }
    }

    //遍歷tupleRDD
    tupleRDD.collect().foreach(println)
  }
}
mapPartitionsWithIndex(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FlatMapOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD並指定分區數爲3
    val listRDD:RDD[List[Int]] = sc.makeRDD(Array(List(10,20),List(60,80)))

    //遍歷listRDD
    listRDD.collect().foreach(println)

    //使用flatMap算子(Operate),將listRDD的全部元素扁平化,它相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
    val flatMapRDD:RDD[Int] = listRDD.flatMap(x =>x)

    //遍歷flatMapRDD
    flatMapRDD.collect().foreach(println)
  }
}
flatMap(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GlomOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(100 to 161,4)

    //遍歷listRDD
    listRDD.collect().foreach(println)

    //將一個分區的數據放到一個數組中,這樣咱們能夠對其進行操做,好比求和,求最值等。
    val glomRDD:RDD[Array[Int]] = listRDD.glom()

    //遍歷glomRDD
    glomRDD.collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

  }
}
glom案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GroupByOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(10 to 20)

    //遍歷listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用groupBy算子(Operate)進行分組,按照傳入函數(指定規則)的返回值進行分組,將相同的key對應的值放入一個迭代器。
      *
      *   分組後的數據造成了對偶元組(K,V),K表示分組的key,V表示分組的數據集合。
      *
      *   下面的案例就是按照元素模以2的值進行分組。
      */
    val groupByRDD:RDD[(Int,Iterable[Int])] = listRDD.groupBy(i => i % 2)

    //遍歷groupByRDD
    groupByRDD.collect().foreach(println)
  }
}
groupBy(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FilterOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(10 to 20)

    //遍歷listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用filter算子(Operate)進行過濾。返回一個新的RDD,該RDD由通過func函數(按照指定的規則)計算後返回值爲true的輸入元素組成。
      *
      *   下面的案例就是按照元素模以2的值進行過濾,即僅保留偶數。
      */
    val filterRDD:RDD[Int] = listRDD.filter(x => x % 2 == 0)

    //遍歷filterRDD
    filterRDD.collect().foreach(println)
  }
}
filter(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SampleOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(10 to 20)

    //遍歷listRDD
    listRDD.collect().foreach(println)

    /**
      *   sample算子(Operate)用以指定的隨機種子隨機抽樣出數量爲fraction的數據。
      *
      *   sample的函數簽名以下:
      *     def sample( withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]
      *
      *   如下是相關參數說明:
      *     withReplacement:
      *       表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣,
      *     fraction:
      *       表示sample的打分,是一個Double類型。
      *     seed:
      *       用於指定隨機數生成器種子。
      *
      */
    val sampleRDD:RDD[Int] = listRDD.sample(false,0.7,1)
    
    //遍歷sampleRDD
    sampleRDD.collect().foreach(println)

  }
}
sample(withReplacement, fraction, seed)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object DistinctOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(List(6,6,8,1,2,1,6,9,5,6,1,8,2,7,0,7,6,3,5,4,6,0,7,1))

    //遍歷listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用distinct算子(Operate)對數據去重,可是由於去重後會致使數據減小,因此能夠自定義分區數量,默認分區數是你操做系統的真實core數量。
      *
      */
    //    val distinctRDD:RDD[Int] = listRDD.distinct()
    val distinctRDD:RDD[Int] = listRDD.distinct(3)

    //爲了了看到試驗效果,建議將結果以文件的形式保存,直接打印到控制檯終端可能看不出效果喲~
    // distinctRDD.collect().foreach(println)
    distinctRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\output")
  }
}
distinct([numPartitions])) 案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoalesceOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD,指定分區數切片爲4
    val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4)

    //遍歷listRDD
//    listRDD.collect().foreach(println)

    println("縮減分區前分區數量: " + listRDD.partitions.size)

    //使用coalesce算子(Operate)縮減分區數,用於大數據集過濾後,提升小數據集的執行效率。能夠簡單理解爲合併分區
    val coalesceRDD:RDD[Int] = listRDD.coalesce(3)

    println("縮減分區後分區數量: " + coalesceRDD.partitions.size)

    coalesceRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\output")
  }
}
coalesce(numPartitions)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionsOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD,指定分區數切片爲4
    val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4)

    //遍歷listRDD,發現數據是有序的
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    println("Rebalance前分區數量: " + listRDD.partitions.size)

    /**
      *   使用repartition算子(Operate)是根據分區數,從新經過網絡隨機洗牌全部數據。
      *
      *   coalesce和repartition的區別
      *     1>.coalesce從新分區,能夠選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。
      *     2>.repartition其實是調用的coalesce,默認是進行shuffle的。
      *
      *   下面的案例就是對listRDD進行從新分區(將listRDD的4個分區數從新分區爲2個),生成一個新的RDD對象rebalanceRDD。
      */

    val rebalanceRDD:RDD[Int] = listRDD.repartition(2)

    println("Rebalance後分區數量:" + rebalanceRDD.partitions.size)

    //遍歷rebalanceRDD,此時你會發現數據並非有序的,而是被打亂啦~
    rebalanceRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )


  }
}
repartition(numPartitions)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SortByOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    val listRDD:RDD[Int] = sc.parallelize(List(2,1,7,6,9,3,8,5))

    //遍歷listRDD
    listRDD.collect().foreach(println)

    /**
      *   sortBy算子的函數參數列表簽名以下:
      *     def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)
      *
      *   經過函數簽名能夠知道咱們使用時只須要傳入一個參數便可, 其它2個參數均有默認值,
      *
      *   使用func先對數據進行處理,按照處理後的數據比較結果排序,默認爲升序(ascending: Boolean = true)。
      *
      *   下面的案例按照自身大小進行排序,默認是升序。
      */
    val sortByRDD:RDD[Int] = listRDD.sortBy(x => x)

    //遍歷sortByRDD
    sortByRDD.collect().foreach(
      x =>{
        println(x)
      }
    )

    //下面的案例按照自身大小進行排序,咱們指定ascending的值爲false,排序則爲降序。
    val sortByRDD2:RDD[Int] = listRDD.sortBy(x => x,false)

    //遍歷sortByRDD2
    sortByRDD2.collect().foreach(println)

  }
}
sortBy(func,[ascending], [numTasks])案例

2>.雙Value類型交互

package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UnionOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)
    //建立rdd1
    val rdd1:RDD[Int] = sc.parallelize(List(1,3,5,7,9))

    //建立rdd2
    val rdd2:RDD[Int] = sc.makeRDD(List(2,4,6,8,10))

    /**
      *   union算子(Operate)能夠對源RDD和參數RDD求並集後返回一個新的RDD。
      *
      *   下面的案例就是將rdd1和rdd2進行合併爲sumrdd,
      */
    val sumRDD:RDD[Int] =rdd1.union(rdd2)

    //遍歷sumRDD
    sumRDD.collect().foreach(println)
  }

}
union(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubtractOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立rdd1
    val rdd1:RDD[Int] = sc.parallelize(10 to 20)

    //建立rdd2
    val rdd2:RDD[Int] = sc.makeRDD(15 to 30)

    /**
      *   subtract算子是用來計算差的一種函數,去除兩個RDD中相同的元素,不一樣的RDD將保留下來。
      *
      *   下面的案例就是計算第一個RDD與第二個RDD的差集並打印
      */
    val subtractRDD:RDD[Int] =rdd1.subtract(rdd2)

    //遍歷subtractRDD
    subtractRDD.collect().foreach(println)
  }
}
subtract(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object IntersectionOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立rdd1
    val rdd1:RDD[Int] = sc.parallelize(10 to 20)

    //建立rdd2
    val rdd2:RDD[Int] = sc.makeRDD(15 to 30)

    //使用計算兩個RDD的交集
    val intersectionRDD:RDD[Int] = rdd1.intersection(rdd2)

    //遍歷intersectionRDD
    intersectionRDD.collect().foreach(println)
  }
}
intersection(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CartesianOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立rdd1
    val rdd1:RDD[Int] = sc.parallelize(10 to 20)

    //建立rdd2
    val rdd2:RDD[Int] = sc.makeRDD(15 to 30)

    //計算兩個RDD的笛卡爾積並打印,生產環境中應該儘可能避免使用!
    val cartesian:RDD[(Int,Int)]  = rdd1.cartesian(rdd2)

    //遍歷cartesian
    cartesian.collect().foreach(println)
  }
}
cartesian(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ZipOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立rdd1
    val rdd1:RDD[Int] = sc.parallelize(Array(100,200,300),3)

    //建立rdd2
    val rdd2:RDD[String] = sc.makeRDD(Array("storm","spark","flink"),3)

    //zip算子能夠將兩個RDD組合成Key/Value形式的RDD,這裏默認兩個RDD的partition數量以及元素數量都相同,不然會拋出異常。
    val zipRDD:RDD[(Int,String)] = rdd1.zip(rdd2)

    //遍歷zipRDD
    zipRDD.collect().foreach(println)
  }
}
zip(otherDataset)案例

3>.Key-Value類型

package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionByOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val rdd1:RDD[(Int,String)] = sc.makeRDD(Array((1,"hdfs"),(2,"yarn"),(3,"mapreduce"),(4,"spark")),4)

    //遍歷rdd2
    rdd1.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
    println("rdd1分區數是: " + rdd1.partitions.size)

    /**
      *   對rdd1進行重分區(對rdd1進行分區操做,若是原有的rdd1和現有的rdd2分區數是一致的話就不進行分區, 不然會生成ShuffleRDD,即會產生shuffle過程。)
      *
      *   須要注意的是,partitionBy算子屬於PairRDDFunctions類,所以這裏設計到了隱式轉換喲~
      *
      */
    val rdd2:RDD[(Int,String)] = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))

    println("rdd2分區數是: " + rdd2.partitions.size)

    //遍歷rdd2
    rdd2.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
  }
}
partitionBy案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GroupByKeyOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立一個數組
    val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS")

    //建立RDD並將上面的words映射爲二元組便於後面使用grooupByKey算子
    val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1))

    //groupByKey也是對每一個key進行操做,但只生成一個sequence。
    val groupByKeyRDD:RDD[(String,Iterable[Int])] = mapRDD.groupByKey()

    //遍歷groupByKeyRDD
    groupByKeyRDD.collect().foreach(println)

    //對每一個單詞進行統計
    groupByKeyRDD.map(word => (word._1, word._2.sum)).collect().foreach(println)
  }
}
groupByKey案例 
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立一個數組
    val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS")

    //建立RDD並將上面的words映射爲二元組便於後面使用reduceByKey算子
    val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1))

    /**
      *   在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,reduce任務的個數能夠經過第二個可選的參數來設置。
      *
      *   reduceByKey和groupByKey的區別以下:
      *     reduceByKey:
      *       按照key進行聚合,在shuffle以前有combine(預聚合)操做,返回結果是RDD[k,v].
      *     groupByKey:
      *       按照key進行分組,直接進行shuffle。
      *     開發指導:
      *       reduceByKey比groupByKey建議使用,由於預聚合操做會節省帶寬傳輸,可是須要注意是否會影響業務邏輯。
      */
    val reduceByKeyRDD:RDD[(String,Int)] = mapRDD.reduceByKey(_+_)

    //遍歷reduceByKeyRDD
    reduceByKeyRDD.collect().foreach(println)


  }
}
reduceByKey(func, [numTasks])案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2)

    //遍歷listRDD各個分區的元素
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )


    /**
      *     aggregateByKey的函數簽名以下:
      *       def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope
      *
      *     做用:
      *       在kv對的RDD中,按key將value進行分組合並,合併時,將每一個value和初始值做爲seq函數的參數,進行計算,返回的結果做爲一個新的kv對,而後再將結果按照key進行合併;
      *       最後將每一個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果做爲一個新的kv對輸出。
      *       參數描述:
      *         zeroValue:
      *           給每個分區中的每個key一個初始值;
      *         seqOp:
      *           函數用於在每個分區中用初始值逐步迭代value;
      *         combOp:
      *           函數用於合併每一個分區中的結果。
      *
      *     下面的案例爲建立一個pairRDD,取出每一個分區相同key對應值的最大值,而後相加
      */
    val aggregateByKeyRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(math.max(_,_),_+_)

    //遍歷aggregateByKeyRDD各個分區的元素
    aggregateByKeyRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    //使用aggregateByKey也能夠實現相似於WordCount的功能
    val wcRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(_+_,_+_)

    //遍歷wcRDD
    wcRDD.collect().foreach(println)

  }
}
aggregateByKey案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FoldByKeyOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2)

    //遍歷listRDD各個分區的元素
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    /**
      *   aggregateByKey與foldByKey的區別:
      *     aggregateByKey的簡化操做,seqop和combop相同。
      *     咱們會發現aggregateByKey須要傳遞2個參數,分別用於分區內和分區間的操做;
      *     而foldByKey只須要傳遞一個參數,由於分區內和分區間的操做相同,所以只須要傳遞一個參數便可.
      *
      *   下面的案例是計算相同key對應值的相加結果
      */
    val foldByKeyRDD:RDD[(String,Int)] = listRDD.foldByKey(0)(_+_)

    //遍歷foldByKeyRDD各個分區的元素
    foldByKeyRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
  }
}
foldByKey案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CombineByKeyOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

    //遍歷listRDD各個分區的元素
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )


    /**
      *   參數:
      *       (createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C)
      *   做用:
      *       對相同K,把V合併成一個集合。
      *   參數描述:
      *     createCombiner:
      *       combineByKey() 會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。
      *       若是這是一個新的元素,combineByKey()會使用一個叫做createCombiner()的函數來建立那個鍵對應的累加器的初始值
      *     mergeValue:
      *       若是這是一個在處理當前分區以前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併
      *     mergeCombiners:
      *       因爲每一個分區都是獨立處理的, 所以對於同一個鍵能夠有多個累加器。
      *       若是有兩個或者更多的分區都有對應同一個鍵的累加器, 就須要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合併。
      *
      *
      *    下面的案例就是根據key計算每種key的均值。(先計算每一個key出現的次數以及能夠對應值的總和,再相除獲得結果)
      */
    val combineByKeyRDD:RDD[(String,(Int,Int))] =  listRDD.combineByKey(
      (_,1),                                    //轉換結構,一個key第一次出現對其計數爲1
      (acc:(Int,Int),v)=>(acc._1+v,acc._2+1),   //定義分區內的計算規則,即相同key的vlaue相加,並將計數器加1
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)  //定義分區間的計算規則,即將各個分區相同key的計算結果進行累加操做。
    )

    //遍歷combineByKeyRDD各個分區的元素
    combineByKeyRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    //計算平均值
    val averageValueRDD:RDD[(String,Double)] = combineByKeyRDD.map{case (key,value) => (key,value._1/value._2.toDouble)}

    //遍歷averageValueRDD各個分區的元素(能夠查看對應key的平均值)
    averageValueRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
  }
}
combineByKey[C]案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

   val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink")))

    /**
      *   sortByKey算子在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD,ascending的值默認爲true
      */
    val positiveSequenceRDD:RDD[(Int,String)] = arrayRDD.sortByKey()

    positiveSequenceRDD.collect().foreach(println)

    /**
      *   sortByKey算子在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD,ascending的值爲false時,順序爲倒敘。
      */
    val ReverseOrderRDD:RDD[(Int,String)] = arrayRDD.sortByKey(false)

    ReverseOrderRDD.collect().foreach(println)
  }
}
sortByKey([ascending], [numTasks])案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapValuesOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink"),(1,"mapreduce")))
    arrayRDD.collect().foreach(println)

    /**
      *   針對於(K,V)形式的類型只對V進行操做
      *
      *   下面的案例就是對value添加字符串"*****"
      */
    val mapValuesRDD:RDD[(Int,String)] = arrayRDD.mapValues(_ + "*****")

    mapValuesRDD.collect().foreach(println)
  }
}
mapValues案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink")))

    val rdd2:RDD[(Int,Int)] = sc.parallelize(Array((1,30),(2,60),(3,90)))


    /**
      *   在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD.
      */
    val rdd3:RDD[(Int,(String,Int))] = rdd1.join(rdd2)

    rdd3.collect().foreach(println)

  }
}
join(otherDataset, [numTasks])案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CogroupOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink")))

    val rdd2:RDD[(Int,Int)] = sc.makeRDD(Array((1,30),(2,60),(3,90)))


    /**
      *   在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD..
      */
    val rdd3:RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2)

    rdd3.collect().foreach(println)

  }

}
cogroup(otherDataset, [numTasks])案例

4>.Actions

package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ReduceOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val rdd1:RDD[Int] = sc.makeRDD(1 to 100,2)
    val rdd2 = sc.parallelize(Array(("Hadoop",100),("Spark",300),("Flink",500),("MapReduce",700)))

    /**
      *   經過func函數彙集RDD中的全部元素,先聚合分區內數據,再聚合分區間數據。
      */
    val res1:Int = rdd1.reduce(_+_)
    val res2:(String,Int) = rdd2.reduce((x,y)=>(x._1 + "-" + y._1,x._2 + y._2))

    println(res1)
    println(res2)
  }
}
reduce(func)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CollectOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val rdd1:RDD[Int] = sc.parallelize(1 to 100)

    /**
      *   在驅動程序中,以數組的形式返回數據集的全部元素。
      */
    val res:Array[Int] = rdd1.collect()

    res.foreach(println)
  }
}
collect()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CountOperate {

  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val rdd1:RDD[Int] = sc.makeRDD(1 to 100)

    /**
      *   返回RDD中元素的個數
      */
    val count:Long = rdd1.count

    println(count)
  }
}
count()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FirstOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val rdd1:RDD[Int] = sc.makeRDD(50 to 100)

    /**
      *   返回RDD中的第一個元素
      */
    val res1:Int = rdd1.first()

    println(res1)

  }
}
first()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TakeOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val rdd1:RDD[Int] = sc.makeRDD(50 to 100)

    /**
      *   返回一個由RDD的前n個元素組成的數組
      */
    val res1:Array[Int] = rdd1.take(5)

    res1.foreach(println)
  }
}
take(n)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TakeOrderedOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    //建立RDD
    val listRDD:RDD[Int] = sc.makeRDD(List(9,5,20,7,10,4,8,30,6))

    /**
      *   返回該RDD排序後的前n個元素組成的數組
      */
    val res:Array[Int] = listRDD.takeOrdered(5)

    res.foreach(println)

  }
}
takeOrdered(n)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregateOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    val rdd1:RDD[Int] = sc.parallelize(1 to 100,2)

    /**
      *   aggregate函數將每一個分區裏面的元素經過seqOp和初始值進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。這個函數最終返回的類型不須要和RDD中元素類型一致。
      *
      *   須要注意的是,aggregate算子在計算時,各分區內部計算須要加上初始值(zeroValue),分區間計算也會加上該初始值(zeroValue)
      */
    val res1:Int = rdd1.aggregate(0)(_+_,_+_)
    val res2:Int = rdd1.aggregate(100)(_+_,_+_)

    println(res1)
    println(res2)
  }
}
aggregate案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FoldOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    val rdd1:RDD[Int] = sc.parallelize(1 to 100,2)

    /**
      *   摺疊操做,aggregate的簡化操做,seqop和combop同樣。
      */
    val res1:Int =  rdd1.fold(0)(_+_)
    val res2:Int =  rdd1.fold(100)(_+_)

    println(res1)
    println(res2)
  }
}
fold(num)(func)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsTextFileOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

    /**
      *   將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本
      */
    listRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\text")
  }
}
saveAsTextFile(path)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsSequenceFileOperate {
  def main(args: Array[String]): Unit = {
      //建立SparkConf對象
      val  config:SparkConf = new SparkConf()
      config.setMaster("local[*]")
      config.setAppName("WordCount")

      //建立Spark上下文對象
      val sc = new SparkContext(config)


      val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

      /**
        *   將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。
        */
      listRDD.saveAsSequenceFile("E:\\yinzhengjie\\bigdata\\spark\\sequence")
  }
}
saveAsSequenceFile(path)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsObjectFileOperate {
  def main(args: Array[String]): Unit = {
      //建立SparkConf對象
      val config: SparkConf = new SparkConf()
      config.setMaster("local[*]")
      config.setAppName("WordCount")

      //建立Spark上下文對象
      val sc = new SparkContext(config)


      val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2)

      /**
        * 用於將RDD中的元素序列化成對象,存儲到文件中。
        */
      listRDD.saveAsObjectFile("E:\\yinzhengjie\\bigdata\\spark\\object")
  }
}
saveAsObjectFile(path)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.Map

object CountByKeyOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val config: SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)


    val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2)

    /**
      *   針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。
      */
    val res:Map[String,Long] = listRDD.countByKey

    println(res)
  }
}
countByKey()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ForeachOperate {
  def main(args: Array[String]): Unit = {
    //建立SparkConf對象
    val config: SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //建立Spark上下文對象
    val sc = new SparkContext(config)

    val listRDD: RDD[Int] = sc.makeRDD(20 to 30,2)

    /**
      *   在數據集的每個元素上,運行函數func進行更新。
      */
    listRDD.foreach(println)
  }
}
foreach(func)案例

5>.RDD的函數傳遞

  在實際開發中咱們每每須要本身定義一些對於RDD的操做,那麼此時須要主要的是,初始化工做是在Driver端進行的,而實際運行程序是在Executor端進行的,這就涉及到了跨進程通訊,是須要序列化的。

  接下來咱們看下面2個案例操做。
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import java.io.Serializable

/**
  *   傳遞Search對象時,必須得先序列化後才能在網絡傳遞,不然沒法在Exector端進行反序列化。
  *
  */
class Search(query:String) extends Serializable {
  //過濾出包含字符串的數據
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  //過濾出包含字符串的RDD
  def getMatch1 (rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  //過濾出包含字符串的RDD
  def getMatche2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}

object SerializableableMethod {
  def main(args: Array[String]): Unit = {

      //1.初始化配置信息及SparkContext
      val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
      val sc = new SparkContext(sparkConf)

      //2.建立一個RDD
      val rdd: RDD[String] = sc.parallelize(Array("Hadoop", "Spark", "Hive", "Storm"))

      //3.建立一個Search對象,該過程是在Driver端執行的
      val search = new Search("S")

      //4.運用第一個過濾函數並打印結果,該過程是在Exector端執行的,所以須要將Driver端的Search對象傳遞給Exector,這意味着Search對象必須是序列化的,不然就會報錯喲(Caused by: java.io.NotSerializableException: com.yinzhengjie.bigdata.spark.rdd.functionTransfer.Search)
      val match1: RDD[String] = search.getMatch1(rdd)
      match1.collect().foreach(println)

      //5.釋放資源
      sc.stop()
  }
}
傳遞一個方案案例
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD


/**
  *   僅傳遞字符串時,無需進行序列化操做喲~
  */
class Search(query:String) {
  //過濾出包含字符串的數據
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  //過濾出包含字符串的RDD
  def getMatch1 (rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  //過濾出包含字符串的RDD
  def getMatche2(rdd: RDD[String]): RDD[String] = {
//    rdd.filter(x => x.contains(query))
    val query_ : String = this.query    //將類變量賦值給局部變量,該代碼是在Driver端執行
    rdd.filter(x => x.contains(query_))  //該代碼在Exector端執行,所以query_這個成員變量屬性須要傳遞過來,而query_自己就是字符串,所以無需序列化。
  }
}

object SerializableableAttribute {
  def main(args: Array[String]): Unit = {
    //1.初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //2.建立一個RDD
    val rdd: RDD[String] = sc.makeRDD(Array("Hadoop", "Spark", "Hive", "Storm"))

    //3.建立一個Search對象
    val search = new Search("o")

    //4.運用第一個過濾函數並打印結果
    val match1: RDD[String] = search.getMatche2(rdd)
    match1.collect().foreach(println)

    //5.釋放資源
    sc.stop()
  }
}
傳遞一個屬性(局部變量)案例

 

四.RDD依賴關係

1>.Lineage(血統)

package com.yinzhengjie.bigdata.spark.dependent

import org.apache.spark.rdd.RDD
import org.apache.spark.{Dependency, SparkConf, SparkContext}

/**
  *   RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(血統)記錄下來,以便恢復丟失的分區。
  *
  *   RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。
  */
object Lineage {
  def main(args: Array[String]): Unit = {
          //1.初始化配置信息及SparkContext
          val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
          val sc = new SparkContext(sparkConf)

          //2.建立一個RDD
          val listRDD: RDD[Int] = sc.parallelize(1 to 10)

          //3.將listRDD射成一個個元組
          val mapRDD: RDD[(Int,Int)] = listRDD.map((_,1))

          //4.統計每一種key對應的個數
          val reduceRDD:RDD[(Int,Int)] = mapRDD.reduceByKey(_+_)

          /**
          *    5 >.查看reduceRDD的Lineage(血統)
          *
          *    RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
          *      窄依賴
          *        窄依賴指的是每個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴咱們形象的比喻爲獨生子女。
          *      寬依賴
          *        寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引發shuffle操做,寬依賴咱們形象的比喻爲超生。
          */
          val lineage:String = reduceRDD.toDebugString
          println(lineage)

          //6>.查看依賴類型
          val dependencies:Seq[Dependency[_]] = reduceRDD.dependencies
          println(dependencies)

          //5.釋放資源
          sc.stop()
  }
}

2>.DAG

  DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage;
  
  對於窄依賴,partition的轉換處理在Stage中完成計算;

  對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據;

  以下圖所示,是某個job分爲3個階段(stage),窄依賴能夠放在同一個階段(stage),而寬依賴因爲shuffle的存在所以不能放在同一個階段(state)中:
    A和B:
      groupBy操做是寬依賴,存在shuffle操做。  
    F和G:
      join操做是寬依賴,存在shuffle操做。
    B和G:
      是窄依賴,覺得B的各個分區和G的分區惟一對應。
    E和F,D和F,C和D:
      map和union均沒有shuffle操做,所以均是宅依賴,所以他們能夠在同一個階段(stage)。

  舒適提示:
    寬依賴有shufle操做,窄依賴沒有shuffle操做,所以咱們能夠將宅依賴放在同一個階段執行,而寬依賴則須要分開不一樣的階段操做,由於寬依賴要作shuffle的前提是須要依賴上一個階段的執行結果。
    因爲窄依賴不須要等待,就能夠利用並行的概念來執行數據,從而提高效率。

 

3>.任務規劃

  RDD任務切分中間分爲:Application、Job、Stage和Task。

  Application:
    初始化一個SparkContext即生成一個Application
  
  Job:
    一個Action算子就會生成一個Job

  Stage:
    根據RDD之間的依賴關係的不一樣將Job劃分紅不一樣的Stage,遇到一個寬依賴則劃分一個Stage。
 
  Task:
    Stage是一個TaskSet,將Stage劃分的結果發送到不一樣的Executor執行即爲一個Task。

  舒適提示:
    Application->Job->Stage-> Task每一層都是1對n的關係。

 

五.RDD緩存

1>.RDD緩存概述

  RDD經過persist方法或cache方法能夠將前面的計算結果緩存,默認狀況下 persist()會把數據以序列化的形式緩存在JVM的堆空間中。

  可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。

  經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
  
  以下圖所示,在存儲級別的末尾加上"_2"來把持久化數據存爲兩份。
  
  緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。

 

2>.緩存代碼實現案例

package com.yinzhengjie.bigdata.spark.cache

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  def main(args: Array[String]): Unit = {

    //1.初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //2.建立一個RDD
    val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020"))

    //3.將RDD轉換爲攜帶當前時間戳不作緩存
    val nocache = listRDD.map(_.toString+System.currentTimeMillis)

    //4>.查看nocache的Lineage(血統)關係
    System.out.println(nocache.toDebugString)

    //5>.屢次打印無緩存結果
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)

    //6>.將RDD轉換爲攜帶當前時間戳並作緩存
    val cache =  listRDD.map(_.toString+System.currentTimeMillis).cache

    //7>.查看cache的Lineage(血統)關係
    System.out.println(cache.toDebugString)

    //8>.屢次打印緩存結果
    cache.collect.foreach(println)
    cache.collect.foreach(println)
    cache.collect.foreach(println)
    cache.collect.foreach(println)
    cache.collect.foreach(println)

    //9.釋放資源
    sc.stop()
  }
}

 

六.RDD 檢查點(CheckPoint)

1>.檢查點概述

  Spark中對於數據的保存除了持久化操做以外,還提供了一種檢查點的機制,檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。

  檢查點經過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。爲當前RDD設置檢查點,該函數將會建立一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。

  在checkpoint的過程當中,該RDD的全部依賴於父RDD中的信息將所有被移除。對RDD進行checkpoint操做並不會立刻被執行,必須執行Action操做才能觸發。

2>.檢查點代碼實現案例

package com.yinzhengjie.bigdata.spark.cache

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    //初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //設置檢查點的保存目錄,實際工做中應該使用hdfs路徑,本地目錄通常用於測試使用
    sc.setCheckpointDir("E:\\yinzhengjie\\bigdata\\spark\\checkpoint")

    //建立一個RDD
    val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020"))

    //將RDD轉換爲攜帶當前時間戳
    val nocache = listRDD.map(_.toString+System.currentTimeMillis)

    //設置檢查點,數據會被持久化到sc上定義的檢查點保存目錄
    nocache.checkpoint()

    //使用行動算子屢次打印結果
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)

    //查看Lineage(血統)
    println(nocache.toDebugString)

    //釋放資源
    sc.stop()
  }
}
相關文章
相關標籤/搜索