Spark SQL

Spark SQL是支持在Spark中使用Sql、HiveSql、Scala中的關係型查詢表達式。它的核心組件是一個新增的RDD類型SchemaRDD,它把行對象用一個Schema來描述行裏面的全部列的數據類型,它就像是關係型數據庫裏面的一張表。它能夠從原有的RDD建立,也能夠是Parquet文件,最重要的是它能夠支持用HiveQL從hive裏面讀取數據。sql

下面是一些案例,能夠在Spark shell當中運行。shell

首先咱們要建立一個熟悉的Context,熟悉spark的人都知道吧,有了Context咱們才能夠進行各類操做。數據庫

val sc: SparkContext // 已經存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._

Data Sources(數據源)

Spark SQL經過SchemaRDD接口支持在多種數據源上進行操做。一旦一個數據集被加載,它能夠被註冊成一個表格,甚至能夠和其它數據源有鏈接。express

RDDs

 Spark SQL支持的一種表的類型是Scala的case class,case class定義了表的類型,下面是例子:apache

// sc是一個已經存在的SprakContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc) // import sqlContext._ import sqlContext.createSchemaRDD // case class在Scala 2.10裏面最多支持22個列,爲了突破這個限制,最好是定義一個類實現Product接口 case class Person(name: String, age: Int) // 爲Person的對象建立一個RDD,而後註冊成一張表 val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) people.registerAsTable("people") // 直接寫sql吧,這個方法是sqlContext提供的 val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // teenagers是SchemaRDDs類型,它支持全部普通的RDD操做 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

從上面這個方法來看,不是很好用,一個表好幾十個字段,我就得一個一個的去賦值,它如今支持的操做都是很簡單的操做,想要實現複雜的操做能夠具體去看HiveContext提供的HiveQL。編程

Parquet Files

Parquet是一種列式存儲格式而且被許多數據處理系統支持。Parquet爲Hadoop生態系統中的全部項目提供支持高效率壓縮的列式數據表達,並且與數據處理框架、數據模型或編程語言都沒有關係。Spark SQL提供了對Parquet的讀和寫,自動保留原始數據的架構。json

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// import sqlContext._ // createSchemaRDD被用來將RDD隱式轉換成一個SchemaRDD
import sqlContext.createSchemaRDD
val people: RDD[Person]
= ... // 同上面的例子. // 這個RDD已經隱式轉換成一個SchemaRDD, 容許它存儲成Parquet格式. people.saveAsParquetFile("people.parquet") // 從上面建立的文件裏面讀取,加載一個Parquet文件的結果也是一種JavaSchemaRDD. val parquetFile = sqlContext.parquetFile("people.parquet") //註冊成表,而後在SQL狀態下使用 parquetFile.registerAsTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name:" + t(0)).collect().foreach(println)

JSON Datasets(JSON數據集)

JSON(JavaScript Object Notation) 是一種輕量級的數據交換格式。它基於JavaScript(Standard ECMA-262 3rd Edition - December 1999)的一個子集。 JSON採用徹底獨立於語言的文本格式,可是也使用了相似於C語言家族的習慣(包括C, C++, C#, Java, JavaScript, Perl, Python等)。這些特性使JSON成爲理想的數據交換語言。易於人閱讀和編寫,同時也易於機器解析和生成(網絡傳輸速度快)。網絡

SparkSQL能夠自動推斷出一個JSON數據集模式並做爲一個SchemaRDD來加載。這種轉換能夠經過使用SQLContext中的兩個方法中的一個獲得:架構

jsonFile - 從JSON文件的目錄中加載數據,其中文件的每一行就是一個JSON對象。框架

jsonRdd - 從一個已存在的RDD中加載數據,其中每個RDD元素都是一個包含一個JSON對象的字符串。

// sc 是已經存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 一個JSON 數據集用一個路徑指出
// 這個路徑既能夠是一個單獨的文本文件,也能夠是一個存儲文本文件的目錄
val path = "examples/src/main/resources/people.json"
// 根據路徑指出的文件生成一個SchemaRDD 
val people = sqlContext.jsonFile(path)

// 推斷的模式能夠經過使用printSchema() 方法顯式化
people.printSchema()
// root
//  |-- age: IntegerType
//  |-- name: StringType

// 把SchemaRDD註冊成一個表
people.registerAsTable("people")

// SQL狀態能夠經過使用sqlContext提供的sql方法運行
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另外,一個SchemaRDD也能夠經過每一個字符串存儲一個JSON數據集對象的string類型的RDD來生成
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

 

Hive Tables

 Spark SQL也支持讀寫存儲在Apache Hive上的數據。然而,hive的依賴太多了,默認的Spark assembly 是沒帶這些依賴的,須要咱們運行 SPARK_HIVE=true sbt/sbt assembly/assembly從新編譯,或者用maven的時候添加 -Phive參數,它會從新編譯出來一個hive  assembly的jar包,而後須要把這個jar包放到全部的節點上。另外還須要把 hive-site.xml放到conf目錄下。沒進行hive部署的話,下面的例子也能夠用LocalHiveContext來代替HiveContext。

val sc: SparkContext // 已經存在的SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// 引入這個Context,而後就會給全部的sql語句進行隱式轉換
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 使用HiveQL查詢
hql("FROM src SELECT key, value").collect().foreach(println)

或者寫成以下形式:

// sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)

Writing Language-Integrated Relational Queries

文字語言綜合關聯查詢,目前這個功能只是在Scala裏面支持。

Spark SQL還支持一個特定域的語言編寫查詢。再次,利用上述實例數據:

// sc是一個已經存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val people: RDD[Person] = ... // 同前面的例子. // 和後面這個語句是同樣的 'SELECT name FROM people WHERE age >= 10 AND age <= 19' val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

DSL(領域語言)使用scala符號表示隱含表中的列,經過在前面加一個(‘)來標示。隱式轉換將這些符號表達式表示成SQL執行引擎的值。一個完整的功能支持列表能夠在ScalaDoc中找到。

相關文章
相關標籤/搜索