Spark1.0出來了,變化仍是挺大的,文檔比之前齊全了,RDD支持的操做比之前多了一些,Spark on yarn功能我竟然跑通了。可是最最重要的就是多了一個Spark SQL的功能,它能對RDD進行Sql操做,目前它只是一個alpha版本,喜歡嚐鮮的同志們進來看看吧,下面是它的官網的翻譯。sql
Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的關係型查詢表達式。它的核心組件是一個新增的RDD類型SchemaRDD,它把行對象用一個Schema來描述行裏面的全部列的數據類型,它就像是關係型數據庫裏面的一張表。它能夠從原有的RDD建立,也能夠是Parquet文件,最重要的是它能夠支持用HiveQL從hive裏面讀取數據。shell
下面是一些案例,能夠在Spark shell當中運行。數據庫
首先咱們要建立一個熟悉的Context,熟悉spark的人都知道吧,有了Context咱們才能夠進行各類操做。apache
val sc: SparkContext // 已經存在的SparkContext val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._
Spark SQL支持的一種表的類型是Scala的case class,case class定義了表的類型,下面是例子:maven
val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ // case class在Scala 2.10裏面最多支持22個列,,爲了突破這個現實,最好是定義一個類實現Product接口 case class Person(name: String, age: Int) // 爲Person的對象建立一個RDD,而後註冊成一張表 val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) people.registerAsTable("people") // 直接寫sql吧,這個方法是sqlContext提供的 val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // teenagers是SchemaRDDs類型,它支持全部普通的RDD操做 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
從上面這個方法來看,不是很好用,一個表好幾十個字段,我就得一個一個的去賦值,它如今支持的操做都是很簡單的操做,想要實現複雜的操做能夠具體去看HiveContext提供的HiveQL。spa
val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val people: RDD[Person] = ... // 同上面的例子. // 這個RDD已經隱式轉換成一個SchemaRDD, 容許它存儲成Parquet格式. people.saveAsParquetFile("people.parquet") // 從上面建立的文件裏面讀取,加載一個Parquet文件的結果也是一種JavaSchemaRDD. val parquetFile = sqlContext.parquetFile("people.parquet") //註冊成表,而後使用 parquetFile.registerAsTable("parquetFile") val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.collect().foreach(println)
目前這個功能只是在Scala裏面支持,挺雞肋的一個功能翻譯
val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val people: RDD[Person] = ... // 同前面的例子. // 和後面這個語句是同樣的 'SELECT name FROM people WHERE age >= 10 AND age <= 19' val teenagers = people.where('age >= 10).where('age <= 19).select('name)
這下面的纔是高潮,它能夠從hive裏面取數據。可是hive的依賴太多了,默認Spark assembly是沒帶這些依賴的,須要咱們運行SPARK_HIVE=true sbt/sbt assembly/assembly從新編譯,或者用maven的時候添加-Phive參數,它會從新編譯出來一個hive assembly的jar包,而後須要把這個jar包放到全部的節點上。另外還須要把hive-site.xml放到conf目錄下。沒進行hive部署的話,下面的例子也能夠用LocalHiveContext來代替HiveContext。
code
val sc: SparkContext // 已經存在的SparkContext val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) // 引入這個Context,而後就會給全部的sql語句進行隱式轉換 import hiveContext._ hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // 使用HiveQL查詢 hql("FROM src SELECT key, value").collect().foreach(println)
這個功能看起來還挺像樣,前面兩個看起來就像渣同樣,沒勁兒,不知道爲何不自帶那些依賴,還要咱們再編譯一下,可是我下的那個版本運行的時候提示我已經編譯包括了hive的。尼瑪,真噁心。xml