Spark CommitCoordinator 保證數據一致性

原創文章,轉載請務必將下面這段話置於文章開頭處。
本文轉發自 技術世界原文連接  http://www.jasongj.com/spark/committer/

本文所述內容均基於 2018年9月17日 Spark 最新 Release 2.3.1 版本,以及 hadoop-2.6.0-cdh-5.4.4json

概述

Spark 輸出數據到 HDFS 時,須要解決以下問題:數據結構

  • 因爲多個 Task 同時寫數據到 HDFS,如何保證要麼全部 Task 寫的全部文件要麼同時對外可見,要麼同時對外不可見,即保證數據一致性
  • 同一 Task 可能由於 Speculation 而存在兩個徹底相同的 Task 實例寫相同的數據到 HDFS中,如何保證只有一個 commit 成功
  • 對於大 Job(如具備幾萬甚至幾十萬 Task),如何高效管理全部文件

commit 原理

本文經過 Local mode 執行以下 Spark 程序詳解 commit 原理app

sparkContext.textFile("/json/input.zstd")
  .map(_.split(","))
  .saveAsTextFile("/jason/test/tmp")

在詳述 commit 原理前,須要說明幾個述語oop

  • Task,即某個 Application 的某個 Job 內的某個 Stage 的一個 Task
  • TaskAttempt,Task 每次執行都視爲一個 TaskAttempt。對於同一個 Task,可能同時存在多個 TaskAttemp
  • Application Attempt,即 Application 的一次執行

在本文中,會使用以下縮寫性能

  • ${output.dir.root} 即輸出目錄根路徑
  • ${appAttempt} 即 Application Attempt ID,爲整型,從 0 開始
  • ${taskAttemp} 即 Task Attetmp ID,爲整型,從 0 開始

檢查 Job 輸出目錄

在啓動 Job 以前,Driver 首先經過 FileOutputFormat 的 checkOutputSpecs 方法檢查輸出目錄是否已經存在。若已存在,則直接拋出 FileAlreadyExistsException
Check output pathspa

Driver執行setupJob

Job 開始前,由 Driver(本例使用 local mode,所以由 main 線程執行)調用 FileOuputCommitter.setupJob 建立 Application Attempt 目錄,即 ${output.dir.root}/_temporary/${appAttempt}
Setup job線程

Task執行setupTask

由各 Task 執行 FileOutputCommitter.setupTask 方法(本例使用 local mode,所以由 task 線程執行)。該方法不作任何事情,由於 Task 臨時目錄由 Task 按需建立。
<img width="80%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Setup task"/>scala

按需建立 Task 目錄

本例中,Task 寫數據須要經過 TextOutputFormatgetRecordWriter 方法建立 LineRecordWriter。而建立前須要經過 FileOutputFormat.getTaskOutputPath設置 Task 輸出路徑,即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}。該 Task Attempt 全部數據均寫在該目錄下的文件內
Create task output file3d

檢查是否須要 commit

Task 執行數據寫完後,經過 FileOutputCommitter.needsTaskCommit 方法檢查是否須要 commit 它寫在 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 下的數據。code

檢查依據是 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 目錄是否存在
Need commmit task

若是須要 commit,而且開啓了 Output commit coordination,還須要經過 RPC 由 Driver 側的 OutputCommitCoordinator 判斷該 Task Attempt 是否能夠 commit
Need commmit task detail

之因此須要由 Driver 側的 CommitCoordinator 判斷是否能夠 commit,是由於可能因爲 speculation 或者其它緣由(如以前的 TaskAttemp 未被 Kill 成功)存在同一 Task 的多個 Attemp 同時寫數據且都申請 commit 的狀況。

CommitCoordinator

當申請 commitTask 的 TaskAttempt 爲失敗的 Attempt,則直接拒絕

若該 TaskAttempt 成功,而且 CommitCoordinator 未容許過該 Task 的其它 Attempt 的 commit 請求,則容許該 TaskAttempt 的 commit 請求

若 CommitCoordinator 以前已容許過該 TaskAttempt 的 commit 請求,則繼續贊成該 TaskAttempt 的 commit 請求,即 CommitCoordinator 對該申請的處理是冪等的。

若該 TaskAttempt 成功,且 CommitCoordinator 以前已容許該 Task 的其它 Attempt 的 commit 請求,則直接拒絕當前 TaskAttempt 的 commit 請求
Coordinator handle request

OutputCommitCoordinator 爲了實現上述功能,爲每一個 ActiveStage 維護一個以下 StageState

private case class StageState(numPartitions: Int) {
  val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
  val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
  }

該數據結構中,保存了每一個 Task 被容許 commit 的 TaskAttempt。默認值均爲 NO_AUTHORIZED_COMMITTER

同時,保存了每一個 Task 的全部失敗的 Attempt

commitTask

當 TaskAttempt 被容許 commit 後,Task (本例因爲使用 local model,所以由 task 線程執行)會經過以下方式 commitTask。

mapreduce.fileoutputcommitter.algorithm.version 的值爲 1 (默認值)時,Task 將 taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 重命令爲 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
Commit task v1

mapreduce.fileoutputcommitter.algorithm.version 的值爲 2,直接將taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 內的全部文件移動到 outputPath 即 ${output.dir.root}/
Commit task v2

commitJob

當全部 Task 都執行成功後,由 Driver (本例因爲使用 local model,故由 main 線程執行)執行 FileOutputCommitter.commitJob

mapreduce.fileoutputcommitter.algorithm.version 的值爲 1,則由 Driver 單線程遍歷全部 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt},並將其下全部文件移動到 finalOutput 即 ${output.dir.root}
Commit job v1

mapreduce.fileoutputcommitter.algorithm.version 的值爲 2,則無須移動任何文件。由於全部 Task 的輸出文件已在 commitTask 內被移動到 finalOutput 即 ${output.dir.root}
Commit job v2

全部 commit 過的 Task 輸出文件移動到 finalOutput 即 ${output.dir.root} 後,Driver 經過 cleanupJob 刪除 ${output.dir.root}/_temporary/ 下全部內容
<img width="70%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Cleanup job"/>

recoverTask

上文所述的 commitTask 與 commitJob 機制,保證了一次 Application Attemp 中不一樣 Task 的不一樣 Attemp 在 commit 時的數據一致性

而當整個 Application retry 時,在以前的 Application Attemp 中已經成功 commit 的 Task 無須從新執行,其數據可直接恢復

恢復 Task 時,先獲取上一次的 Application Attempt,以及對應的 committedTaskPath,即 ${output.dir.root}/_temporary/${preAppAttempt}/${taskAttempt}

mapreduce.fileoutputcommitter.algorithm.version 的值爲 1,而且 preCommittedTaskPath 存在(說明在以前的 Application Attempt 中該 Task 已被 commit 過),則直接將 preCommittedTaskPath 重命名爲 committedTaskPath

mapreduce.fileoutputcommitter.algorithm.version 的值爲 2,無須恢復任何數據,由於在以前 Application Attempt 中 commit 過的 Task 的數據已經在 commitTask 中被移動到 ${output.dir.root}
Recover task

abortTask

停止 Task 時,由 Task 調用 FileOutputCommitter.abortTask 方法刪除 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
Abort task

abortJob

停止 Job 由 Driver 調用 FileOutputCommitter.abortJob 方法完成。該方法經過 FileOutputCommitter.cleanupJob 方法刪除 ${output.dir.root}/_temporary

總結

V1 vs. V2 committer 過程

V1 committer(即 mapreduce.fileoutputcommitter.algorithm.version 的值爲 1),commit 過程以下

  • Task 線程將 TaskAttempt 數據寫入 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask 由 Task 線程將 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 移動到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
  • commitJob 由 Driver 單線程依次將全部 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt} 移動到 ${output.dir.root},而後建立 _SUCCESS 標記文件
  • recoverTask 由 Task 線程將 ${output.dir.root}/_temporary/${preAppAttempt}/${preTaskAttempt} 移動到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}

V2 committer(即 mapreduce.fileoutputcommitter.algorithm.version 的值爲 2),commit 過程以下

  • Task 線程將 TaskAttempt 數據寫入 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask 由 Task 線程將 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 移動到 ${output.dir.root}
  • commitJob 建立 _SUCCESS 標記文件
  • recoverTask 無需任何操做

V1 vs. V2 committer 性能對比

V1 在 Job 執行結束後,在 Driver 端經過 commitJob 方法,單線程串行將全部 Task 的輸出文件移動到輸出根目錄。移動以文件爲單位,當 Task 個數較多(大 Job,或者小文件引發的大量小 Task),Name Node RPC 較慢時,該過程耗時較久。在實踐中,可能所以發生全部 Task 均執行結束,但 Job 不結束的問題。甚至 commitJob 耗時比 全部 Task 執行時間還要長

而 V2 在 Task 結束後,由 Task 在 commitTask 方法內,將本身的數據文件移動到輸出根目錄。一方面,Task 結束時即移動文件,不需等待 Job 結束才移動文件,即文件移動更早發起,也更早結束。另外一方面,不一樣 Task 間並行移動文件,極大縮短了整個 Job 內全部 Task 的文件移動耗時

V1 vs. V2 committer 一致性對比

V1 只有 Job 結束,纔會將數據文件移動到輸出根目錄,纔會對外可見。在此以前,全部文件均在 ${output.dir.root}/_temporary/${appAttempt} 及其子文件內,對外不可見。

當 commitJob 過程耗時較短時,其失敗的可能性較小,可認爲 V1 的 commit 過程是兩階段提交,要麼全部 Task 都 commit 成功,要麼都失敗。

而因爲上文提到的問題, commitJob 過程可能耗時較久,若是在此過程當中,Driver 失敗,則可能發生部分 Task 數據被移動到 ${output.dir.root} 對外可見,部分 Task 的數據未及時移動,對外不可見的問題。此時發生了數據不一致性的問題

V2 當 Task 結束時,當即將數據移動到 ${output.dir.root},當即對外可見。若是 Application 執行過程當中失敗了,已 commit 的 Task 數據仍然對外可見,而失敗的 Task 數據或未被 commit 的 Task 數據對外不可見。也即 V2 更易發生數據一致性問題

相關文章
相關標籤/搜索