Delta Lake Source Code Analysis

Delta Lake Metadata

Delta Lake has five kinds of metadata actions: Protocol, Metadata, FileAction (AddFile, RemoveFile), CommitInfo and SetTransaction.

  1. Protocol: version management of Delta Lake itself; it normally appears only in the very first commit log (and presumably again whenever the protocol version is upgraded);
  2. Metadata: stores the schema of the Delta table; it appears in the first commit and in every commit that changes the schema, and the last occurrence wins;
  3. FileAction: file-level operations; Delta Lake only ever adds files and removes files;
  4. CommitInfo: records the raw information about this change, such as the modification time, the operation type, the data version that was read, and so on;
  5. SetTransaction: records the application's commit version, typically used for consistency control in streaming jobs (exactly-once semantics).
// The initial commit log contains the protocol and metaData entries
{"commitInfo":{"timestamp":1576480709055,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"fe0948b9-8253-4942-9e28-3a89321a004d","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"azkaban_tag\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"project_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"flow_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"job_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"application_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"queue_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"master_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1576480707164}}
{"add":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","partitionValues":{},"size":1443338,"modificationTime":1576480711000,"dataChange":true}}

// Subsequent commit logs record the information about the operation just performed
{"commitInfo":{"timestamp":1576481270646,"operation":"DELETE","operationParameters":{"predicate":"[\"(`master_name` = 'mob_analyse')\"]"},"readVersion":0,"isBlindAppend":false}}
{"remove":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","deletionTimestamp":1576481270643,"dataChange":true}}
{"add":{"path":"part-00000-d6431884-390d-4837-865c-f6e52f0e2cf5-c000.snappy.parquet","partitionValues":{},"size":1430267,"modificationTime":1576481273000,"dataChange":true}}

Delta Lake Metadata Management

Snapshot Generation

When a checkpoint file exists, currentSnapshot in the DeltaLog class computes the snapshot from the checkpoint plus the json delta logs that come after it.

  1. loadMetadataFromFile() reads the _last_checkpoint file to obtain the path of the latest checkpoint;
  2. LogStore.listFrom() lists the delta log files whose versions come after that checkpoint;
  3. verifyDeltaVersions checks that the delta log files are contiguous (log versions must be contiguous, because every commit log has to be replayed);
  4. the checkpoint and delta logs are parsed and combined into a Seq[DeltaLogFileIndex], followed by new Snapshot();
  5. stateReconstruction inside Snapshot uses InMemoryLogReplay to replay the actions in the logs and produce the final state.

When there is no checkpoint file, the snapshot is computed through the update method of the DeltaLog class.

  1. When no _last_checkpoint file exists, a Snapshot with version -1 is created;
  2. since currentSnapshot has version -1, update is called; the real work happens in updateInternal, which brings the current snapshot up to the latest version;
  3. updateInternal lists the checkpoint files and delta logs whose versions are greater than or equal to max(current version, 0) and applies them to the current snapshot via new Snapshot.
@volatile private var currentSnapshot: Snapshot = lastCheckpoint.map { c =>
    val checkpointFiles = c.parts
      .map(p => checkpointFileWithParts(logPath, c.version, p))     // parts is unused in the current release; possibly a commercial-edition feature
      .getOrElse(Seq(checkpointFileSingular(logPath, c.version)))   // path of the latest checkpoint file
    val deltas = store.listFrom(deltaFile(logPath, c.version + 1))  // json delta files with versions after the checkpoint
      .filter(f => isDeltaFile(f.getPath))
      .toArray
    val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
    verifyDeltaVersions(deltaVersions)  // verify that the log versions are contiguous
    val newVersion = deltaVersions.lastOption.getOrElse(c.version)
    logInfo(s"Loading version $newVersion starting from checkpoint ${c.version}")
    try {
      val deltaIndex = DeltaLogFileIndex(DeltaLog.COMMIT_FILE_FORMAT, deltas)
      val checkpointIndex = DeltaLogFileIndex(DeltaLog.CHECKPOINT_FILE_FORMAT, fs, checkpointFiles)
      val snapshot = new Snapshot(  // build the snapshot
        logPath,
        newVersion,
        None,
        checkpointIndex :: deltaIndex :: Nil,
        minFileRetentionTimestamp,
        this,
        // we don't want to make an additional RPC here to get commit timestamps when "deltas" is
        // empty. The next "update" call will take care of that if there are delta files.
        deltas.lastOption.map(_.getModificationTime).getOrElse(-1L))

      validateChecksum(snapshot)    // validates the version against a crc file, but the current Delta release does not generate crc files yet -- a later update, or another commercial-edition gap?
      lastUpdateTimestamp = clock.getTimeMillis()
      snapshot
    } catch {
      case e: FileNotFoundException
          if Option(e.getMessage).exists(_.contains("parquet does not exist")) =>
        recordDeltaEvent(this, "delta.checkpoint.error.partial")
        throw DeltaErrors.missingPartFilesException(c, e)
      case e: AnalysisException if Option(e.getMessage).exists(_.contains("Path does not exist")) =>
        recordDeltaEvent(this, "delta.checkpoint.error.partial")
        throw DeltaErrors.missingPartFilesException(c, e)
    }
  }.getOrElse {
    new Snapshot(logPath, -1, None, Nil, minFileRetentionTimestamp, this, -1L)  // no checkpoint file: read the delta log from the start and compute
  }
// Reconstruct the state by applying deltas in order to the checkpoint.
  // We partition by path as it is likely the bulk of the data is add/remove.
  // Non-path based actions will be collocated to a single partition.
  private val stateReconstruction = {
    val implicits = spark.implicits
    import implicits._

    val numPartitions = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SNAPSHOT_PARTITIONS)

    val checkpointData = previousSnapshot.getOrElse(emptyActions)
    val deltaData = load(files)
    val allActions = checkpointData.union(deltaData)
    val time = minFileRetentionTimestamp
    val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
    val logPath = path.toUri // for serializability

    allActions.as[SingleAction]
      .mapPartitions { actions =>
        val hdpConf = hadoopConf.value
        actions.flatMap {
          _.unwrap match {
            case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
            case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
            case other if other == null => None
            case other => Some(other.wrap)
          }
        }
      }
      .withColumn("file", assertLogBelongsToTable(logPath)(input_file_name()))
      .repartition(numPartitions, coalesce($"add.path", $"remove.path"))
      .sortWithinPartitions("file")
      .as[SingleAction]
      .mapPartitions { iter =>
        val state = new InMemoryLogReplay(time)
        state.append(0, iter.map(_.unwrap))
        state.checkpoint.map(_.wrap)
      }
  }
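
To see the result of this state reconstruction, the DeltaLog API (internal and not a stable public API) can be queried directly. A small usage sketch, assuming delta-core is on the classpath and the table path is a placeholder:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.builder().master("local[*]").appName("delta-snapshot-demo").getOrCreate()
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/events")
val snapshot = deltaLog.update()   // brings the snapshot up to the latest version
println(s"version=${snapshot.version}, files=${snapshot.numOfFiles}")
snapshot.allFiles.show(5, false)   // the AddFile state reconstructed by InMemoryLogReplay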

Log Commit

Log commits are implemented in the commit() method of OptimisticTransactionImpl.

  1. prepareCommit performs various checks, e.g. whether columns are duplicated and whether this is the first commit;
  2. the isolation level of this commit is determined; currently the code only checks whether data was modified: Serializable is used when data changed, otherwise SnapshotIsolation, because when no data is changed it provides the same guarantee as Serializable while passing the later conflict detection more easily (WriteSerializable is not used yet and will presumably be wired up in a later release);
  3. doCommit writes the action log; it uses deltaLog.lockInterruptibly as an optimistic lock, and the commit fails if the log file for version+1 already exists (a sketch of this exclusive-create write follows the code block below);
  4. if doCommit fails, checkAndRetry walks through all commit logs after the read version, runs conflict detection, and re-invokes doCommit once the checks pass;
  5. after doCommit succeeds, postCommit checks whether checkpointInterval has been reached; if so, deltaLog.checkpoint() writes a new checkpoint file and updates the _last_checkpoint file.
/**
   * Modifies the state of the log by adding a new commit that is based on a read at
   * the given `lastVersion`.  In the case of a conflict with a concurrent writer this
   * method will throw an exception.
   *
   * @param actions     Set of actions to commit
   * @param op          Details of operation that is performing this transactional commit
   */
  @throws(classOf[ConcurrentModificationException])
  def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
      deltaLog,
      "delta.commit") {
    val version = try {
      // Try to commit at the next version.
      var finalActions = prepareCommit(actions, op) // various checks

      // Find the isolation level to use for this commit
      val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
      val isolationLevelToUse = if (noDataChanged) {  // new in 0.5: a very simple isolation-level decision; WriteSerializable is not used yet, presumably coming in a later release
        // If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
        // provides Serializable guarantee. Hence, allow reduced conflict detection by using
        // SnapshotIsolation of what the table isolation level is.
        SnapshotIsolation
      } else {
        Serializable
      }

      val isBlindAppend = { // true when nothing was read from the delta table and every file action is an AddFile
        val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
        val onlyAddFiles =
          finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
        onlyAddFiles && !dependsOnFiles
      }

      if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {    // by default commitInfo is recorded in the commit log
        commitInfo = CommitInfo(
          clock.getTimeMillis(),
          op.name,
          op.jsonEncodedValues,
          Map.empty,
          Some(readVersion).filter(_ >= 0),
          None,
          Some(isBlindAppend))
        finalActions = commitInfo +: finalActions
      }

      // Register post-commit hooks if any  
      lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
      if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
        registerPostCommitHook(GenerateSymlinkManifest) // generate a manifest for Presto and Athena
      }

      val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse)  // commit the action log
      logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
      postCommit(commitVersion, finalActions) // check whether a checkpoint should be written
      commitVersion
    } catch {
      case e: DeltaConcurrentModificationException =>
        recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
        throw e
      case NonFatal(e) =>
        recordDeltaEvent(
          deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
        throw e
    }

    runPostCommitHooks(version, actions)  // new in 0.5, used to support Presto and Amazon Athena

    version
  }
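
The mutual exclusion in doCommit comes down to "write the file for version+1 only if it does not exist yet". A minimal sketch of that idea using the Hadoop FileSystem API directly (the real code goes through a LogStore implementation; the method name and paths here are illustrative):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def writeCommitFile(logPath: Path, version: Long, jsonActions: Seq[String]): Long = {
  val fs = logPath.getFileSystem(new Configuration())
  val commitFile = new Path(logPath, f"$version%020d.json")
  // create(path, overwrite = false) throws if another writer already committed this
  // version -- that failure is what triggers checkAndRetry.
  val out = fs.create(commitFile, false)
  try {
    jsonActions.foreach(a => out.write((a + "\n").getBytes(StandardCharsets.UTF_8)))
  } finally {
    out.close()
  }
  version
}

On HDFS this exclusive create is atomic; object stores need a LogStore that provides an equivalent guarantee.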

Conflict Detection (Concurrency Control)

  1. If a later commit upgraded the protocol version, the check fails;
  2. if a later commit changed the metadata, the check fails;
  3. if a later commit changed files:
    1. before 0.5, as soon as the transaction had read any files of the delta table and a later commit log contained FileAction operations, the check failed (only jobs that do not depend on the delta table at all, i.e. pure blind appends, could pass -- no wonder the concurrency was low);
    2. 0.5 adds three isolation levels: Serializable, WriteSerializable and SnapshotIsolation (the following only reflects what the code currently implements; according to the doc comments in isolationLevels they are meant to do more, especially WriteSerializable, which the code does not use yet -- presumably a later release will wire it up, or it only exists in the commercial edition):
      1. Serializable is the strictest and demands full serializability: under this level, any concurrent conflict where a later commit log contains AddFile operations raises an error;
      2. WriteSerializable lets other commits pass conflict detection when they are isBlindAppend (i.e. the later commit only adds files and removes none); in that case the final result may differ from a serial execution;
      3. SnapshotIsolation is the most relaxed and passes this part of conflict detection in almost every case, but it may still fail checks in other modules.
  4. If a later commit deleted a file that this transaction read, the check fails;
  5. if a later commit and this commit deleted the same file, the check fails;
  6. if idempotent transactions conflict (the SetTransaction parts share the same appId), the check fails.

(The concrete code lives in the checkAndRetry method of OptimisticTransaction.scala; take a look if you are interested.)
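
As a rough illustration of those rules, here is a self-contained sketch -- not the actual checkAndRetry code; the action types below are stripped-down stand-ins for the Delta actions of the same names, and the parameter names are invented:

sealed trait Action
case class Protocol(minReaderVersion: Int, minWriterVersion: Int) extends Action
case class Metadata(schemaString: String) extends Action
case class AddFile(path: String) extends Action
case class RemoveFile(path: String) extends Action
case class SetTransaction(appId: String, version: Long) extends Action

sealed trait IsolationLevel
case object Serializable extends IsolationLevel
case object WriteSerializable extends IsolationLevel
case object SnapshotIsolation extends IsolationLevel

def detectConflicts(
    winningActions: Seq[Action],   // actions committed concurrently after our read version
    filesWeRead: Set[String],      // file paths this transaction read
    filesWeRemove: Set[String],    // file paths this transaction removes
    ourAppIds: Set[String],        // appIds of our SetTransaction actions
    isolation: IsolationLevel): Unit = {
  def fail(msg: String) = throw new java.util.ConcurrentModificationException(msg)

  if (winningActions.exists(_.isInstanceOf[Protocol])) fail("protocol version changed")
  if (winningActions.exists(_.isInstanceOf[Metadata])) fail("metadata changed")

  val added   = winningActions.collect { case a: AddFile => a.path }
  val removed = winningActions.collect { case r: RemoveFile => r.path }

  // File additions only conflict under the stricter levels.
  val addConflict = isolation match {
    case Serializable      => added.nonEmpty
    case WriteSerializable => added.nonEmpty && removed.nonEmpty  // blind appends pass
    case SnapshotIsolation => false
  }
  if (addConflict) fail("files were added concurrently")

  if (removed.exists(filesWeRead))   fail("a file this transaction read was deleted")
  if (removed.exists(filesWeRemove)) fail("the same file was deleted by both commits")

  val winningAppIds = winningActions.collect { case t: SetTransaction => t.appId }.toSet
  if (winningAppIds.intersect(ourAppIds).nonEmpty) fail("conflicting idempotent transaction")
}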

delete

Calling the delete method on DeltaTable removes the data that matches a given condition.

  1. executeDelete in DeltaTableOperations resolves the job into a DeleteCommand and runs it;
  2. DeleteCommand.run checks whether the target delta table is appendOnly; if it is, updates and deletes are forbidden, otherwise performDelete is called;
  3. performDelete first parses the delete condition and splits it into predicates that can be evaluated from metadata alone and all other predicates (concretely, it checks whether a predicate references only partition columns and contains no subquery expressions);
  4. the filterFiles method of OptimisticTransaction finds the list of files affected by the delete:
    1. if only the first kind of predicate is present, no data file needs to be scanned; the files are simply dropped via removeWithTimestamp, which returns RemoveFile actions;
    2. if the second kind is present, the data files must be scanned to find the rows that should survive; those rows are written to new files with TransactionalWrite.writeFiles, so deleteActions contains both AddFile and RemoveFile actions.
  5. Finally, commit submits the deleteActions and recordDeltaEvent records the details of this delete operation.
    (No file is physically deleted.)
private def performDelete(
      sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction) = {
    import sparkSession.implicits._

    var numTouchedFiles: Long = 0
    var numRewrittenFiles: Long = 0
    var scanTimeMs: Long = 0
    var rewriteTimeMs: Long = 0

    val startTime = System.nanoTime()
    val numFilesTotal = deltaLog.snapshot.numOfFiles

    val deleteActions: Seq[Action] = condition match {
      case None =>  // no condition: the whole table is deleted, so just list every file and remove it
        // Case 1: Delete the whole table if the condition is true
        val allFiles = txn.filterFiles(Nil)

        numTouchedFiles = allFiles.size
        scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

        val operationTimestamp = System.currentTimeMillis()
        allFiles.map(_.removeWithTimestamp(operationTimestamp)) // logically remove the data files
      case Some(cond) =>  // with a condition we have to distinguish the cases
        val (metadataPredicates, otherPredicates) =
          DeltaTableUtils.splitMetadataAndDataPredicates(   // split the condition into metadata-only predicates and the rest
            cond, txn.metadata.partitionColumns, sparkSession)

        if (otherPredicates.isEmpty) {  // first case: metadata alone can locate all the data
          // Case 2: The condition can be evaluated using metadata only.
          //         Delete a set of files without the need of scanning any data files.
          val operationTimestamp = System.currentTimeMillis()
          val candidateFiles = txn.filterFiles(metadataPredicates)  // the files involved

          scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
          numTouchedFiles = candidateFiles.size

          candidateFiles.map(_.removeWithTimestamp(operationTimestamp)) // remove them
        } else {  // second case: the rows that should be kept have to be rewritten into new files
          // Case 3: Delete the rows based on the condition.
          val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)

          numTouchedFiles = candidateFiles.size
          val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)    // map the candidate file names to their AddFile actions

          val fileIndex = new TahoeBatchFileIndex(
            sparkSession, "delete", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)
          // Keep everything from the resolved target except a new TahoeFileIndex
          // that only involves the affected files instead of all files.
          val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)   // swap in the new file index and update the LogicalPlan
          val data = Dataset.ofRows(sparkSession, newTarget)
          val filesToRewrite =
            withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") {
              if (numTouchedFiles == 0) {
                Array.empty[String]
              } else {
                data.filter(new Column(cond)).select(new Column(InputFileName())).distinct()
                  .as[String].collect()
              }
            }

          scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
          if (filesToRewrite.isEmpty) {
            // Case 3.1: no row matches and no delete will be triggered
            Nil
          } else {
            // Case 3.2: some files need an update to remove the deleted files
            // Do the second pass and just read the affected files
            val baseRelation = buildBaseRelation(
              sparkSession, txn, "delete", tahoeFileIndex.path, filesToRewrite, nameToAddFileMap)
            // Keep everything from the resolved target except a new TahoeFileIndex
            // that only involves the affected files instead of all files.
            val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)

            val targetDF = Dataset.ofRows(sparkSession, newTarget)
            val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
            val updatedDF = targetDF.filter(new Column(filterCond))

            val rewrittenFiles = withStatusCode(
              "DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {
              txn.writeFiles(updatedDF)  // write the new files
            }

            numRewrittenFiles = rewrittenFiles.size
            rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs

            val operationTimestamp = System.currentTimeMillis()
            removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ // remove the old files
              rewrittenFiles  // plus the rewritten files
          }
        }
    }
    if (deleteActions.nonEmpty) {
      txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq)) // commit the log
    }

    recordDeltaEvent(   // record the details of this operation
      deltaLog,
      "delta.dml.delete.stats",
      data = DeleteMetric(
        condition = condition.map(_.sql).getOrElse("true"),
        numFilesTotal,
        numTouchedFiles,
        numRewrittenFiles,
        scanTimeMs,
        rewriteTimeMs)
    )
  }
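
For completeness, a usage sketch of the public delete API discussed above, assuming an active Delta-enabled SparkSession named spark and a placeholder table path:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/events")
// Removes matching rows; the affected data files are only logically removed
// (RemoveFile actions in the commit log), not physically deleted.
deltaTable.delete(col("master_name") === "mob_analyse")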

update

Calling the update() method on DeltaTable updates the data that matches a given condition (quite similar to delete).

  1. executeUpdate in DeltaTableOperations resolves the job into an UpdateCommand and runs it;
  2. UpdateCommand.run checks whether the target delta table is appendOnly; if it is, updates and deletes are forbidden, otherwise performUpdate is called;
  3. the given condition is split into predicates that can be evaluated from metadata alone and all other predicates;
  4. the filterFiles method of OptimisticTransaction finds the list of files that have to be rewritten:
    1. if only the first kind of predicate is present, the files are dropped with removeWithTimestamp, then rewriteFiles uses buildUpdatedColumns to update the affected columns and finally writeFiles;
    2. if the second kind is present, the data is scanned to find the rows to update; the original files are (logically) removed, the affected rows are updated, and then rewriteFiles.
  5. Finally, commit submits the resulting actions and recordDeltaEvent records the details of this update operation.

(The key code is in the performUpdate method of UpdateCommand.scala, similar to delete.)
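
A usage sketch of the update API, again assuming an active Delta-enabled SparkSession named spark and placeholder path/columns:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{col, lit}

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/events")
// Only the files containing matching rows are rewritten; the old ones are logically removed.
deltaTable.update(
  col("queue_name") === "default",
  Map("queue_name" -> lit("root.default")))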

merge

merge on DeltaTable simply builds a DeltaMergeBuilder; the subsequent whenMatched and whenNotMatched calls add clauses to the mergeBuilder, and execute() finally kicks off the execution.

  1. whenMatched can perform an update.
    1. update calls addUpdateClause, which uses MergeIntoClause.toActions to turn the resolved column names and update expressions into MergeActions; MergeIntoUpdateClause combines them with the whenMatched condition, and withClause() adds the clause to the mergeBuilder;
    2. updateAll follows the same flow, except that MergeIntoClause.toActions(Nil, Nil) is called with empty arguments (similar to update set *); it is resolved later by resolveClause during execute.

      private def addUpdateClause(set: Map[String, Column]): DeltaMergeBuilder = {
          if (set.isEmpty && matchCondition.isEmpty) {
            // Nothing to update = no need to add an update clause
            mergeBuilder
          } else {
            val setActions = set.toSeq
            val updateActions = MergeIntoClause.toActions(  // convert into MergeActions
              colNames = setActions.map(x => UnresolvedAttribute.quotedString(x._1)),
              exprs = setActions.map(x => x._2.expr),
              isEmptySeqEqualToStar = false)
            val updateClause = MergeIntoUpdateClause(matchCondition.map(_.expr), updateActions) // pack together with the match condition
            mergeBuilder.withClause(updateClause)   // add it to the mergeBuilder
          }
      }
  2. whenMatched can also perform a delete: the matchCondition is simply wrapped in a MergeIntoDeleteClause and added to the mergeBuilder with withClause;

    /** Delete a matched row from the table */
      def delete(): DeltaMergeBuilder = {
        val deleteClause = MergeIntoDeleteClause(matchCondition.map(_.expr))
        mergeBuilder.withClause(deleteClause)
      }
  3. whenNotMatched can perform an insert; the flow resembles update: MergeIntoClause.toActions converts the assignments, MergeIntoInsertClause wraps them, and the clause is added to the mergeBuilder;

    private def addInsertClause(setValues: Map[String, Column]): DeltaMergeBuilder = {
        val values = setValues.toSeq
        val insertActions = MergeIntoClause.toActions(
          colNames = values.map(x => UnresolvedAttribute.quotedString(x._1)),
          exprs = values.map(x => x._2.expr),
          isEmptySeqEqualToStar = false)
        val insertClause = MergeIntoInsertClause(notMatchCondition.map(_.expr), insertActions)
        mergeBuilder.withClause(insertClause)
      }
  4. execute runs the merge.
    1. MergeInto.resolveReferences resolves the mergeClause. It first checks the merge syntax rules:
      1. a merge must contain at least one whenClause;
      2. if there are two whenMatched clauses, the first one must have a condition;
      3. there can be at most two whenMatched clauses;
      4. update, delete and insert may each appear only once.
    2. The actual resolution is done by resolveClause and resolveOrFail (resolveOrFail provides the recursive call).
    3. PreprocessTableMerge preprocesses the clauses and wraps the MergeIntoInsertClause (not matched) and the MergeIntoMatchedClauses (matched: both MergeIntoUpdateClause and MergeIntoDeleteClause inherit from it) into a MergeIntoCommand;
    4. MergeIntoCommand.run is invoked.
      1. If there are only whenNotMatched clauses, only inserts are needed: writeInsertsOnlyWhenNoMatchedClauses does a left anti join to find the rows to insert and then writes them; the relevant methods are OptimisticTransaction.filterFiles and TransactionalWrite.writeFiles;
      2. if there are whenMatched clauses,
        1. findTouchedFiles finds every file that needs to change (withColumn attaches a row id and the file name to the data, then an inner join finds the matching rows);
        2. writeAllChanges then processes the rows that need to change: it performs a full join between the sourceDF (the source data of the merge) and the targetDF (built from the delta files found in the previous step), applies the merge logic with JoinedRowProcessor.processPartition, writes the result with writeFiles, and removes the delta files that were found;
        3. the commit is submitted and recordDeltaEvent records the MergeStats of this merge.
def execute(): Unit = {
       val sparkSession = targetTable.toDF.sparkSession
       val resolvedMergeInto =
         MergeInto.resolveReferences(mergePlan)(tryResolveReferences(sparkSession) _)   // resolve
       if (!resolvedMergeInto.resolved) {
         throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
       }
       // Preprocess the actions and verify
       val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(resolvedMergeInto)   // wrap into a MergeIntoCommand
       sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand)   // check the LogicalPlan
       mergeIntoCommand.run(sparkSession)   // run it
     }
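
A usage sketch of the merge builder described above, assuming an active Delta-enabled SparkSession named spark; the table path, source data and join key are placeholders:

import io.delta.tables.DeltaTable

val target  = DeltaTable.forPath(spark, "/tmp/delta/events")
val updates = spark.read.parquet("/tmp/staging/events_updates")

target.as("t")
  .merge(updates.as("s"), "t.job_name = s.job_name")
  .whenMatched().updateAll()       // becomes a MergeIntoUpdateClause (update set *)
  .whenNotMatched().insertAll()    // becomes a MergeIntoInsertClause
  .execute()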