RDD不一樣的依賴關係致使Spark對不一樣的依賴關係有不一樣的處理方式。緩存
對於寬依賴而言,因爲寬依賴實質是指父RDD的一個分區會對應一個子RDD的多個分區,在此狀況下出現部分計算結果丟失,單一計算丟失的數據沒法達到效果,便採用從新計算該步驟中的全部數據,從而會致使計算數據重複;對於窄依賴而言,因爲窄依賴實質是指父RDD的分區最多被一個子RDD使用,在此狀況下出現部分計算的錯誤,因爲計算結果的數據只與依賴的父RDD的相關數據有關,因此不須要從新計算全部數據,只從新計算出錯部分的數據便可。框架
Spark框架層面的容錯機制,主要分爲三大層面(調度層、RDD血統層、Checkpoint層),在這三大層面中包括Spark RDD容錯四大核心要點。分佈式
(1)Stage輸出失敗,上層調度器DAGScheduler重試。
(2)Spark計算中,Task內部任務失敗,底層調度器重試。
(3)RDD Lineage血統中窄依賴、寬依賴計算。
(4)Checkpoint緩存。oop
從調度層面講,錯誤主要出如今兩個方面,分別是在Stage輸出時出錯和在計算時出錯。性能
Stage輸出失敗,上層調度器DAGScheduler會進行重試,DAGScheduler.scala的resubmitFailedStages的源碼以下。fetch
/** * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since * the last fetch failure. */ private[scheduler] def resubmitFailedStages() { // 判斷是否存在失敗的Stages if (failedStages.size > 0) { // Failed stages may be removed by job cancellation, so failed might be empty even if // the ResubmitFailedStages event has been scheduled. // 失敗的階段能夠經過做業取消刪除,若是ResubmitFailedStages事件已調度,失敗將是空值 logInfo("Resubmitting failed stages") clearCacheLocs() // 獲取全部失敗Stage的列表 val failedStagesCopy = failedStages.toArray // 清空failedStages failedStages.clear() // 對以前獲取全部失敗的Stage,根據jobId排序後逐一重試 for (stage <- failedStagesCopy.sortBy(_.firstJobId)) { submitStage(stage) } } submitWaitingStages() }
Spark計算過程當中,計算內部某個Task任務出現失敗,底層調度器會對此Task進行若干次重試(默認4次)。TaskSetManager.scala的handleFailedTask的源碼以下。spa
/** * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the * DAG Scheduler. */ def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) { ...... if (!isZombie && state != TaskState.KILLED && reason.isInstanceOf[TaskFailedReason] && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) { assert (null != failureReason) // 對失敗的Task的numFailures進行計數加1 numFailures(index) += 1 // 判斷失敗的Task計數是否大於設定的最大失敗次數,若是大於,則輸出日誌,並再也不重試 if (numFailures(index) >= maxTaskFailures) { logError("Task %d in stage %s failed %d times; aborting job".format( index, taskSet.id, maxTaskFailures)) abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" .format(index, taskSet.id, maxTaskFailures, failureReason), failureException) return } } // 若是運行的Task爲0時,則完成Task步驟 maybeFinishTaskSet() }
Spark中RDD採用高度受限的分佈式共享內存,且新的RDD的產生只可以經過其餘RDD上的批量操做來建立,依賴於以RDD的Lineage爲核心的容錯處理,在迭代計算方面比Hadoop快20多倍,同時還能夠在5~7s內交互式地查詢TB級別的數據集。scala
Spark RDD實現基於Lineage的容錯機制,基於RDD的各項transformation構成了compute chain,在部分計算結果丟失的時候能夠根據Lineage從新恢復計算。日誌
(1)在窄依賴中,在子RDD的分區丟失,要重算父RDD分區時,父RDD相應分區的全部數據都是子RDD分區的數據,並不存在冗餘計算。
(2)在寬依賴狀況下,丟失一個子RDD分區,重算的每一個父RDD的每一個分區的全部數據並非都給丟失的子RDD分區用的,會有一部分數據至關於對應的是未丟失的子RDD分區中須要的數據,這樣就會產生冗餘計算開銷和巨大的性能浪費。orm
Spark checkpoint經過將RDD寫入Disk做檢查點,是Spark lineage容錯的輔助,lineage過長會形成容錯成本太高,這時在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。
checkpoint主要適用於如下兩種狀況:
(1)DAG中的Lineage過長,若是重算,開銷太大,如PageRank、ALS等。
(2)尤爲適合在寬依賴上做checkpoint,這個時候就能夠避免爲Lineage從新計算而帶來的冗餘計算。