Spark1.0新特性-->Spark SQL

Spark1.0出來了,變化仍是挺大的,文檔比之前齊全了,RDD支持的操做比之前多了一些,Spark on yarn功能我竟然跑通了。可是最最重要的就是多了一個Spark SQL的功能,它能對RDD進行Sql操做,目前它只是一個alpha版本,喜歡嚐鮮的同志們進來看看吧,下面是它的官網的翻譯。sql

 

Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的關係型查詢表達式。它的核心組件是一個新增的RDD類型SchemaRDD,它把行對象用一個Schema來描述行裏面的全部列的數據類型,它就像是關係型數據庫裏面的一張表。它能夠從原有的RDD建立,也能夠是Parquet文件,最重要的是它能夠支持用HiveQL從hive裏面讀取數據。shell

下面是一些案例,能夠在Spark shell當中運行。數據庫

首先咱們要建立一個熟悉的Context,熟悉spark的人都知道吧,有了Context咱們才能夠進行各類操做。apache

val sc: SparkContext // 已經存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._

Running SQL on RDDs

 Spark SQL支持的一種表的類型是Scala的case class,case class定義了表的類型,下面是例子:maven

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// case class在Scala 2.10裏面最多支持22個列,,爲了突破這個現實,最好是定義一個類實現Product接口
case class Person(name: String, age: Int)

// 爲Person的對象建立一個RDD,而後註冊成一張表
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")

// 直接寫sql吧,這個方法是sqlContext提供的
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// teenagers是SchemaRDDs類型,它支持全部普通的RDD操做
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

從上面這個方法來看,不是很好用,一個表好幾十個字段,我就得一個一個的去賦值,它如今支持的操做都是很簡單的操做,想要實現複雜的操做能夠具體去看HiveContext提供的HiveQL。spa

Using Parquet

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val people: RDD[Person] = ... // 同上面的例子.

// 這個RDD已經隱式轉換成一個SchemaRDD, 容許它存儲成Parquet格式.
people.saveAsParquetFile("people.parquet")

// 從上面建立的文件裏面讀取,加載一個Parquet文件的結果也是一種JavaSchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

//註冊成表,而後使用
parquetFile.registerAsTable("parquetFile")
val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.collect().foreach(println)

 

Writing Language-Integrated Relational Queries

目前這個功能只是在Scala裏面支持,挺雞肋的一個功能翻譯

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val people: RDD[Person] = ... // 同前面的例子.

// 和後面這個語句是同樣的 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)

 

Hive Support

 這下面的纔是高潮,它能夠從hive裏面取數據。可是hive的依賴太多了,默認Spark assembly是沒帶這些依賴的,須要咱們運行SPARK_HIVE=true sbt/sbt assembly/assembly從新編譯,或者用maven的時候添加-Phive參數,它會從新編譯出來一個hive assembly的jar包,而後須要把這個jar包放到全部的節點上。另外還須要把hive-site.xml放到conf目錄下。沒進行hive部署的話,下面的例子也能夠用LocalHiveContext來代替HiveContextcode

val sc: SparkContext // 已經存在的SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// 引入這個Context,而後就會給全部的sql語句進行隱式轉換
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 使用HiveQL查詢
hql("FROM src SELECT key, value").collect().foreach(println)

這個功能看起來還挺像樣,前面兩個看起來就像渣同樣,沒勁兒,不知道爲何不自帶那些依賴,還要咱們再編譯一下,可是我下的那個版本運行的時候提示我已經編譯包括了hive的。尼瑪,真噁心。xml

相關文章
相關標籤/搜索