原創文章,轉載請務必將下面這段話置於文章開頭處。
本文轉發自 技術世界, 原文連接 http://www.jasongj.com/spark/committer/本文所述內容均基於 2018年9月17日 Spark 最新 Release 2.3.1 版本,以及 hadoop-2.6.0-cdh-5.4.4json
Spark 輸出數據到 HDFS 時,須要解決以下問題:數據結構
本文經過 Local mode 執行以下 Spark 程序詳解 commit 原理app
sparkContext.textFile("/json/input.zstd") .map(_.split(",")) .saveAsTextFile("/jason/test/tmp")
在詳述 commit 原理前,須要說明幾個述語oop
在本文中,會使用以下縮寫性能
在啓動 Job 以前,Driver 首先經過 FileOutputFormat 的 checkOutputSpecs 方法檢查輸出目錄是否已經存在。若已存在,則直接拋出 FileAlreadyExistsException
spa
Job 開始前,由 Driver(本例使用 local mode,所以由 main 線程執行)調用 FileOuputCommitter.setupJob 建立 Application Attempt 目錄,即 ${output.dir.root}/_temporary/${appAttempt}
線程
由各 Task 執行 FileOutputCommitter.setupTask 方法(本例使用 local mode,所以由 task 線程執行)。該方法不作任何事情,由於 Task 臨時目錄由 Task 按需建立。
<img width="80%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Setup task"/>scala
本例中,Task 寫數據須要經過 TextOutputFormat 的 getRecordWriter 方法建立 LineRecordWriter。而建立前須要經過 FileOutputFormat.getTaskOutputPath設置 Task 輸出路徑,即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
。該 Task Attempt 全部數據均寫在該目錄下的文件內
3d
Task 執行數據寫完後,經過 FileOutputCommitter.needsTaskCommit 方法檢查是否須要 commit 它寫在 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
下的數據。code
檢查依據是 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
目錄是否存在
若是須要 commit,而且開啓了 Output commit coordination,還須要經過 RPC 由 Driver 側的 OutputCommitCoordinator 判斷該 Task Attempt 是否能夠 commit
之因此須要由 Driver 側的 CommitCoordinator 判斷是否能夠 commit,是由於可能因爲 speculation 或者其它緣由(如以前的 TaskAttemp 未被 Kill 成功)存在同一 Task 的多個 Attemp 同時寫數據且都申請 commit 的狀況。
當申請 commitTask 的 TaskAttempt 爲失敗的 Attempt,則直接拒絕
若該 TaskAttempt 成功,而且 CommitCoordinator 未容許過該 Task 的其它 Attempt 的 commit 請求,則容許該 TaskAttempt 的 commit 請求
若 CommitCoordinator 以前已容許過該 TaskAttempt 的 commit 請求,則繼續贊成該 TaskAttempt 的 commit 請求,即 CommitCoordinator 對該申請的處理是冪等的。
若該 TaskAttempt 成功,且 CommitCoordinator 以前已容許該 Task 的其它 Attempt 的 commit 請求,則直接拒絕當前 TaskAttempt 的 commit 請求
OutputCommitCoordinator 爲了實現上述功能,爲每一個 ActiveStage 維護一個以下 StageState
private case class StageState(numPartitions: Int) { val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER) val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() }
該數據結構中,保存了每一個 Task 被容許 commit 的 TaskAttempt。默認值均爲 NO_AUTHORIZED_COMMITTER
同時,保存了每一個 Task 的全部失敗的 Attempt
當 TaskAttempt 被容許 commit 後,Task (本例因爲使用 local model,所以由 task 線程執行)會經過以下方式 commitTask。
當 mapreduce.fileoutputcommitter.algorithm.version
的值爲 1 (默認值)時,Task 將 taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
重命令爲 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
若 mapreduce.fileoutputcommitter.algorithm.version
的值爲 2,直接將taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
內的全部文件移動到 outputPath 即 ${output.dir.root}/
當全部 Task 都執行成功後,由 Driver (本例因爲使用 local model,故由 main 線程執行)執行 FileOutputCommitter.commitJob
若 mapreduce.fileoutputcommitter.algorithm.version
的值爲 1,則由 Driver 單線程遍歷全部 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
,並將其下全部文件移動到 finalOutput 即 ${output.dir.root}
下
若 mapreduce.fileoutputcommitter.algorithm.version
的值爲 2,則無須移動任何文件。由於全部 Task 的輸出文件已在 commitTask 內被移動到 finalOutput 即 ${output.dir.root}
內
全部 commit 過的 Task 輸出文件移動到 finalOutput 即 ${output.dir.root}
後,Driver 經過 cleanupJob 刪除 ${output.dir.root}/_temporary/
下全部內容
<img width="70%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Cleanup job"/>
上文所述的 commitTask 與 commitJob 機制,保證了一次 Application Attemp 中不一樣 Task 的不一樣 Attemp 在 commit 時的數據一致性
而當整個 Application retry 時,在以前的 Application Attemp 中已經成功 commit 的 Task 無須從新執行,其數據可直接恢復
恢復 Task 時,先獲取上一次的 Application Attempt,以及對應的 committedTaskPath,即 ${output.dir.root}/_temporary/${preAppAttempt}/${taskAttempt}
若 mapreduce.fileoutputcommitter.algorithm.version
的值爲 1,而且 preCommittedTaskPath 存在(說明在以前的 Application Attempt 中該 Task 已被 commit 過),則直接將 preCommittedTaskPath 重命名爲 committedTaskPath
若 mapreduce.fileoutputcommitter.algorithm.version
的值爲 2,無須恢復任何數據,由於在以前 Application Attempt 中 commit 過的 Task 的數據已經在 commitTask 中被移動到 ${output.dir.root}
中
停止 Task 時,由 Task 調用 FileOutputCommitter.abortTask
方法刪除 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
停止 Job 由 Driver 調用 FileOutputCommitter.abortJob
方法完成。該方法經過 FileOutputCommitter.cleanupJob
方法刪除 ${output.dir.root}/_temporary
V1 committer(即 mapreduce.fileoutputcommitter.algorithm.version
的值爲 1),commit 過程以下
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
移動到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
移動到 ${output.dir.root}
,而後建立 _SUCCESS
標記文件${output.dir.root}/_temporary/${preAppAttempt}/${preTaskAttempt}
移動到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
V2 committer(即 mapreduce.fileoutputcommitter.algorithm.version
的值爲 2),commit 過程以下
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
移動到 ${output.dir.root}
_SUCCESS
標記文件V1 在 Job 執行結束後,在 Driver 端經過 commitJob 方法,單線程串行將全部 Task 的輸出文件移動到輸出根目錄。移動以文件爲單位,當 Task 個數較多(大 Job,或者小文件引發的大量小 Task),Name Node RPC 較慢時,該過程耗時較久。在實踐中,可能所以發生全部 Task 均執行結束,但 Job 不結束的問題。甚至 commitJob 耗時比 全部 Task 執行時間還要長
而 V2 在 Task 結束後,由 Task 在 commitTask 方法內,將本身的數據文件移動到輸出根目錄。一方面,Task 結束時即移動文件,不需等待 Job 結束才移動文件,即文件移動更早發起,也更早結束。另外一方面,不一樣 Task 間並行移動文件,極大縮短了整個 Job 內全部 Task 的文件移動耗時
V1 只有 Job 結束,纔會將數據文件移動到輸出根目錄,纔會對外可見。在此以前,全部文件均在 ${output.dir.root}/_temporary/${appAttempt}
及其子文件內,對外不可見。
當 commitJob 過程耗時較短時,其失敗的可能性較小,可認爲 V1 的 commit 過程是兩階段提交,要麼全部 Task 都 commit 成功,要麼都失敗。
而因爲上文提到的問題, commitJob 過程可能耗時較久,若是在此過程當中,Driver 失敗,則可能發生部分 Task 數據被移動到 ${output.dir.root} 對外可見,部分 Task 的數據未及時移動,對外不可見的問題。此時發生了數據不一致性的問題
V2 當 Task 結束時,當即將數據移動到 ${output.dir.root},當即對外可見。若是 Application 執行過程當中失敗了,已 commit 的 Task 數據仍然對外可見,而失敗的 Task 數據或未被 commit 的 Task 數據對外不可見。也即 V2 更易發生數據一致性問題