Spark之RDD容錯原理及四大核心要點

1、Spark RDD容錯原理

  RDD不一樣的依賴關係致使Spark對不一樣的依賴關係有不一樣的處理方式。緩存

  對於寬依賴而言,因爲寬依賴實質是指父RDD的一個分區會對應一個子RDD的多個分區,在此狀況下出現部分計算結果丟失,單一計算丟失的數據沒法達到效果,便採用從新計算該步驟中的全部數據,從而會致使計算數據重複;對於窄依賴而言,因爲窄依賴實質是指父RDD的分區最多被一個子RDD使用,在此狀況下出現部分計算的錯誤,因爲計算結果的數據只與依賴的父RDD的相關數據有關,因此不須要從新計算全部數據,只從新計算出錯部分的數據便可。框架

2、RDD容錯的四大核心要點

  Spark框架層面的容錯機制,主要分爲三大層面(調度層、RDD血統層、Checkpoint層),在這三大層面中包括Spark RDD容錯四大核心要點。分佈式

  (1)Stage輸出失敗,上層調度器DAGScheduler重試。
  (2)Spark計算中,Task內部任務失敗,底層調度器重試。
  (3)RDD Lineage血統中窄依賴、寬依賴計算。
  (4)Checkpoint緩存。oop

1.調度層(包含DAG生成和Task重算兩大核心)

  從調度層面講,錯誤主要出如今兩個方面,分別是在Stage輸出時出錯和在計算時出錯。性能

  1)DAG生成層

  Stage輸出失敗,上層調度器DAGScheduler會進行重試,DAGScheduler.scala的resubmitFailedStages的源碼以下。fetch

  /**
   * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
   * the last fetch failure.
   */
  private[scheduler] def resubmitFailedStages() {
    // 判斷是否存在失敗的Stages
    if (failedStages.size > 0) {
      // Failed stages may be removed by job cancellation, so failed might be empty even if
      // the ResubmitFailedStages event has been scheduled.
      // 失敗的階段能夠經過做業取消刪除,若是ResubmitFailedStages事件已調度,失敗將是空值
      logInfo("Resubmitting failed stages")
      clearCacheLocs()
      // 獲取全部失敗Stage的列表
      val failedStagesCopy = failedStages.toArray
      // 清空failedStages
      failedStages.clear()
      // 對以前獲取全部失敗的Stage,根據jobId排序後逐一重試
      for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }
    submitWaitingStages()
  }

  2)Task計算層

  Spark計算過程當中,計算內部某個Task任務出現失敗,底層調度器會對此Task進行若干次重試(默認4次)。TaskSetManager.scala的handleFailedTask的源碼以下。spa

/**
   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
   * DAG Scheduler.
   */
  def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {

    ......

    if (!isZombie && state != TaskState.KILLED
        && reason.isInstanceOf[TaskFailedReason]
        && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
      assert (null != failureReason)
      // 對失敗的Task的numFailures進行計數加1
      numFailures(index) += 1
      // 判斷失敗的Task計數是否大於設定的最大失敗次數,若是大於,則輸出日誌,並再也不重試
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %d in stage %s failed %d times; aborting job".format(
          index, taskSet.id, maxTaskFailures))
        abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
          .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
        return
      }
    }
    // 若是運行的Task爲0時,則完成Task步驟
    maybeFinishTaskSet()
  }

2.RDD Lineage血統層容錯

  Spark中RDD採用高度受限的分佈式共享內存,且新的RDD的產生只可以經過其餘RDD上的批量操做來建立,依賴於以RDD的Lineage爲核心的容錯處理,在迭代計算方面比Hadoop快20多倍,同時還能夠在5~7s內交互式地查詢TB級別的數據集。scala

  Spark RDD實現基於Lineage的容錯機制,基於RDD的各項transformation構成了compute chain,在部分計算結果丟失的時候能夠根據Lineage從新恢復計算。日誌

  (1)在窄依賴中,在子RDD的分區丟失,要重算父RDD分區時,父RDD相應分區的全部數據都是子RDD分區的數據,並不存在冗餘計算。
  (2)在寬依賴狀況下,丟失一個子RDD分區,重算的每一個父RDD的每一個分區的全部數據並非都給丟失的子RDD分區用的,會有一部分數據至關於對應的是未丟失的子RDD分區中須要的數據,這樣就會產生冗餘計算開銷和巨大的性能浪費。orm

3.checkpoint層容錯

  Spark checkpoint經過將RDD寫入Disk做檢查點,是Spark lineage容錯的輔助,lineage過長會形成容錯成本太高,這時在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。

  checkpoint主要適用於如下兩種狀況:

  (1)DAG中的Lineage過長,若是重算,開銷太大,如PageRank、ALS等。
  (2)尤爲適合在寬依賴上做checkpoint,這個時候就能夠避免爲Lineage從新計算而帶來的冗餘計算。

相關文章
相關標籤/搜索