Delta Lake's metadata is made up of the following actions: Protocol, Metadata, FileAction (AddFile, RemoveFile), CommitInfo, and SetTransaction.
// the initial commit log contains the protocol and metaData actions
{"commitInfo":{"timestamp":1576480709055,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"fe0948b9-8253-4942-9e28-3a89321a004d","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"azkaban_tag\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"project_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"flow_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"job_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"application_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"queue_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"master_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1576480707164}}
{"add":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","partitionValues":{},"size":1443338,"modificationTime":1576480711000,"dataChange":true}}

// subsequent commit logs record the details of the current operation
{"commitInfo":{"timestamp":1576481270646,"operation":"DELETE","operationParameters":{"predicate":"[\"(`master_name` = 'mob_analyse')\"]"},"readVersion":0,"isBlindAppend":false}}
{"remove":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","deletionTimestamp":1576481270643,"dataChange":true}}
{"add":{"path":"part-00000-d6431884-390d-4837-865c-f6e52f0e2cf5-c000.snappy.parquet","partitionValues":{},"size":1430267,"modificationTime":1576481273000,"dataChange":true}}
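Each commit file under _delta_log is newline-delimited JSON, so it can be inspected directly with Spark. A minimal sketch, assuming a hypothetical table path /data/events and an active SparkSession named spark:

// read the very first commit and look at its actions; each row carries exactly one non-null action struct
val commit0 = spark.read.json("/data/events/_delta_log/00000000000000000000.json")
commit0.select("commitInfo", "protocol", "metaData", "add").show(truncate = false)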
When a checkpoint file exists, the currentSnapshot field of the DeltaLog class computes the snapshot from that checkpoint plus the JSON commit logs written after it.
When there is no checkpoint file, the snapshot is computed through the update method of the DeltaLog class.
@volatile private var currentSnapshot: Snapshot = lastCheckpoint.map { c =>
  val checkpointFiles = c.parts
    .map(p => checkpointFileWithParts(logPath, c.version, p))   // parts is unused in this version; presumably a commercial-edition feature
    .getOrElse(Seq(checkpointFileSingular(logPath, c.version))) // path of the latest checkpoint file
  val deltas = store.listFrom(deltaFile(logPath, c.version + 1)) // JSON commit files for versions after the checkpoint
    .filter(f => isDeltaFile(f.getPath))
    .toArray
  val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
  verifyDeltaVersions(deltaVersions) // verify that the commit versions are contiguous
  val newVersion = deltaVersions.lastOption.getOrElse(c.version)
  logInfo(s"Loading version $newVersion starting from checkpoint ${c.version}")
  try {
    val deltaIndex = DeltaLogFileIndex(DeltaLog.COMMIT_FILE_FORMAT, deltas)
    val checkpointIndex = DeltaLogFileIndex(DeltaLog.CHECKPOINT_FILE_FORMAT, fs, checkpointFiles)
    val snapshot = new Snapshot( // build the snapshot
      logPath,
      newVersion,
      None,
      checkpointIndex :: deltaIndex :: Nil,
      minFileRetentionTimestamp,
      this,
      // we don't want to make an additional RPC here to get commit timestamps when "deltas" is
      // empty. The next "update" call will take care of that if there are delta files.
      deltas.lastOption.map(_.getModificationTime).getOrElse(-1L))

    validateChecksum(snapshot) // verify the version via the .crc file; the current Delta version does not generate .crc files yet, so this may land in a later release or be another commercial-edition gap
    lastUpdateTimestamp = clock.getTimeMillis()
    snapshot
  } catch {
    case e: FileNotFoundException
        if Option(e.getMessage).exists(_.contains("parquet does not exist")) =>
      recordDeltaEvent(this, "delta.checkpoint.error.partial")
      throw DeltaErrors.missingPartFilesException(c, e)
    case e: AnalysisException
        if Option(e.getMessage).exists(_.contains("Path does not exist")) =>
      recordDeltaEvent(this, "delta.checkpoint.error.partial")
      throw DeltaErrors.missingPartFilesException(c, e)
  }
}.getOrElse {
  new Snapshot(logPath, -1, None, Nil, minFileRetentionTimestamp, this, -1L) // no checkpoint: read the delta log from the beginning and compute the snapshot
}
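The snapshot itself can be obtained through the DeltaLog entry point (an internal API that may change between releases). A minimal sketch, assuming a hypothetical table path /data/events:

import org.apache.spark.sql.delta.DeltaLog

val deltaLog = DeltaLog.forTable(spark, "/data/events") // builds currentSnapshot as shown above
val snapshot = deltaLog.update()                        // replays any commits written since, returning the latest Snapshot
println(s"version=${snapshot.version}, files=${snapshot.numOfFiles}")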
// Reconstruct the state by applying deltas in order to the checkpoint.
// We partition by path as it is likely the bulk of the data is add/remove.
// Non-path based actions will be collocated to a single partition.
private val stateReconstruction = {
  val implicits = spark.implicits
  import implicits._
  val numPartitions = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SNAPSHOT_PARTITIONS)

  val checkpointData = previousSnapshot.getOrElse(emptyActions)
  val deltaData = load(files)
  val allActions = checkpointData.union(deltaData)
  val time = minFileRetentionTimestamp
  val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
  val logPath = path.toUri // for serializability

  allActions.as[SingleAction]
    .mapPartitions { actions =>
      val hdpConf = hadoopConf.value
      actions.flatMap {
        _.unwrap match {
          case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
          case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
          case other if other == null => None
          case other => Some(other.wrap)
        }
      }
    }
    .withColumn("file", assertLogBelongsToTable(logPath)(input_file_name()))
    .repartition(numPartitions, coalesce($"add.path", $"remove.path"))
    .sortWithinPartitions("file")
    .as[SingleAction]
    .mapPartitions { iter =>
      val state = new InMemoryLogReplay(time)
      state.append(0, iter.map(_.unwrap))
      state.checkpoint.map(_.wrap)
    }
}
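The replay rule applied by InMemoryLogReplay boils down to: the newest AddFile for a path wins, and a RemoveFile tombstones any earlier AddFile for the same path. An illustrative, self-contained sketch of that rule, using simplified stand-in classes rather than the real action types:

// simplified stand-ins for the real org.apache.spark.sql.delta.actions classes
case class AddFile(path: String, size: Long)
case class RemoveFile(path: String)

def replay(actions: Seq[Any]): Seq[AddFile] = {
  val active = scala.collection.mutable.LinkedHashMap.empty[String, AddFile]
  actions.foreach {
    case a: AddFile    => active(a.path) = a     // a later add for the same path replaces the earlier one
    case r: RemoveFile => active.remove(r.path)  // a remove drops the file from the live set
    case _             =>                        // protocol/metaData/commitInfo are handled separately in the real replay
  }
  active.values.toSeq // the surviving files make up the table state
}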
Commits to the log are implemented in the commit() method of OptimisticTransactionImpl.
/**
 * Modifies the state of the log by adding a new commit that is based on a read at
 * the given `lastVersion`. In the case of a conflict with a concurrent writer this
 * method will throw an exception.
 *
 * @param actions Set of actions to commit
 * @param op Details of operation that is performing this transactional commit
 */
@throws(classOf[ConcurrentModificationException])
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
    deltaLog,
    "delta.commit") {
  val version = try {
    // Try to commit at the next version.
    var finalActions = prepareCommit(actions, op) // various sanity checks

    // Find the isolation level to use for this commit
    val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
    val isolationLevelToUse = if (noDataChanged) {
      // New in 0.5: a very simple isolation-level decision; writeIsolation is not used yet, so expect this to evolve.
      // If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
      // provides Serializable guarantee. Hence, allow reduced conflict detection by using
      // SnapshotIsolation of what the table isolation level is.
      SnapshotIsolation
    } else {
      Serializable
    }

    val isBlindAppend = { // true when no delta data was read and every file action is an AddFile
      val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
      val onlyAddFiles =
        finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
      onlyAddFiles && !dependsOnFiles
    }

    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
      // by default commitInfo is written into the commit log
      commitInfo = CommitInfo(
        clock.getTimeMillis(),
        op.name,
        op.jsonEncodedValues,
        Map.empty,
        Some(readVersion).filter(_ >= 0),
        None,
        Some(isBlindAppend))
      finalActions = commitInfo +: finalActions
    }

    // Register post-commit hooks if any
    lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
    if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
      registerPostCommitHook(GenerateSymlinkManifest) // generate manifests to support Presto and Athena
    }

    val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse) // write the action log
    logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
    postCommit(commitVersion, finalActions) // decide whether a checkpoint should be written
    commitVersion
  } catch {
    case e: DeltaConcurrentModificationException =>
      recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
      throw e
    case NonFatal(e) =>
      recordDeltaEvent(
        deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
      throw e
  }
  runPostCommitHooks(version, actions) // new in 0.5, used to support Presto and Amazon Athena
  version
}
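Driving a commit by hand looks roughly like the following. This is a minimal sketch only, using the internal API (DeltaLog.forTable, startTransaction, DeltaOperations.ManualUpdate); the path is hypothetical and the action list is left empty here:

import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

val deltaLog = DeltaLog.forTable(spark, "/data/events")
val txn = deltaLog.startTransaction()
val filesRead = txn.filterFiles(Nil)              // registers what this transaction has read (affects isBlindAppend)
val actions = Seq.empty                           // AddFile/RemoveFile actions would be built here
val committedVersion = txn.commit(actions, DeltaOperations.ManualUpdate) // throws on a conflicting concurrent write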
(The conflict detection and retry logic lives in the checkAndRetry method of OptimisticTransaction.scala; have a look if you are interested.)
Calling the delete method on DeltaTable removes the rows that satisfy a given condition.
private def performDelete(
    sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction) = {
  import sparkSession.implicits._

  var numTouchedFiles: Long = 0
  var numRewrittenFiles: Long = 0
  var scanTimeMs: Long = 0
  var rewriteTimeMs: Long = 0

  val startTime = System.nanoTime()
  val numFilesTotal = deltaLog.snapshot.numOfFiles

  val deleteActions: Seq[Action] = condition match {
    case None => // no condition: the whole table is deleted, so just list every file and remove it
      // Case 1: Delete the whole table if the condition is true
      val allFiles = txn.filterFiles(Nil)

      numTouchedFiles = allFiles.size
      scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

      val operationTimestamp = System.currentTimeMillis()
      allFiles.map(_.removeWithTimestamp(operationTimestamp)) // logically remove the data files
    case Some(cond) => // with a condition, several cases have to be distinguished
      val (metadataPredicates, otherPredicates) =
        DeltaTableUtils.splitMetadataAndDataPredicates( // split the condition into metadata-only predicates and the rest
          cond, txn.metadata.partitionColumns, sparkSession)

      if (otherPredicates.isEmpty) { // first case: metadata alone can locate all affected data
        // Case 2: The condition can be evaluated using metadata only.
        // Delete a set of files without the need of scanning any data files.
        val operationTimestamp = System.currentTimeMillis()
        val candidateFiles = txn.filterFiles(metadataPredicates) // the affected files

        scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
        numTouchedFiles = candidateFiles.size

        candidateFiles.map(_.removeWithTimestamp(operationTimestamp)) // remove them
      } else { // second case: the rows that should be kept have to be rewritten into new files
        // Case 3: Delete the rows based on the condition.
        val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)

        numTouchedFiles = candidateFiles.size
        val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) // map from file name to its corresponding AddFile action

        val fileIndex = new TahoeBatchFileIndex(
          sparkSession, "delete", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)
        // Keep everything from the resolved target except a new TahoeFileIndex
        // that only involves the affected files instead of all files.
        val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex) // swap in the new file index, producing an updated LogicalPlan
        val data = Dataset.ofRows(sparkSession, newTarget)
        val filesToRewrite =
          withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") {
            if (numTouchedFiles == 0) {
              Array.empty[String]
            } else {
              data.filter(new Column(cond)).select(new Column(InputFileName())).distinct()
                .as[String].collect()
            }
          }

        scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
        if (filesToRewrite.isEmpty) {
          // Case 3.1: no row matches and no delete will be triggered
          Nil
        } else {
          // Case 3.2: some files need an update to remove the deleted files
          // Do the second pass and just read the affected files
          val baseRelation = buildBaseRelation(
            sparkSession, txn, "delete", tahoeFileIndex.path, filesToRewrite, nameToAddFileMap)
          // Keep everything from the resolved target except a new TahoeFileIndex
          // that only involves the affected files instead of all files.
          val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
          val targetDF = Dataset.ofRows(sparkSession, newTarget)
          val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
          val updatedDF = targetDF.filter(new Column(filterCond))

          val rewrittenFiles = withStatusCode(
            "DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {
            txn.writeFiles(updatedDF) // write the rewritten files
          }

          numRewrittenFiles = rewrittenFiles.size
          rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs

          val operationTimestamp = System.currentTimeMillis()
          removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ // RemoveFile actions for the old files
            rewrittenFiles // plus AddFile actions for the rewritten ones
        }
      }
  }
  if (deleteActions.nonEmpty) {
    txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq)) // commit the actions to the log
  }

  recordDeltaEvent( // record detailed metrics about this operation
    deltaLog,
    "delta.dml.delete.stats",
    data = DeleteMetric(
      condition = condition.map(_.sql).getOrElse("true"),
      numFilesTotal,
      numTouchedFiles,
      numRewrittenFiles,
      scanTimeMs,
      rewriteTimeMs)
  )
}
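From the user's side this is simply the DeltaTable API. A usage sketch matching the predicate in the commit log above; the table path is hypothetical:

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, "/data/events")
deltaTable.delete("master_name = 'mob_analyse'") // Case 3 above: affected files are rewritten without the matching rows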
Calling the update() method on DeltaTable updates the rows that satisfy a given condition (quite similar to delete).
(The key code is in the performUpdate method of UpdateCommand.scala and closely mirrors delete.)
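A usage sketch of update, assuming the same hypothetical path and columns as in the earlier examples:

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, "/data/events")
deltaTable.updateExpr(
  "master_name = 'mob_analyse'",            // condition
  Map("queue_name" -> "'default_queue'"))   // set clause: column -> SQL expression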
merge on DeltaTable goes straight into DeltaMergeBuilder; the subsequent whenMatched and whenNotMatched calls add clauses to the mergeBuilder, and execute() finally kicks off execution.
updateAll follows the same flow, except that MergeIntoClause.toActions(Nil, Nil) is called with empty arguments (analogous to UPDATE SET *); the resolveClause method resolves it later during execute().
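A usage sketch of the builder chain, assuming a hypothetical target path, a hypothetical source DataFrame, and job_name as the join key:

import io.delta.tables.DeltaTable

val target = DeltaTable.forPath(spark, "/data/events").as("t")
val updates = spark.read.parquet("/data/events_updates").as("s")

target.merge(updates, "t.job_name = s.job_name")
  .whenMatched().updateAll()    // MergeIntoClause.toActions(Nil, Nil), resolved during execute()
  .whenNotMatched().insertAll()
  .execute()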
private def addUpdateClause(set: Map[String, Column]): DeltaMergeBuilder = {
  if (set.isEmpty && matchCondition.isEmpty) {
    // Nothing to update = no need to add an update clause
    mergeBuilder
  } else {
    val setActions = set.toSeq
    val updateActions = MergeIntoClause.toActions( // convert to MergeActions
      colNames = setActions.map(x => UnresolvedAttribute.quotedString(x._1)),
      exprs = setActions.map(x => x._2.expr),
      isEmptySeqEqualToStar = false)
    val updateClause = MergeIntoUpdateClause(matchCondition.map(_.expr), updateActions) // package them together with the match condition
    mergeBuilder.withClause(updateClause) // add it to the mergeBuilder
  }
}
A whenMatched clause can also perform a delete: the matchCondition is simply wrapped in a MergeIntoDeleteClause and added to the mergeBuilder via withClause.
/** Delete a matched row from the table */
def delete(): DeltaMergeBuilder = {
  val deleteClause = MergeIntoDeleteClause(matchCondition.map(_.expr))
  mergeBuilder.withClause(deleteClause)
}
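Continuing the hypothetical target/updates from the sketch above, a matched row can be deleted instead of updated (the s.is_deleted column is made up for illustration):

target.merge(updates, "t.job_name = s.job_name")
  .whenMatched("s.is_deleted = true").delete() // wrapped into a MergeIntoDeleteClause
  .execute()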
A whenNotMatched clause can perform an insert; the flow is similar to update: MergeIntoClause.toActions does the conversion, MergeIntoInsertClause wraps the result, and the clause is added to the mergeBuilder.
private def addInsertClause(setValues: Map[String, Column]): DeltaMergeBuilder = {
  val values = setValues.toSeq
  val insertActions = MergeIntoClause.toActions(
    colNames = values.map(x => UnresolvedAttribute.quotedString(x._1)),
    exprs = values.map(x => x._2.expr),
    isEmptySeqEqualToStar = false)
  val insertClause = MergeIntoInsertClause(notMatchCondition.map(_.expr), insertActions)
  mergeBuilder.withClause(insertClause)
}
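Again with the same hypothetical target/updates, an insert clause can list explicit columns, which goes through MergeIntoClause.toActions with non-empty arguments:

target.merge(updates, "t.job_name = s.job_name")
  .whenNotMatched("s.master_name IS NOT NULL")
  .insertExpr(Map(
    "job_name"    -> "s.job_name",
    "master_name" -> "s.master_name")) // target columns not listed here are left null
  .execute()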
def execute(): Unit = {
  val sparkSession = targetTable.toDF.sparkSession
  val resolvedMergeInto =
    MergeInto.resolveReferences(mergePlan)(tryResolveReferences(sparkSession) _) // resolve references
  if (!resolvedMergeInto.resolved) {
    throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
  }
  // Preprocess the actions and verify
  val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(resolvedMergeInto) // wrap into a runnable command
  sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand) // check the LogicalPlan
  mergeIntoCommand.run(sparkSession) // run it
}