Spark DataSource API

Spark 1.3 引入了初版的數據源 API,咱們可使用它將常見的數據格式整合到 Spark SQL 中。可是,隨着 Spark 的不斷髮展,這一 API 也體現出了其侷限性,故而 Spark 團隊不得不加入愈來愈多的專有代碼來編寫數據源,以得到更好的性能。Spark 2.3 中,新一版的數據源 API 初見雛形,它克服了上一版 API 的種種問題,原來的數據源代碼也在逐步重寫。本文將演示這兩版 API 的使用方法,比較它們的不一樣之處,以及新版 API 的優點在哪裏。html

DataSource V1 API

V1 API 由一系列的抽象類和接口組成,它們位於 spark/sql/sources/interfaces.scala 文件中。主要的內容有:java

1
2
3
4
5
6
7
8
9
10
11
12
trait RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
}

trait TableScan {
  def buildScan(): RDD[Row]
}

經過實現 RelationProvider 接口,代表該類是一種新定義的數據源,能夠供 Spark SQL 取數所用。傳入 createRelation 方法的參數能夠用來作初始化,如文件路徑、權限信息等。BaseRelation 抽象類則用來定義數據源的表結構,它的來源能夠是數據庫、Parquet 文件等外部系統,也能夠直接由用戶指定。該類還必須實現某個 Scan 接口,Spark 會調用 buildScan 方法來獲取數據源的 RDD,咱們將在下文看到。mysql

JdbcSourceV1

下面咱們來使用 V1 API 實現一個經過 JDBC 讀取數據庫的自定義數據源。爲簡單起見,表結構信息是直接寫死在代碼裏的,咱們先從整表掃描開始。完整的代碼能夠在 GitHub(連接)中找到,數據源表則能夠在這個 連接 中查看。git

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class JdbcSourceV1 extends RelationProvider {
  override def createRelation(parameters: Map[String, String]): BaseRelation = {
    new JdbcRelationV1(parameters("url"))
  }
}

class JdbcRelationV1(url: String) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("emp_name", StringType)
  ))

  override def buildScan(): RDD[Row] = new JdbcRDD(url)
}

class JdbcRDD(url: String) extends RDD[Row] {
  override def compute(): Iterator[Row] = {
    val conn = DriverManager.getConnection(url)
    val stmt = conn.prepareStatement("SELECT * FROM employee")
    val rs = stmt.executeQuery()

    new Iterator[Row] {
      def hasNext: Boolean = rs.next()
      def next: Row = Row(rs.getInt("id"), rs.getString("emp_name"))
    }
  }
}

JdbcRDD#compute 負責實際的讀取操做,它從上游獲取到數據庫鏈接信息、選取的字段、以及過濾條件,拼裝 SQL 後執行,並返回一個 Row 類型的迭代器對象,每一行數據的結構和請求的字段列表相符。定義好數據源後,咱們就能夠用 DataFrame 對象來直接操做了:github

1
2
3
4
5
6
7
val df = spark.read
  .format("JdbcSourceV2")
  .option("url", "jdbc:mysql://localhost/spark")
  .load()

df.printSchema()
df.show()

上述代碼輸出的內容是:sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
root
 |-- id: integer (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- dep_name: string (nullable = true)
 |-- salary: decimal(7,2) (nullable = true)
 |-- age: decimal(3,0) (nullable = true)

+---+--------+----------+-------+---+
| id|emp_name|  dep_name| salary|age|
+---+--------+----------+-------+---+
|  1| Matthew|Management|4500.00| 55|
|  2|  Olivia|Management|4400.00| 61|
|  3|   Grace|Management|4000.00| 42|
|  4|     Jim|Production|3700.00| 35|
|  5|   Alice|Production|3500.00| 24|
+---+--------+----------+-------+---+

V1 API 的侷限性

咱們能夠看到,V1 API 使用起來很是方便,所以可以知足 Spark SQL 初期的需求,但也難免存在不少侷限性:數據庫

依賴上層 API

createRelation 接收 SQLContext 做爲參數;buildScan 方法返回的是 RDD[Row] 類型;而在實現寫操做時,insert 方法會直接接收 DataFrame 類型的參數:apache

1
2
3
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}

這些類型都屬於較爲上層的 Spark API,其中某些類已經發生了變化,如 SQLContext 已被 SparkSession 取代,而 DataFrame 也改成了 Dataset[Row] 類型的一個別稱。這些改變不該該體現到底層的數據源 API 中。api

難以添加新的優化算子

除了上文中的 TableScan 接口,V1 API 還提供了 PrunedScan 接口,用來裁剪不須要的字段;PrunedFilteredScan 接口則能夠將過濾條件下推到數據源中。在 JdbcSourceV1 示例中,這類下推優化會體如今 SQL 語句裏:服務器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class JdbcRelationV1 extends BaseRelation with PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = {
    new JdbcRDD(requiredColumns, filters)
  }
}

class JdbcRDD(columns: Array[String], filters: Array[Filter]) {
  def compute() = {
    val wheres = filters.flatMap {
      case EqualTo(attribute, value) => Some(s"$attribute = '$value'")
      case _ => None
    }
    val sql = s"SELECT ${columns.mkString(", ")} FROM employee WHERE ${wheres.mkString(" AND ")}"
  }
}

若是咱們想添加新的優化算子(如 LIMIT 子句),就難免要引入一系列的 Scan 接口組合:

1
2
3
4
5
6
7
8
9
10
11
trait LimitedScan {
  def buildScan(limit: Int): RDD[Row]
}

trait PrunedLimitedScan {
  def buildScan(requiredColumns: Array[String], limit: Int): RDD[Row]
}

trait PrunedFilteredLimitedScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter], limit: Int): RDD[Row]
}

難以傳遞分區信息

對於支持數據分區的數據源,如 HDFS、Kafka 等,V1 API 沒有提供原生的支持,於是也不能利用數據局部性(Data Locality)。咱們須要本身繼承 RDD 來實現,好比下面的代碼就對 Kafka 數據源進行了分區,並告知 Spark 能夠將數據讀取操做放入 Kafka Broker 所在的服務器上執行,以提高效率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
case class KafkaPartition(partitionId: Int, leaderHost: String) extends Partition {
  def index: Int = partitionId
}

class KafkaRDD(sc: SparkContext) extends RDD[Row](sc, Nil) {
  def getPartitions: Array[Partition] = Array(
    // populate with Kafka PartitionInfo
    KafkaPartition(0, "broker_0"),
    KafkaPartition(1, "broker_1")
  )

  override def getPreferredLocations(split: Partition): Seq[String] = Seq(
    split.asInstanceOf[KafkaPartition].leaderHost
  )
}

此外,相似 Cassandra 這樣的數據庫,會按主鍵對數據進行分片。那麼,若是查詢語句中包含了按該主鍵進行分組的子句,Spark 就能夠省去一次 Shuffle 操做。這在 V1 API 中也是不支持的,而 V2 API 則提供了 SupportsReportPartitioning 接口來支持。

缺乏事務性的寫操做

Spark 任務是有可能失敗的,使用 V1 API 時就會留下部分寫入的數據。固然,對於 HDFS 這樣的文件系統來講問題不大,由於能夠用 _SUCCESS 來標記該次寫操做是否執行成功。但這一邏輯也須要最終用戶來實現,而 V2 API 則提供了明確的接口來支持事務性的寫操做。

缺乏列存儲和流式計算支持

Spark SQL 目前已支持列存儲和流式計算,但二者都不是用 V1 API 實現的。ParquetFileFormat 和 KafkaSource 等類型都使用了專有代碼和內部 API。這些特性也在 V2 API 中獲得支持。

DataSource V2 API

V2 API 首先使用了一個標記性的 DataSourceV2 接口,實現了該接口的類還必須實現 ReadSupport 或 WriteSupport,用來表示自身支持讀或寫操做。ReadSupport 接口中的方法會被用來建立 DataSourceReader 類,同時接收到初始化參數;該類繼而建立 DataReaderFactory 和 DataReader 類,後者負責真正的讀操做,接口中定義的方法和迭代器相似。此外,DataSourceReader還能夠實現各種 Support 接口,代表本身支持某些優化下推操做,如裁剪字段、過濾條件等。WriteSupport API 的層級結構與之相似。這些接口都是用 Java 語言編寫,以得到更好的交互支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface DataSourceV2 {}

public interface ReadSupport extends DataSourceV2 {
  DataSourceReader createReader(DataSourceOptions options);
}

public interface DataSourceReader {
  StructType readSchema();
  List<DataReaderFactory<Row>> createDataReaderFactories();
}

public interface SupportsPushDownRequiredColumns extends DataSourceReader {
  void pruneColumns(StructType requiredSchema);
}

public interface DataReaderFactory<T> {
  DataReader<T> createDataReader();
}

public interface DataReader<T> extends Closeable {
  boolean next();
  T get();
}

可能你會注意到,DataSourceReader#createDataReaderFactories 仍然捆綁了 Row 類型,這是由於目前 V2 API 只支持 Row類型的返回值,且這套 API 仍處於進化狀態(Evolving)。

JdbcSourceV2

讓咱們使用 V2 API 來重寫 JDBC 數據源。下面是一個整表掃描的示例,完整代碼能夠在 GitHub(連接)上查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class JdbcDataSourceReader extends DataSourceReader {
  def readSchema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("emp_name", StringType)
  ))

  def createDataReaderFactories() = {
    Seq(new JdbcDataReaderFactory(url)).asJava
  }
}

class JdbcDataReader(url: String) extends DataReader[Row] {
  private var conn: Connection = null
  private var rs: ResultSet = null

  def next() = {
    if (rs == null) {
      conn = DriverManager.getConnection(url)
      val stmt = conn.prepareStatement("SELECT * FROM employee")
      rs = stmt.executeQuery()
    }
    rs.next()
  }

  def get() = Row(rs.getInt("id"), rs.getString("emp_name"))
}

裁剪字段

經過實現 SupportsPushDownRequiredColumns 接口,Spark 會調用其 pruneColumns 方法,傳入用戶所指定的字段列表(StructType),DataSourceReader 能夠將該信息傳給 DataReader 使用。

1
2
3
4
5
6
7
8
9
10
11
class JdbcDataSourceReader with SupportsPushDownRequiredColumns {
  var requiredSchema = JdbcSourceV2.schema
  def pruneColumns(requiredSchema: StructType)  = {
    this.requiredSchema = requiredSchema
  }

  def createDataReaderFactories() = {
    val columns = requiredSchema.fields.map(_.name)
    Seq(new JdbcDataReaderFactory(columns)).asJava
  }
}

咱們能夠用 df.explain(true) 來驗證執行計劃。例如,SELECT emp_name, age FROM employee 語句的執行計劃在優化先後是這樣的:

1
2
3
4
5
6
7
8
9
== Analyzed Logical Plan ==
emp_name: string, age: decimal(3,0)
Project [emp_name#1, age#4]
+- SubqueryAlias employee
   +- DataSourceV2Relation [id#0, emp_name#1, dep_name#2, salary#3, age#4], datasource.JdbcDataSourceReader@15ceeb42

== Optimized Logical Plan ==
Project [emp_name#1, age#4]
+- DataSourceV2Relation [emp_name#1, age#4], datasource.JdbcDataSourceReader@15ceeb42

能夠看到,字段裁剪被反映到了數據源中。若是咱們將實際執行的 SQL 語句打印出來,也能看到字段裁剪下推的結果。

過濾條件

相似的,實現 SupportsPushDownFilters 接口能夠將過濾條件下推到數據源中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class JdbcDataSourceReader with SupportsPushDownFilters {
  var filters = Array.empty[Filter]
  var wheres = Array.empty[String]

  def pushFilters(filters: Array[Filter]) = {
    val supported = ListBuffer.empty[Filter]
    val unsupported = ListBuffer.empty[Filter]
    val wheres = ListBuffer.empty[String]

    filters.foreach {
      case filter: EqualTo => {
        supported += filter
        wheres += s"${filter.attribute} = '${filter.value}'"
      }
      case filter => unsupported += filter
    }

    this.filters = supported.toArray
    this.wheres = wheres.toArray
    unsupported.toArray
  }

  def pushedFilters = filters

  def createDataReaderFactories() = {
    Seq(new JdbcDataReaderFactory(wheres)).asJava
  }
}

多分區支持

createDataReaderFactories 返回的是列表類型,每一個讀取器都會產生一個 RDD 分區。若是咱們想開啓多個讀取任務,就能夠生成多個讀取器工廠,併爲每一個讀取器限定主鍵範圍:

1
2
3
4
5
6
def createDataReaderFactories() = {
  Seq((1, 6), (7, 11)).map { case (minId, maxId) =>
    val partition = s"id BETWEEN $minId AND $maxId"
    new JdbcDataReaderFactory(partition)
  }.asJava
}

事務性的寫操做

V2 API 提供了兩組 commit / abort 方法,用來實現事務性的寫操做:

1
2
3
4
5
6
7
8
9
10
public interface DataSourceWriter {
  void commit(WriterCommitMessage[] messages);
  void abort(WriterCommitMessage[] messages);
}

public interface DataWriter<T> {
  void write(T record) throws IOException;
  WriterCommitMessage commit() throws IOException;
  void abort() throws IOException;
}

DataSourceWriter 在 Spark Driver 中執行,DataWriter 則運行在其餘節點的 Spark Executor 上。當 DataWriter 成功執行了寫操做,就會將提交信息傳遞給 Driver;當 DataSourceWriter 收集到了全部寫任務的提交信息,就會執行最終的提交操做。若是某個寫任務失敗了,它的 abort 方法會獲得執行;若是通過多輪重試後仍然失敗,則全部寫任務的 abort 方法都會被調用,進行數據清理操做。

列存儲與流式計算支持

這兩個特性仍處於實驗性階段,在 Spark 中尚未獲得使用。簡單來講,DataSourceReader 類能夠實現 SupportsScanColumnarBatch 接口來聲明本身會返回 ColumnarBatch 對象,這個對象是 Spark 內部用來存放列式數據的。對於流式數據,則有 MicroBatchReader 和 ContinuousReader 這兩個接口,感興趣的讀者能夠到 Spark 單元測試 代碼中查看。

參考資料

相關文章
相關標籤/搜索