Spark核心編程的三大數據結構 之 RDD基礎編程 (二)

這是我參與更文挑戰的第2天,活動詳情查看:更文挑戰數據庫

前傳

Spark核心編程的三大數據結構 之 RDD基礎編程 (一)apache

4. RDD依賴關係

4.1 RDD 血緣關係

RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。編程

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6)
        ), 2)
        println(rdd.toDebugString)

        println("--------------------------------------")

        val rdd1 = rdd.map(t => (t._1, t._2 * 2))
        println(rdd1.toDebugString)

        println("--------------------------------------")

        val rdd2 = rdd1.mapValues(_ + 100)
        println(rdd2.toDebugString)

        println("--------------------------------------")

        val rdd3 = rdd2.reduceByKey(_ + _)
        println(rdd3.toDebugString)

        println("--------------------------------------")

        val res = rdd2.collect()
        println(res.mkString("\n"))

    }
複製代碼
(2) ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) MapPartitionsRDD[1] at map at _1.scala:27 []
 |  ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) MapPartitionsRDD[2] at mapValues at _1.scala:32 []
 |  MapPartitionsRDD[1] at map at _1.scala:27 []
 |  ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) ShuffledRDD[3] at reduceByKey at _1.scala:37 []
 +-(2) MapPartitionsRDD[2] at mapValues at _1.scala:32 []
    |  MapPartitionsRDD[1] at map at _1.scala:27 []
    |  ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
複製代碼

4.2 RDD 依賴關係

這裏所謂的依賴關係,其實就是兩個相鄰RDD之間的關係緩存

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6)
        ), 2)
        println(rdd.dependencies)

        println("--------------------------------------")

        val rdd1 = rdd.map(t => (t._1, t._2 * 2))
        println(rdd1.dependencies)

        println("--------------------------------------")

        val rdd2 = rdd1.mapValues(_ + 100)
        println(rdd2.dependencies)

        println("--------------------------------------")

        val rdd3 = rdd2.reduceByKey(_ + _)
        println(rdd3.dependencies)

        println("--------------------------------------")

        val res = rdd2.collect()
        println(res.mkString("\n"))

    }
複製代碼
List()
--------------------------------------
List(org.apache.spark.OneToOneDependency@38704ff0)
--------------------------------------
List(org.apache.spark.OneToOneDependency@44de94c3)
--------------------------------------
List(org.apache.spark.ShuffleDependency@2c58dcb1)
--------------------------------------
複製代碼

4.3 RDD 窄依賴

窄依賴表示每個父(上游)RDD的Partition最多被子(下游)RDD的一個Partition使用.markdown

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
複製代碼

4.4 RDD 寬依賴

寬依賴表示同一個父(上游)RDD的Partition被多個子(下游)RDD的Partition依賴,會引發Shuffle.數據結構

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false, val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {
  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)
  val shuffleId: Int = _rdd.context.newShuffleId()
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}

複製代碼

4.5 RDD 階段劃分

DAG(Directed Acyclic Graph)有向無環圖是由點和線組成的拓撲圖形,該圖形具備方向,不會閉環。例如,DAG記錄了RDD的轉換過程和任務的階段。app

image.png

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    var finalStage: ResultStage = null
    try {
        //建立新階段可能會拋出異常,例如,做業運行在
        //刪除底層HDFS文件的HadoopRDD。
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
    .......
  }
  
  /** * 建立一個與提供的jobId相關聯的ResultStage */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
  
   /** * 獲取或建立給定RDD的父階段列表。新的stage將使用提供的firstJobId建立 */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }
  
  /** * 若是在shuffleIdToMapStage中存在shuffle map stage,則獲取一個shuffle map stage。不然,若是洗牌地圖階段不存在,該方法將建立洗牌地圖階段,以及任何丟失的祖先洗牌地圖階段。 */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage

      case None =>
        //爲全部缺失的祖先洗牌依賴建立階段。
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          //即便getmissing祖宗shuffledependencies只返回shuffle依賴
          //沒有在shuffleIdToMapStage中,當咱們
          //在foreach循環中獲取一個特定的依賴,它被添加到
          // shuffleIdToMapStage經過早期依賴的階段建立過程。看到
          // SPARK-13902獲取更多信息。
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        //最後,爲給定的shuffle依賴建立一個stage。
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }
複製代碼

4.6 RDD 任務劃分

RDD任務切分中間分爲:Application、Job、Stage和Taskide

  1. Application:初始化一個SparkContext即生成一個Application;
  2. Job:一個Action算子就會生成一個Job;
  3. Stage:Stage等於寬依賴(ShuffleDependency)的個數加1;
  4. Task:一個Stage階段中,最後一個RDD的分區個數就是Task的個數。

注意:Application->Job->Stage->Task每一層都是1對n的關係函數

//劃分源碼
val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
  }
  
// Figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

//findMissingPartitions 有兩個實現類

//ShuffleMapStage實現
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
  .findMissingPartitions(shuffleDep.shuffleId)
  .getOrElse(0 until numPartitions)

//ResultStage實現
  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }
複製代碼

5.RDD持久化

5.1 RDD Cache緩存

  • RDD經過Cache或者Persist方法將前面的計算結果緩存,默認狀況下會把數據以緩存在JVM的堆內存中。可是並非這兩個方法被調用時當即緩存,而是觸發後面的action算子時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
    • 緩存有可能丟失,或者存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。
    • Spark會自動對一些Shuffle操做的中間數據作持久化操做(好比:reduceByKey)。這樣作的目的是爲了當一個節點Shuffle失敗了避免從新計算整個輸入。可是,在實際使用的時候,若是想重用數據,仍然建議調用persist或cache。
def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            1, 2, 5, 9
        ), 2)

        val mapRdd = rdd.map(i => {
            println("map--------")
            ("a", i)
        })

        mapRdd.cache() //源碼調用的persist()
        
        //指定存儲級別
        //mapRdd.persist(StorageLevel.DISK_ONLY)

        mapRdd.reduceByKey(_ + _).collect().foreach(println)
        mapRdd.groupByKey().collect().foreach(println)

        sc.stop()


    }
複製代碼
StorageLevel的全部存儲級別
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
複製代碼

5.2 RDD CheckPoint檢查點

  • 所謂的檢查點其實就是經過將RDD中間結果寫入磁盤
  • 因爲血緣依賴過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是檢查點以後有節點出現問題,能夠從檢查點開始重作血緣,減小了開銷。
  • 對RDD進行checkpoint操做並不會立刻被執行,必須執行Action操做才能觸發。
def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        //設置檢查點存儲路徑
        sc.setCheckpointDir("./check")

        val rdd = sc.makeRDD(List(
            1, 2, 5, 9
        ), 2)

        val mapRdd = rdd.map(i => {
            println("map--------")
            ("a", i)
        })

        mapRdd.cache()
        // checkpoint會把前面的RDD執行兩次 配合cache()便可只執行一次
        mapRdd.checkpoint()

        mapRdd.reduceByKey(_ + _).collect().foreach(println)
        mapRdd.groupByKey().collect().foreach(println)

        sc.stop()
        
    }
複製代碼

5.3 緩存和檢查點區別

  • Cache緩存只是將數據保存起來,不切斷血緣依賴。Checkpoint檢查點切斷血緣依賴。
  • Cache緩存的數據一般存儲在磁盤、內存等地方,可靠性低。Checkpoint的數據一般存儲在HDFS等容錯、高可用的文件系統,可靠性高。
  • 建議對checkpoint()的RDD使用Cache緩存,這樣checkpoint的job只需從Cache緩存中讀取數據便可,不然須要再從頭計算一次RDD。

6.RDD分區器

Spark目前支持Hash分區和Range分區,和用戶自定義分區。Hash分區爲當前的默認分區。分區器直接決定了RDD中分區的個數、RDD中每條數據通過Shuffle後進入哪一個分區,進而決定了Reduce的個數。oop

  • 只有Key-Value類型的RDD纔有分區器,非Key-Value類型的RDD分區的值是None
  • 每一個RDD的分區ID範圍:0 ~ (numPartitions - 1),決定這個值是屬於那個分區的。

6.1 Hash分區:對於給定的key,計算其hashCode,併除以分區個數取餘

  • HashPartitioner 具體代碼可自行查看

6.2 Range分區:將必定範圍內的數據映射到一個分區中,儘可能保證每一個分區數據均勻,並且分區間有序

  • RangePartitioner 具體代碼可自行查看

6.3 自定義分區

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6), ("c", 6), ("d", 6)
        ),5 )

        val parRDD = rdd.partitionBy(new MyPartitioner)

        val reduceRdd1 = parRDD.reduceByKey((i,j)=>{
            println("reduceRdd1")
            i+j
        })
        val reduceRdd2 = reduceRdd1.reduceByKey((i,j)=>{
            println("reduceRdd2")
            i+j
        })

        reduceRdd2.saveAsTextFile("out1")

        sc.stop()


    }

    /** * 實現把 * a 分到0區 * b 分到1區 * 其餘 分到2區 */
    class MyPartitioner extends Partitioner {
        /** * numPartitions 和 getPartition 是必須重寫的方法 */
        override def numPartitions: Int = 3
        override def getPartition(key: Any): Int = key match {
            case "a" => 0
            case "b" => 1
            case _ => 2
        }

        /** * equals 和 hashCode 可選 */
        override def equals(other: Any): Boolean = other match {
            case h: HashPartitioner =>
                h.numPartitions == numPartitions
            case _ =>
                false
        }
        override def hashCode: Int = numPartitions

    }
複製代碼

7.RDD文件讀取與保存

Spark的數據讀取及數據保存能夠從兩個維度來做區分:文件格式以及文件系統。 文件格式分爲:text文件、csv文件、sequence文件以及Object文件; 文件系統分爲:本地文件系統、HDFS、HBASE以及數據庫。

7.1 text文件

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
        ), 1)

        /** * text文件 */
        rdd.saveAsTextFile("text")
        sc.textFile("text").collect().foreach(println)

        sc.stop()
    }
複製代碼

7.2 sequence文件

SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。在SparkContext中,能夠調用sequenceFile[keyClass, valueClass](path)

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
        ), 1)

        /** * Sequence文件 */
        rdd.saveAsSequenceFile("sequence")
        sc.sequenceFile[String, Int]("sequence").collect().foreach(println)

        sc.stop()
    }
複製代碼

7.3 object對象文件

對象文件是將對象序列化後保存的文件,採用Java的序列化機制。能夠經過objectFile[T: ClassTag](path)函數接收一個路徑,讀取對象文件,返回對應的RDD,也能夠經過調用saveAsObjectFile()實現對對象文件的輸出。由於是序列化因此要指定類型。

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
        ), 1)

        /** * object文件 */
        rdd.saveAsObjectFile("object")
        sc.objectFile[(String, Int)]("object").collect().foreach(println)

        sc.stop()
    }
複製代碼
相關文章
相關標籤/搜索