Spark之RDD彈性特性

  RDD做爲彈性分佈式數據集,它的彈性具體體如今如下七個方面。node

1.自動進行內存和磁盤數據存儲的切換

  Spark會優先把數據放到內存中,若是內存實在放不下,會放到磁盤裏面,不但能計算內存放下的數據,也能計算內存放不下的數據。若是實際數據大於內存,則要考慮數據放置策略和優化算法。當應用程序內存不足時,Spark應用程序將數據自動從內存存儲切換到磁盤存儲,以保障其高效運行。web

2.基於Lineage(血統)的高效容錯機制

  Lineage是基於Spark RDD的依賴關係來完成的(依賴分爲窄依賴和寬依賴兩種形態),每一個操做只關聯其父操做,各個分片的數據之間互不影響,出現錯誤時只要恢復單個Split的特定部分便可。常規容錯有兩種方式:一個是數據檢查點;另外一個是記錄數據的更新。數據檢查點的基本工做方式,就是經過數據中心的網絡連接不一樣的機器,而後每次操做的時候都要複製數據集,就至關於每次都有一個複製,複製是要經過網絡傳輸的,網絡帶寬就是分佈式的瓶頸,對存儲資源也是很大的消耗。記錄數據更新就是每次數據變化了就記錄一下,這種方式不須要從新複製一份數據,可是比較複雜,消耗性能。Spark的RDD經過記錄數據更新的方式爲什麼很高效?由於① RDD是不可變的且Lazy;② RDD的寫操做是粗粒度的。可是,RDD讀操做既能夠是粗粒度的,也能夠是細粒度的。算法

3.Task若是失敗,會自動進行特定次數的重試

  默認重試次數爲4次。TaskSchedulerImpl的源碼以下所示:apache

private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))

  val conf = sc.conf

  TaskSchedulerImpl是底層的任務調度接口TaskScheduler的實現,這些Schedulers從每個Stage中的DAGScheduler中獲取TaskSet,運行它們,嘗試是否有故障。DAGScheduler是高層調度,它計算每一個Job的Stage的DAG,而後提交Stage,用TaskSets的形式啓動底層TaskScheduler調度在集羣中運行。緩存

4.Stage若是失敗,會自動進行特定次數的重試

  這樣,Stage對象能夠跟蹤多個StageInfo(存儲SparkListeners監聽到的Stage的信息,將Stage信息傳遞給Listeners或web UI)。默認重試次數爲4次,且能夠直接運行計算失敗的階段,只計算失敗的數據分片,Stage的源碼以下所示:網絡

private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to.屬於這個工做集的Stage */
  val jobIds = new HashSet[Int]

  val pendingPartitions = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage.用於此Stage的下一個新attempt的ID */
  private var nextAttemptId: Int = 0

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   * 最新的[StageInfo] Object指針,須要被初始化,任何attempts都是被創造出來的,由於DAGScheduler使用
   * StageInfo告訴SparkListeners工做什麼時候開始(即發生前的任何階段已經建立)
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

  /**
   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   * 設置Stage attempt IDs當失敗時能夠讀取失敗信息,跟蹤這些失敗,爲了不無休止的重複失敗
   * 跟蹤每一次attempt,以便避免記錄重複故障,若是從同一stage建立多任務失敗(SPARK-5945)
   */
  private val fetchFailedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    fetchFailedAttemptIds.clear()
  }

  /**
   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
   * 檢查是否應該停止因爲連續屢次讀取失敗的stage
   * This method updates the running set of failed stage attempts and returns
   * true if the number of failures exceeds the allowable number of failures.
   * 若是失敗的次數超過容許的次數,此方法更新失敗stage attempts和返回的運行集
   */
  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  // 在stage中建立一個新的attempt
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  // 返回當前stage中最新的stageinfo
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  // 返回須要從新計算的分區標識的序列
  def findMissingPartitions(): Seq[Int]
}

private[scheduler] object Stage {
  // The number of consecutive failures allowed before a stage is aborted
  // 容許一個stage停止的連續故障數
  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
}

  Stage是Spark Job運行時具備相同邏輯功能和並行計算任務的一個基本單元。Stage中全部的任務都依賴一樣的Shuffle,每一個DAG任務經過DAGScheduler在Stage的邊界處發生Shuffle造成Stage,而後DAGScheduler運行這些階段的拓撲順序。每一個Stage均可能是ShuffleMapStage,若是是ShuffleMapStage,則跟蹤每一個輸出節點(nodes)上的輸出文件分區,它的任務結果是輸入其餘的Stage(s),或者輸入一個ResultStage,若輸入一個ResultStage,這個ResultStage的任務直接在這個RDD上運行計算這個Spark Action的函數(如count()、 save()等),並生成shuffleDep等字段描述Stage和生成變量,如outputLocs和numAvailableOutputs,爲跟蹤map輸出作準備。每一個Stage會有firstjobid,肯定第一個提交Stage的Job,使用FIFO調度時,會使得其前面的Job先行計算或快速恢復(失敗時)。 app

  ShuffleMapStage是DAG產生數據進行Shuffle的中間階段,它發生在每次Shuffle操做以前,可能包含多個Pipelined操做,ResultStage階段捕獲函數在RDD的分區上運行Action算子計算結果,有些Stage不是運行在RDD的全部的分區上,例如,first()、lookup()等。SparkListener是Spark調度器的事件監聽接口。注意,這個接口隨着Spark版本的不一樣會發生變化。less

5.checkpoint和persist(檢查點和持久化),可主動或被動觸發

  checkpoint是對RDD進行的標記,會產生一系列的文件,且全部父依賴都會被刪除,是整個依賴(Lineage)的終點。checkpoint也是Lazy級別的。persist後RDD工做時每一個工做節點都會把計算的分片結果保存在內存或磁盤中,下一次若是對相同的RDD進行其餘的Action計算,就能夠重用。dom

  由於用戶只與Driver Program交互,所以只能用RDD中的cache()方法去cache用戶能看到的RDD。所謂能看到,是指通過Transformation算子處理後生成的RDD,而某些在Transformation算子中Spark本身生成的RDD是不能被用戶直接cache的。例如,reduceByKey()中會生成的ShuffleRDD、MapPartitionsRDD是不能被用戶直接cache的。在Driver Program中設定RDD.cache()後,系統怎樣進行cache?首先,在計算RDD的Partition以前就去判斷Partition要不要被cache,若是要被cache,先將Partition計算出來,而後cache到內存。cache可以使用memory,若是寫到HDFS磁盤的話,就要檢查checkpoint。調用RDD.cache()後,RDD就變成persistRDD了,其StorageLevel爲MEMORY_ONLY,persistRDD會告知Driver說本身是須要被persist的。此時會調用RDD.iterator()。 RDD.scala的iterator()的源碼以下:分佈式

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   * RDD的內部方法,將從合適的緩存中讀取,不然計算它。這不該該被用戶直接使用,但可用於實現自定義的子RDD
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  當RDD.iterator()被調用的時候,也就是要計算該RDD中某個Partition的時候,會先去cacheManager那裏獲取一個blockId,而後去BlockManager裏匹配該Partition是否被checkpoint了,若是是,那就不用計算該Partition了,直接從checkpoint中讀取該Partition的全部records放入ArrayBuffer裏面。若是沒有被checkpoint過,先將Partition計算出來,而後將其全部records放到cache中。整體來講,當RDD會被重複使用(不能太大)時,RDD須要cache。Spark自動監控每一個節點緩存的使用狀況,利用最近最少使用原則刪除老舊的數據。若是想手動刪除RDD,可使用RDD.unpersist()方法。  

  此外,能夠利用不一樣的存儲級別存儲每個被持久化的RDD。例如,它容許持久化集合到磁盤上,將集合做爲序列化的Java對象持久化到內存中、在節點間複製集合或者存儲集合到Alluxio中。能夠經過傳遞一個StorageLevel對象給persist()方法設置這些存儲級別。cache()方法使用默認的存儲級別-StorageLevel.MEMORY_ONLY。RDD根據useDisk、useMemory、 useOffHeap、deserialized、replication 5個參數的組合提供了經常使用的12種基本存儲,完整的存儲級別介紹以下。StorageLevel.scala的源碼以下:

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

  StorageLevel是控制存儲RDD的標誌,每一個StorageLevel記錄RDD是否使用memory,或使用ExternalBlockStore存儲,若是RDD脫離了memory或ExternalBlockStore,是否扔掉RDD,是否保留數據在內存中的序列化格式,以及是否複製多個節點的RDD分區。另外,org.apache.spark.storage.StorageLevel是單實例(singleton)對象,包含了一些靜態常量和經常使用的存儲級別,且可用singleton對象工廠方法StorageLevel(...)建立定製化的存儲級別。

  Spark的多個存儲級別意味着在內存利用率和CPU利用率間的不一樣權衡。推薦經過下面的過程選擇一個合適的存儲級別:①若是RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。由於這是CPU利用率最高的選項,會使RDD上的操做盡量地快。②若是不適合用默認級別,就選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提升對象的空間使用率,可是仍可以至關快地訪問。③除非算子計算RDD花費較大或者須要過濾大量的數據,不要將RDD存儲到磁盤上,不然重複計算一個分區,就會和從磁盤上讀取數據同樣慢。④若是但願更快地恢復錯誤,能夠利用replicated存儲機制,全部的存儲級別均可以經過replicated計算丟失的數據來支持完整的容錯。另外,replicated的數據能在RDD上繼續運行任務,而不須要重複計算丟失的數據。在擁有大量內存的環境中或者多應用程序的環境中,Off_Heap(將對象從堆中脫離出來序列化,而後存儲在一大塊內存中,這就像它存儲到磁盤上同樣,但它仍然在RAM內存中。Off_Heap對象在這種狀態下不能直接使用,須進行序列化及反序列化。序列化和反序列化可能會影響性能,Off_Heap堆外內存不須要進行GC)。Off_Heap具備以下優點:Off_Heap運行多個執行者共享的Alluxio中相同的內存池,顯著地減小GC。若是單個的Executor崩潰,緩存的數據也不會丟失。

6.數據調度彈性,DAGScheduler、TASKScheduler和資源管理無關

  Spark將執行模型抽象爲通用的有向無環圖計劃(DAG),這能夠將多Stage的任務串聯或並行執行,從而不須要將Stage中間結果輸出到HDFS中,當發生節點運行故障時,可有其餘可用節點代替該故障節點運行。

7.數據分片的高度彈性(coalesce)

  Spark進行數據分片時,默認將數據放在內存中,若是內存放不下,一部分會放在磁盤上進行保存。

  RDD.scala的coalesce算子代碼以下:

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      // 從隨機分區開始,將元素均勻分佈在輸出分區上
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          // key的哈希碼是key自己,HashPartitioner將它與總分區數進行取模運算
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      // 包括一個shuffle步驟,使咱們的上游任務仍然是分佈式的
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

  例如,在計算的過程當中,會產生不少的數據碎片,這時產生一個Partition可能會很是小,若是一個Partition很是小,每次都會消耗一個線程去處理,這時可能會下降它的處理效率,須要考慮把許多小的Partition合併成一個較大的Partition去處理,這樣會提升效率。另外,有可能內存不是那麼多,而每一個Partition的數據Block比較大,這時須要考慮把Partition變成更小的數據分片,這樣讓Spark處理更多的批次,可是不會出現OOM。  

相關文章
相關標籤/搜索