Spark SQL - 對大規模的結構化數據進行批處理和流式處理

Spark SQL - 對大規模的結構化數據進行批處理和流式處理

大致翻譯自:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql.htmlhtml

如同通常的 Spark 處理,Spark SQL 本質上也是大規模的基於內存的分佈式計算。git

Spark SQL 和 RDD 計算模型最大的區別在於數據處理的框架不一樣。Spark SQL 能夠經過多種不一樣的方式對結構化的數據和半結構化的數據進行處理。它既可使用 SQL , HiveQL 這種結構化查詢查詢語言,也可使用類 SQL,聲明式,類型安全的Dataset API 進行查詢,這種被稱爲 Structured Query DSLsql

Note:能夠經過 Schema 對結構化和半結構化的數據進行描述。

Spark SQL 支持 批處理(Batch) 和流式處理(Struct streaming) 兩種處理方式。數據庫

Note:本質上,結構化查詢都會自動編譯爲相應的 RDD 操做。

不管使用什麼樣的查詢方式,全部的查詢都會轉化爲一個由 Catalyst expressions 組成的樹,在這個過程當中會對不斷的對查詢進行優化。express

在 Spark 2.0 之後, Spark SQL 已經成爲了 Spark 計算平臺最主要的接口, 它經過更高層次的抽象封裝了RDD,方便用戶經過 SQL 處理數據。apache

// Define the schema using a case class
case class Person(name: String, age: Int)

// you could read people from a CSV file
// It's been a while since you saw RDDs, hasn't it?
// Excuse me for bringing you the old past.
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek", 10)))

// Convert RDD[Person] to Dataset[Person] and run a query

// Automatic schema inferrence from existing RDDs
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]

// Query for teenagers using Scala Query DSL
scala> val teenagers = people.where('age >= 10).where('age <= 19).select('name).as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]

scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+

// You could however want to use good ol' SQL, couldn't you?

// 1. Register people Dataset as a temporary view in Catalog
people.createOrReplaceTempView("people")

// 2. Run SQL query
val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19")
scala> teenagers.show
+-----+---+
| name|age|
+-----+---+
|Jacek| 10|
+-----+---+

經過啓動 Hive 支持 (enableHiveSupport),用戶能夠 HiveQL 對 Hive 中的數據進行處理。json

sql("CREATE OR REPLACE TEMPORARY VIEW v1 (key INT, value STRING) USING csv OPTIONS ('path'='people.csv', 'header'='true')")

// Queries are expressed in HiveQL
sql("FROM v1").show

scala> sql("desc EXTENDED v1").show(false)
+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|# col_name|data_type|comment|
|key       |int      |null   |
|value     |string   |null   |
+----------+---------+-------+

和其它的數據庫同樣, Spark SQL 經過 Logical Query Plan Optimizer, code generation , Tungsten execution engine 來這些措施進行優化。安全

Spark SQL 引入了一種抽象的表格式的數據結構 Dataset。 經過 Dataset, Spark SQL 能夠更加方便、快速的處理大批量的結構化數據。數據結構

Note:Spark SQL 藉助Apache Drill 直接在一些數據文件上進行查詢

下面的片斷展現瞭如何讀取JSON文件,而後將一種一部分數據保存爲CSV文件。框架

spark.read
  .format("json")
  .load("input-json")
  .select("name", "score")
  .where($"score" > 15)
  .write
  .format("csv")
  .save("output-csv")

DataSet 是 Spark SQL 中最核心的抽象。他表示了一批已知 schema 的結構化數據。這些數據能夠能夠保存在JVM 堆外的內存中,而且變爲列壓縮的二進制串,來增長計算的速度,減小內存的使用和GC。

Spark SQL 支持 predicate pushdown 對 DataSet 的性能進行優化,而且能夠在運行時生成優化代碼。

Spark SQL 包含了如下幾種 API:

  1. Dataset API
  2. Structred Streaming API
  3. SQL
  4. JDBC/ODBC

Spark SQL 經過 DataFrameReader 和 DataFrameWrite 這兩個統一的接口來訪問 HDFS 等存儲系統。

Spark SQL 定義了集中不一樣類型的函數:

  • 標準函數 和 UDF。
  • 基本的集合函數。
  • 窗口聚合函數。

若是你已經將一個 CSV 加載到一個 dataframe 中了,那你能夠經過將 dataframe 註冊爲 table, 而後使用 SQL 進行查詢。

// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
  .agg(max('j).as("aggOrdering"))
  .orderBy(sum('j))
  .as[(Int, Int)]
query.collect contains (1, 2) // true

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
|                  1|
|                  0|
+-------------------+
更多參考:
  1. Spark SQL home
  2. Spark’s Role in the Big Data Ecosystem - Matei Zaharia
  3. Introducing Apache Spark 2.0
相關文章
相關標籤/搜索