Apache Spark Delta Lake 寫數據使用及實現原理代碼解析

Apache Spark Delta Lake 寫數據使用及實現原理代碼解析

Delta Lake 寫數據是其最基本的功能,並且其使用和現有的 Spark 寫 Parquet 文件基本一致,在介紹 Delta Lake 實現原理以前先來看看如何使用它,具體使用以下:sql

df.write.format("delta").save("/data/yangping.wyp/delta/test/")
 
//數據按照 dt 分區
df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")
 
// 覆蓋以前的數據
df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")

你們能夠看出,使用寫 Delta 數據是很是簡單的,這也是 Delte Lake 介紹的 100% 兼容 Spark。apache

Delta Lake 寫數據原理

前面簡單瞭解瞭如何使用 Delta Lake 來寫數據,本小結咱們將深刻介紹 Delta Lake 是如何保證寫數據的基本原理以及如何保證事務性。緩存

得益於 Apache Spark 強大的數據源 API,咱們能夠很方便的給 Spark 添加任何數據源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 實現的一種新的數據源,咱們調用 df.write.format("delta") 其實底層調用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 類。爲了簡單起見,本文介紹的是 Delta Lake 批量寫的實現,實時流寫 Delta Lake 本文不涉及,後面有機會再介紹。 Delta Lake 批量寫擴展了 org.apache.spark.sql.sources.CreatableRelationProvider 特質,並實現了其中的方法。咱們調用上面的寫數據方法首先會調用 DeltaDataSource 類的 createRelation 方法,它的具體實現以下:微信

override def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
 
  // 寫數據的路徑
  val path = parameters.getOrElse("path", {
    throw DeltaErrors.pathNotSpecifiedException
  })
 
  // 分區字段
  val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)
    .map(DeltaDataSource.decodePartitioningColumns)
    .getOrElse(Nil)
 
 
  // 事務日誌對象
  val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
 
  // 真正的寫操做過程
  WriteIntoDelta(
    deltaLog = deltaLog,
    mode = mode,
    new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
    partitionColumns = partitionColumns,
    configuration = Map.empty,
    data = data).run(sqlContext.sparkSession)
 
  deltaLog.createRelation()
}

其中 mode 就是保持數據的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 這個傳遞的參數,好比分區字段、數據保存路徑以及 Delta 支持的一些參數(replaceWhere、mergeSchema、overwriteSchema 等,具體參見 org.apache.spark.sql.delta.DeltaOptions);data 就是咱們須要保存的數據。session

createRelation 方法緊接着就是獲取數據保存的路徑,分區字段等信息。而後初始化 deltaLog,deltaLog 的初始化會作不少事情,好比會讀取磁盤全部的事務日誌(_delta_log 目錄下),並構建最新事務日誌的最新快照,裏面能夠拿到最新數據的版本。因爲 deltaLog 的初始化成本比較高,因此 deltaLog 初始化完以後會緩存到 deltaLogCache 中,這是一個使用 Guava 的 CacheBuilder 類實現的一個緩存,緩存的數據保持一小時,緩存大小能夠經過 delta.log.cacheSize 參數進行設置。只要寫數據的路徑是同樣的,就只須要初始化一次 deltaLog,後面直接從緩存中拿便可。除非以前緩存的 deltaLog 被清理了,或者無效纔會再次初始化。DeltaLog 類是 Delta Lake 中最重要的類之一,涉及的內容很是多,因此咱們會單獨使用一篇文章進行介紹。app

緊接着初始化 WriteIntoDelta,WriteIntoDelta 擴展自 RunnableCommand,Delta Lake 中的更新、刪除、合併都是擴展這個類的。初始化完 WriteIntoDelta 以後,就會調用 run 方法執行真正的寫數據操做。WriteIntoDelta 的 run 方法實現以下:ide

override def run(sparkSession: SparkSession): Seq[Row] = {
    deltaLog.withNewTransaction { txn =>
      val actions = write(txn, sparkSession)
      val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere)
      txn.commit(actions, operation)
    }
    Seq.empty
}

Delta Lake 全部的更新操做都是在事務中進行的,deltaLog.withNewTransaction 就是一個事務,withNewTransaction 的實現以下:函數

def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
  try {
    // 更新當前表事務日誌的快照
    update()
    // 初始化樂觀事務鎖對象
    val txn = new OptimisticTransaction(this)
    // 開啓事務
    OptimisticTransaction.setActive(txn)
    // 執行寫數據操做
    thunk(txn)
  } finally {
    // 關閉事務
    OptimisticTransaction.clearActive()
  }
}

在開啓事務以前,須要更新當前表事務的快照,由於在執行寫數據以前,這張表可能已經被修改了,執行 update 操做以後,就能夠拿到當前表的最新版本,緊接着開啓樂觀事務鎖。thunk(txn) 就是須要執行的事務操做,對應 deltaLog.withNewTransaction 裏面的全部代碼。oop

咱們回到上面的 run 方法。val actions = write(txn, sparkSession) 就是執行寫數據的操做,它的實現以下:ui

  def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
    import sparkSession.implicits._
    // 若是不是第一次往表裏面寫數據,須要判斷寫數據的模式是否符合條件
    if (txn.readVersion > -1) {
      // This table already exists, check if the insert is valid.
      if (mode == SaveMode.ErrorIfExists) {
        throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
      } else if (mode == SaveMode.Ignore) {
        return Nil
      } else if (mode == SaveMode.Overwrite) {
        deltaLog.assertRemovable()
      }
    }
 
    // 更新表的模式,好比是否覆蓋現有的模式,是否和現有的模式進行 merge
    updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
 
    // 是否認義分區過濾條件
    val replaceWhere = options.replaceWhere
    val partitionFilters = if (replaceWhere.isDefined) {
      val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
      if (mode == SaveMode.Overwrite) {
        verifyPartitionPredicates(
          sparkSession, txn.metadata.partitionColumns, predicates)
      }
      Some(predicates)
    } else {
      None
    }
 
    // 第一次寫數據初始化事務日誌的目錄
    if (txn.readVersion < 0) {
      // Initialize the log path
      deltaLog.fs.mkdirs(deltaLog.logPath)
    }
 
    // 寫數據到文件系統中
    val newFiles = txn.writeFiles(data, Some(options))
     
    val deletedFiles = (mode, partitionFilters) match {
       // 全量覆蓋,直接拿出緩存在內存中最新事務日誌快照裏面的全部 AddFile 文件
      case (SaveMode.Overwrite, None) =>
        txn.filterFiles().map(_.remove)
      // 從事務日誌快照中獲取對應分區裏面的全部 AddFile 文件
      case (SaveMode.Overwrite, Some(predicates)) =>
        // Check to make sure the files we wrote out were actually valid.
        val matchingFiles = DeltaLog.filterFileList(
          txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
        val invalidFiles = newFiles.toSet -- matchingFiles
        if (invalidFiles.nonEmpty) {
          val badPartitions = invalidFiles
            .map(_.partitionValues)
            .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
            .mkString(", ")
          throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
        }
 
        txn.filterFiles(predicates).map(_.remove)
      case _ => Nil
    }
 
    newFiles ++ deletedFiles
  }
}

若是 txn.readVersion == -1,說明是第一次寫數據到 Delta Lake 表,因此當這個值大於 -1 的時候,須要判斷一下寫數據的操做是否合法。
因爲 Delta Lake 底層使用的是 Parquet 格式,因此 Delta Lake 表也支持模式的增長合併等,這就是 updateMetadata 函數對應的操做。
由於 Delta Lake 表支持分區,因此咱們可能在寫數據的時候指定某個分區進行覆蓋。
真正寫數據的操做是 txn.writeFiles 函數執行的,具體實現以下:

def writeFiles(
      data: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean): Seq[AddFile] = {
    hasWritten = true
 
    val spark = data.sparkSession
    val partitionSchema = metadata.partitionSchema
    val outputPath = deltaLog.dataPath
 
    val (queryExecution, output) = normalizeData(data, metadata.partitionColumns)
    val partitioningColumns =
      getPartitioningColumns(partitionSchema, output, output.length < data.schema.size)
 
    // 獲取 DelayedCommitProtocol,裏面能夠設置寫文件的名字,
    // commitTask 和 commitJob 等作一些事情
    val committer = getCommitter(outputPath)
 
    val invariants = Invariants.getFromSchema(metadata.schema, spark)
 
    SQLExecution.withNewExecutionId(spark, queryExecution) {
      val outputSpec = FileFormatWriter.OutputSpec(
        outputPath.toString,
        Map.empty,
        output)
 
      val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants)
 
      FileFormatWriter.write(
        sparkSession = spark,
        plan = physicalPlan,
        fileFormat = snapshot.fileFormat,
        committer = committer,
        outputSpec = outputSpec,
        hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = Nil,
        options = Map.empty)
    }
 
    // 返回新增的文件
    committer.addedStatuses
}

Delta Lake 寫操做最終調用 Spark 的 FileFormatWriter.write 方法進行的,經過這個方法的複用將咱們真正的數據寫入到 Delta Lake 表裏面去了。
在 Delta Lake 中,若是是新增文件則會在事務日誌中使用 AddFile 類記錄相關的信息,AddFile 持久化到事務日誌裏面的內容以下:

{"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}}

能夠看出 AddFile 裏面記錄了新增文件的保存路徑,分區信息,新增的文件大小,修改時間等信息。若是是刪除文件,也會在事務日誌裏面記錄這個刪除操做,對應的就是使用 RemoveFile 類存儲,RemoveFile 持久化到事務日誌裏面的內容以下:

{"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}}

RemoveFile 裏面保存了刪除文件的路徑,刪除時間等信息。若是新增一個文件,再刪除一個文件,那麼最新的事務日誌快照裏面只會保存刪除這個文件的記錄。從這裏面也能夠看出, Delta Lake 刪除、新增 ACID 是針對文件級別的。

上面的寫操做確定會產生新的文件,因此寫操做以後就須要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和須要刪除的文件(RemoveFile)。針對那些文件須要刪除須要作一些判斷,主要分兩種狀況(具體參見 write 方法裏面的):

  • 若是是全表覆蓋,則直接從緩存在內存中最新的事務日誌快照中拿出全部 AddFile 文件,而後將其標記爲 RemoveFile;
  • 若是是分區內的覆蓋,則從緩存在內存中最新的事務日誌快照中拿出對應分區下的 AddFile 文件,而後將其標記爲 RemoveFile。

最後 write 方法返回新增的文件和須要刪除的文件(newFiles ++ deletedFiles),這些文件最終須要記錄到事務日誌裏面去。關於事務日誌是如何寫進去的請參見這篇文章的詳細分析。

寫在最後

爲了營造一個開放的Cassandra技術交流環境,社區創建了微信公衆號和釘釘羣。爲廣大用戶提供專業的技術分享及問答,按期開展專家技術直播,歡迎你們加入。另雲Cassandra免費火爆公測中,歡迎試用:https://www.aliyun.com/product/cds

 

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索