Spark 1.3 引入了初版的數據源 API,咱們可使用它將常見的數據格式整合到 Spark SQL 中。可是,隨着 Spark 的不斷髮展,這一 API 也體現出了其侷限性,故而 Spark 團隊不得不加入愈來愈多的專有代碼來編寫數據源,以得到更好的性能。Spark 2.3 中,新一版的數據源 API 初見雛形,它克服了上一版 API 的種種問題,原來的數據源代碼也在逐步重寫。本文將演示這兩版 API 的使用方法,比較它們的不一樣之處,以及新版 API 的優點在哪裏。html
V1 API 由一系列的抽象類和接口組成,它們位於 spark/sql/sources/interfaces.scala 文件中。主要的內容有:java
1 2 3 4 5 6 7 8 9 10 11 12 |
trait RelationProvider { def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation } abstract class BaseRelation { def sqlContext: SQLContext def schema: StructType } trait TableScan { def buildScan(): RDD[Row] } |
經過實現 RelationProvider
接口,代表該類是一種新定義的數據源,能夠供 Spark SQL 取數所用。傳入 createRelation
方法的參數能夠用來作初始化,如文件路徑、權限信息等。BaseRelation
抽象類則用來定義數據源的表結構,它的來源能夠是數據庫、Parquet 文件等外部系統,也能夠直接由用戶指定。該類還必須實現某個 Scan
接口,Spark 會調用 buildScan
方法來獲取數據源的 RDD,咱們將在下文看到。mysql
下面咱們來使用 V1 API 實現一個經過 JDBC 讀取數據庫的自定義數據源。爲簡單起見,表結構信息是直接寫死在代碼裏的,咱們先從整表掃描開始。完整的代碼能夠在 GitHub(連接)中找到,數據源表則能夠在這個 連接 中查看。git
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
class JdbcSourceV1 extends RelationProvider { override def createRelation(parameters: Map[String, String]): BaseRelation = { new JdbcRelationV1(parameters("url")) } } class JdbcRelationV1(url: String) extends BaseRelation with TableScan { override def schema: StructType = StructType(Seq( StructField("id", IntegerType), StructField("emp_name", StringType) )) override def buildScan(): RDD[Row] = new JdbcRDD(url) } class JdbcRDD(url: String) extends RDD[Row] { override def compute(): Iterator[Row] = { val conn = DriverManager.getConnection(url) val stmt = conn.prepareStatement("SELECT * FROM employee") val rs = stmt.executeQuery() new Iterator[Row] { def hasNext: Boolean = rs.next() def next: Row = Row(rs.getInt("id"), rs.getString("emp_name")) } } } |
JdbcRDD#compute
負責實際的讀取操做,它從上游獲取到數據庫鏈接信息、選取的字段、以及過濾條件,拼裝 SQL 後執行,並返回一個 Row
類型的迭代器對象,每一行數據的結構和請求的字段列表相符。定義好數據源後,咱們就能夠用 DataFrame
對象來直接操做了:github
1 2 3 4 5 6 7 |
val df = spark.read .format("JdbcSourceV2") .option("url", "jdbc:mysql://localhost/spark") .load() df.printSchema() df.show() |
上述代碼輸出的內容是:sql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
root |-- id: integer (nullable = true) |-- emp_name: string (nullable = true) |-- dep_name: string (nullable = true) |-- salary: decimal(7,2) (nullable = true) |-- age: decimal(3,0) (nullable = true) +---+--------+----------+-------+---+ | id|emp_name| dep_name| salary|age| +---+--------+----------+-------+---+ | 1| Matthew|Management|4500.00| 55| | 2| Olivia|Management|4400.00| 61| | 3| Grace|Management|4000.00| 42| | 4| Jim|Production|3700.00| 35| | 5| Alice|Production|3500.00| 24| +---+--------+----------+-------+---+ |
咱們能夠看到,V1 API 使用起來很是方便,所以可以知足 Spark SQL 初期的需求,但也難免存在不少侷限性:數據庫
createRelation
接收 SQLContext
做爲參數;buildScan
方法返回的是 RDD[Row]
類型;而在實現寫操做時,insert
方法會直接接收 DataFrame
類型的參數:apache
1 2 3 |
trait InsertableRelation { def insert(data: DataFrame, overwrite: Boolean): Unit } |
這些類型都屬於較爲上層的 Spark API,其中某些類已經發生了變化,如 SQLContext
已被 SparkSession
取代,而 DataFrame
也改成了 Dataset[Row]
類型的一個別稱。這些改變不該該體現到底層的數據源 API 中。api
除了上文中的 TableScan
接口,V1 API 還提供了 PrunedScan
接口,用來裁剪不須要的字段;PrunedFilteredScan
接口則能夠將過濾條件下推到數據源中。在 JdbcSourceV1
示例中,這類下推優化會體如今 SQL 語句裏:服務器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class JdbcRelationV1 extends BaseRelation with PrunedFilteredScan { def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = { new JdbcRDD(requiredColumns, filters) } } class JdbcRDD(columns: Array[String], filters: Array[Filter]) { def compute() = { val wheres = filters.flatMap { case EqualTo(attribute, value) => Some(s"$attribute = '$value'") case _ => None } val sql = s"SELECT ${columns.mkString(", ")} FROM employee WHERE ${wheres.mkString(" AND ")}" } } |
若是咱們想添加新的優化算子(如 LIMIT 子句),就難免要引入一系列的 Scan
接口組合:
1 2 3 4 5 6 7 8 9 10 11 |
trait LimitedScan { def buildScan(limit: Int): RDD[Row] } trait PrunedLimitedScan { def buildScan(requiredColumns: Array[String], limit: Int): RDD[Row] } trait PrunedFilteredLimitedScan { def buildScan(requiredColumns: Array[String], filters: Array[Filter], limit: Int): RDD[Row] } |
對於支持數據分區的數據源,如 HDFS、Kafka 等,V1 API 沒有提供原生的支持,於是也不能利用數據局部性(Data Locality)。咱們須要本身繼承 RDD 來實現,好比下面的代碼就對 Kafka 數據源進行了分區,並告知 Spark 能夠將數據讀取操做放入 Kafka Broker 所在的服務器上執行,以提高效率:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
case class KafkaPartition(partitionId: Int, leaderHost: String) extends Partition { def index: Int = partitionId } class KafkaRDD(sc: SparkContext) extends RDD[Row](sc, Nil) { def getPartitions: Array[Partition] = Array( // populate with Kafka PartitionInfo KafkaPartition(0, "broker_0"), KafkaPartition(1, "broker_1") ) override def getPreferredLocations(split: Partition): Seq[String] = Seq( split.asInstanceOf[KafkaPartition].leaderHost ) } |
此外,相似 Cassandra 這樣的數據庫,會按主鍵對數據進行分片。那麼,若是查詢語句中包含了按該主鍵進行分組的子句,Spark 就能夠省去一次 Shuffle 操做。這在 V1 API 中也是不支持的,而 V2 API 則提供了 SupportsReportPartitioning
接口來支持。
Spark 任務是有可能失敗的,使用 V1 API 時就會留下部分寫入的數據。固然,對於 HDFS 這樣的文件系統來講問題不大,由於能夠用 _SUCCESS
來標記該次寫操做是否執行成功。但這一邏輯也須要最終用戶來實現,而 V2 API 則提供了明確的接口來支持事務性的寫操做。
Spark SQL 目前已支持列存儲和流式計算,但二者都不是用 V1 API 實現的。ParquetFileFormat
和 KafkaSource
等類型都使用了專有代碼和內部 API。這些特性也在 V2 API 中獲得支持。
V2 API 首先使用了一個標記性的 DataSourceV2
接口,實現了該接口的類還必須實現 ReadSupport
或 WriteSupport
,用來表示自身支持讀或寫操做。ReadSupport
接口中的方法會被用來建立 DataSourceReader
類,同時接收到初始化參數;該類繼而建立 DataReaderFactory
和 DataReader
類,後者負責真正的讀操做,接口中定義的方法和迭代器相似。此外,DataSourceReader
還能夠實現各種 Support
接口,代表本身支持某些優化下推操做,如裁剪字段、過濾條件等。WriteSupport
API 的層級結構與之相似。這些接口都是用 Java 語言編寫,以得到更好的交互支持。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public interface DataSourceV2 {} public interface ReadSupport extends DataSourceV2 { DataSourceReader createReader(DataSourceOptions options); } public interface DataSourceReader { StructType readSchema(); List<DataReaderFactory<Row>> createDataReaderFactories(); } public interface SupportsPushDownRequiredColumns extends DataSourceReader { void pruneColumns(StructType requiredSchema); } public interface DataReaderFactory<T> { DataReader<T> createDataReader(); } public interface DataReader<T> extends Closeable { boolean next(); T get(); } |
可能你會注意到,DataSourceReader#createDataReaderFactories
仍然捆綁了 Row
類型,這是由於目前 V2 API 只支持 Row
類型的返回值,且這套 API 仍處於進化狀態(Evolving)。
讓咱們使用 V2 API 來重寫 JDBC 數據源。下面是一個整表掃描的示例,完整代碼能夠在 GitHub(連接)上查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
class JdbcDataSourceReader extends DataSourceReader { def readSchema = StructType(Seq( StructField("id", IntegerType), StructField("emp_name", StringType) )) def createDataReaderFactories() = { Seq(new JdbcDataReaderFactory(url)).asJava } } class JdbcDataReader(url: String) extends DataReader[Row] { private var conn: Connection = null private var rs: ResultSet = null def next() = { if (rs == null) { conn = DriverManager.getConnection(url) val stmt = conn.prepareStatement("SELECT * FROM employee") rs = stmt.executeQuery() } rs.next() } def get() = Row(rs.getInt("id"), rs.getString("emp_name")) } |
經過實現 SupportsPushDownRequiredColumns
接口,Spark 會調用其 pruneColumns
方法,傳入用戶所指定的字段列表(StructType
),DataSourceReader
能夠將該信息傳給 DataReader
使用。
1 2 3 4 5 6 7 8 9 10 11 |
class JdbcDataSourceReader with SupportsPushDownRequiredColumns { var requiredSchema = JdbcSourceV2.schema def pruneColumns(requiredSchema: StructType) = { this.requiredSchema = requiredSchema } def createDataReaderFactories() = { val columns = requiredSchema.fields.map(_.name) Seq(new JdbcDataReaderFactory(columns)).asJava } } |
咱們能夠用 df.explain(true)
來驗證執行計劃。例如,SELECT emp_name, age FROM employee
語句的執行計劃在優化先後是這樣的:
1 2 3 4 5 6 7 8 9 |
== Analyzed Logical Plan == emp_name: string, age: decimal(3,0) Project [emp_name#1, age#4] +- SubqueryAlias employee +- DataSourceV2Relation [id#0, emp_name#1, dep_name#2, salary#3, age#4], datasource.JdbcDataSourceReader@15ceeb42 == Optimized Logical Plan == Project [emp_name#1, age#4] +- DataSourceV2Relation [emp_name#1, age#4], datasource.JdbcDataSourceReader@15ceeb42 |
能夠看到,字段裁剪被反映到了數據源中。若是咱們將實際執行的 SQL 語句打印出來,也能看到字段裁剪下推的結果。
相似的,實現 SupportsPushDownFilters
接口能夠將過濾條件下推到數據源中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
class JdbcDataSourceReader with SupportsPushDownFilters { var filters = Array.empty[Filter] var wheres = Array.empty[String] def pushFilters(filters: Array[Filter]) = { val supported = ListBuffer.empty[Filter] val unsupported = ListBuffer.empty[Filter] val wheres = ListBuffer.empty[String] filters.foreach { case filter: EqualTo => { supported += filter wheres += s"${filter.attribute} = '${filter.value}'" } case filter => unsupported += filter } this.filters = supported.toArray this.wheres = wheres.toArray unsupported.toArray } def pushedFilters = filters def createDataReaderFactories() = { Seq(new JdbcDataReaderFactory(wheres)).asJava } } |
createDataReaderFactories
返回的是列表類型,每一個讀取器都會產生一個 RDD 分區。若是咱們想開啓多個讀取任務,就能夠生成多個讀取器工廠,併爲每一個讀取器限定主鍵範圍:
1 2 3 4 5 6 |
def createDataReaderFactories() = { Seq((1, 6), (7, 11)).map { case (minId, maxId) => val partition = s"id BETWEEN $minId AND $maxId" new JdbcDataReaderFactory(partition) }.asJava } |
V2 API 提供了兩組 commit
/ abort
方法,用來實現事務性的寫操做:
1 2 3 4 5 6 7 8 9 10 |
public interface DataSourceWriter { void commit(WriterCommitMessage[] messages); void abort(WriterCommitMessage[] messages); } public interface DataWriter<T> { void write(T record) throws IOException; WriterCommitMessage commit() throws IOException; void abort() throws IOException; } |
DataSourceWriter
在 Spark Driver 中執行,DataWriter
則運行在其餘節點的 Spark Executor 上。當 DataWriter
成功執行了寫操做,就會將提交信息傳遞給 Driver;當 DataSourceWriter
收集到了全部寫任務的提交信息,就會執行最終的提交操做。若是某個寫任務失敗了,它的 abort
方法會獲得執行;若是通過多輪重試後仍然失敗,則全部寫任務的 abort
方法都會被調用,進行數據清理操做。
這兩個特性仍處於實驗性階段,在 Spark 中尚未獲得使用。簡單來講,DataSourceReader
類能夠實現 SupportsScanColumnarBatch
接口來聲明本身會返回 ColumnarBatch
對象,這個對象是 Spark 內部用來存放列式數據的。對於流式數據,則有 MicroBatchReader
和 ContinuousReader
這兩個接口,感興趣的讀者能夠到 Spark 單元測試 代碼中查看。