一、Spark SQL是Spark中的一個模塊,主要用於進行結構化數據的處理。它提供的最核心的編程抽象,就是DataFrame。同時Spark SQL還能夠做爲分佈式的SQL查詢引擎。Spark SQL最重要的功能之一,就是從Hive中查詢數據。javascript
二、DataFramecss
就易用性而言,對比傳統的MapReduce API,說Spark的RDD API有了數量級的飛躍並不爲過。然而,對於沒有MapReduce和函數式編程經驗的新手來講,RDD API仍然存在着必定的門檻。另外一方面,數據科學家們所熟悉的R、Pandas等傳統數據框架雖然提供了直觀的API,卻侷限於單機處理,沒法勝任大數據場景。爲了解決這一矛盾,Spark SQL 原有SchemaRDD的基礎上提供了與R和Pandas風格相似的DataFrame API。新的DataFrame API不只能夠大幅度下降普通開發者的學習門檻,同時還支持Scala、Java與Python三種語言。更重要的是,因爲脫胎自SchemaRDD,DataFrame自然適用於分佈式大數據場景。java
在Spark中,DataFrame是一種以RDD爲基礎的分佈式數據集,相似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背後的數據源以及做用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提高運行時效率的目標。反觀RDD,因爲無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。sql
要使用Spark SQL,首先就得建立一個建立一個SQLContext對象,或者是它的子類的對象,好比HiveContext的對象。數據庫
Java版本:apache
JavaSparkContext sc = ...;
SQLContext sqlContext = new SQLContext(sc);
Scala版本:編程
val sc: SparkContext = ...
val sqlContext = new SQLContext(sc) import sqlContext.implicits._
一、除了基本的SQLContext之外,還可使用它的子類——HiveContext。HiveContext的功能除了包含SQLContext提供的全部功能以外,還包括了額外的專門針對Hive的一些功能。這些額外功能包括:使用HiveQL語法來編寫和執行SQL,使用Hive中的UDF函數,從Hive表中讀取數據。json
二、要使用HiveContext,就必須預先安裝好Hive,SQLContext支持的數據源,HiveContext也一樣支持——而不僅是支持Hive。對於Spark 1.3.x以上的版本,都推薦使用HiveContext,由於其功能更加豐富和完善。api
三、Spark SQL還支持用spark.sql.dialect參數設置SQL的方言。使用SQLContext的setConf()便可進行設置。對於SQLContext,它只支持「sql」一種方言。對於HiveContext,它默認的方言是「hiveql」。緩存
使用SQLContext或者HiveContext,能夠從RDD、Hive、ZMQ、Kafka和RabbitMQ等或者其餘數據源,來建立一個DataFrame。咱們來舉例使用JSON文件爲例建立一個DataFrame。
Java版本:
JavaSparkContext sc = new JavaSparkContext(); SQLContext sqlContext = new SQLContext(sc); DataFrame df = sqlContext.read().json("hdfs://ns1/spark/sql/person.json"); df.show();
Scala版本:
val sc: SparkContext = new SparkContext(); val sqlContext = new SQLContext(sc) val df = sqlContext.read.json(" hdfs://ns1/spark/sql/person.json") df.show()
json數據以下:
{"name":"Michael", "age":10, "height": 168.8} {"name":"Andy", "age":30, "height": 168.8} {"name":"Justin", "age":19, "height": 169.8} {"name":"Jack", "age":32, "height": 188.8} {"name":"John", "age":10, "height": 158.8} {"name":"Domu", "age":19, "height": 179.8} {"name":"袁帥", "age":13, "height": 179.8} {"name":"殷傑", "age":30, "height": 175.8} {"name":"孫瑞", "age":19, "height": 179.9}
測試代碼以下:
package cn.xpleaf.bigdata.spark.scala.sql.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Column, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/** * SparkSQL基礎操做學習 * 操做SparkSQL的核心就是DataFrame,DataFrame帶了一張內存中的二維表,包括元數據信息和表數據 */ object _01SparkSQLOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df:DataFrame = sqlContext.read.json("D:/data/spark/sql/people.json") // 1.打印DF中全部的記錄 println("1.打印DF中全部的記錄") df.show() // 默認的輸出表中數據的操做,至關於db中select * from t limit 20 // 2.打印出DF中全部的schema信息 println("2.打印出DF中全部的schema信息") df.printSchema() // 3.查詢出name的列並打印出來 select name from t // df.select("name").show() println("3.查詢出name的列並打印出來") df.select(new Column("name")).show() // 4.過濾並打印出年齡超過14歲的人 println("4.過濾並打印出年齡超過14歲的人") df.select(new Column("name"), new Column("age")).where("age>14").show() // 5.給每一個人的年齡都加上10歲 println("5.給每一個人的年齡都加上10歲") df.select(new Column("name"), new Column("age").+(10).as("10年後的年齡")).show() // 6.按照身高進行分組 println("6.按照身高進行分組") // select height, count(1) from t group by height; df.select(new Column("height")).groupBy(new Column("height")).count().show() // 註冊表 df.registerTempTable("people") // 執行sql操做 var sql = "select height, count(1) from people group by height" sqlContext.sql(sql).show() sc.stop() } }
輸出結果以下:
1.打印DF中全部的記錄 18/05/08 16:06:09 INFO FileInputFormat: Total input paths to process : 1 +---+------+-------+ |age|height| name| +---+------+-------+ | 10| 168.8|Michael| | 30| 168.8| Andy| | 19| 169.8| Justin| | 32| 188.8| Jack| | 10| 158.8| John| | 19| 179.8| Domu| | 13| 179.8| 袁帥| | 30| 175.8| 殷傑| | 19| 179.9| 孫瑞| +---+------+-------+ 2.打印出DF中全部的schema信息 root |-- age: long (nullable = true) |-- height: double (nullable = true) |-- name: string (nullable = true) 3.查詢出name的列並打印出來 18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1 +-------+ | name| +-------+ |Michael| | Andy| | Justin| | Jack| | John| | Domu| | 袁帥| | 殷傑| | 孫瑞| +-------+ 4.過濾並打印出年齡超過14歲的人 18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1 +------+---+ | name|age| +------+---+ | Andy| 30| |Justin| 19| | Jack| 32| | Domu| 19| | 殷傑| 30| | 孫瑞| 19| +------+---+ 5.給每一個人的年齡都加上10歲 18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1 +-------+-------+ | name|10年後的年齡| +-------+-------+ |Michael| 20| | Andy| 40| | Justin| 29| | Jack| 42| | John| 20| | Domu| 29| | 袁帥| 23| | 殷傑| 40| | 孫瑞| 29| +-------+-------+ 6.按照身高進行分組 18/05/08 16:06:11 INFO FileInputFormat: Total input paths to process : 1 +------+-----+ |height|count| +------+-----+ | 179.9| 1| | 188.8| 1| | 158.8| 1| | 179.8| 2| | 169.8| 1| | 168.8| 2| | 175.8| 1| +------+-----+ 18/05/08 16:06:14 INFO FileInputFormat: Total input paths to process : 1 +------+---+ |height|_c1| +------+---+ | 179.9| 1| | 188.8| 1| | 158.8| 1| | 179.8| 2| | 169.8| 1| | 168.8| 2| | 175.8| 1| +------+---+
下面涉及的測試代碼中,須要使用到的源數據sql-rdd-source.txt
,以下:
1, zhangsan, 13, 175 2, lisi, 14, 180 3, wangwu, 15, 175 4, zhaoliu, 16, 195 5, zhouqi, 17, 165 6, weiba, 18, 155
使用到的Person類,代碼以下:
public class Person { private int id; private String name; private int age; private double height; public Person() { } public Person(int id, String name, int age, double height) { this.id = id; this.name = name; this.age = age; this.height = height; } }
一、一個問題就擺在你們的面前:爲何要將RDD轉換爲DataFrame呀?
主要是能使用Spark SQL進行SQL查詢了。這個功能是無比強大的。
二、是使用反射機制推斷包含了特定數據類型的RDD的元數據。這種基於反射的方式,代碼比較簡潔,事前知道要定義的POJO的元數據信息,當你已經知道你的RDD的元數據時,是一種很是不錯的方式。
一、Java版本:
Spark SQL是支持將包含了POJO的RDD轉換爲DataFrame的。POJO的信息,就定義了元數據。Spark SQL如今是不支持將包含了嵌套POJO或者List等複雜數據的POJO,做爲元數據的。只支持一個包含簡單數據類型的field的POJO。
二、Scala版本:
而Scala因爲其具備隱式轉換的特性,因此Spark SQL的Scala接口,是支持自動將包含了case class的RDD轉換爲DataFrame的。case class就定義了元數據。Spark SQL會經過反射讀取傳遞給case class的參數的名稱,而後將其做爲列名。
不一樣點:
三、與Java不一樣的是,Spark SQL是支持將包含了嵌套數據結構的case class做爲元數據的,好比包含了Array等。
測試代碼以下:
package cn.xpleaf.bigdata.spark.scala.sql.p1
import java.util import java.util.{Arrays, List} import cn.xpleaf.bigdata.spark.java.sql.p1.Person import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{DataFrame, SQLContext} /** * SparkRDD與DataFrame之間的轉換操做 * 1.經過反射的方式,將RDD轉換爲DataFrame * 2.經過動態編程的方式將RDD轉換爲DataFrame * 這裏演示的是第1種 */ object _02SparkRDD2DataFrame { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName) // 使用kryo的序列化方式 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.registerKryoClasses(Array(classOf[Person])) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val lines = sc.textFile("D:/data/spark/sql/sql-rdd-source.txt") val personRDD:RDD[Person] = lines.map(line => { val fields = line.split(",") val id = fields(0).trim.toInt val name = fields(1).trim val age = fields(2).trim.toInt val height = fields(3).trim.toDouble new Person(id, name, age, height) }) val persons: util.List[Person] = util.Arrays.asList( new Person(1, "孫人才", 25, 179), new Person(2, "劉銀鵬", 22, 176), new Person(3, "郭少波", 27, 178), new Person(1, "齊彥鵬", 24, 175)) // val df:DataFrame = sqlContext.createDataFrame(persons, classOf[Person]) // 這種方式也能夠 val df:DataFrame = sqlContext.createDataFrame(personRDD, classOf[Person]) df.show() sc.stop() } }
輸出結果以下:
+---+------+---+--------+
|age|height| id| name| +---+------+---+--------+ | 13| 175.0| 1|zhangsan| | 14| 180.0| 2| lisi| | 15| 175.0| 3| wangwu| | 16| 195.0| 4| zhaoliu| | 17| 165.0| 5| zhouqi| | 18| 155.0| 6| weiba| +---+------+---+--------+
測試代碼以下:
package cn.xpleaf.bigdata.spark.java.sql.p1; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Column; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; import java.util.Arrays; import java.util.List; /** * SparkRDD與DataFrame之間的轉換操做 * 1.經過反射的方式,將RDD轉換爲DataFrame * 2.經過動態編程的方式將RDD轉換爲DataFrame * 這裏演示的是第1種 */ public class _01SparkRDD2DataFrame { public static void main(String[] args) { Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName(_01SparkRDD2DataFrame.class.getSimpleName()) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{Person.class}); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(jsc); List<Person> persons = Arrays.asList( new Person(1, "孫人才", 25, 179), new Person(2, "劉銀鵬", 22, 176), new Person(3, "郭少波", 27, 178), new Person(1, "齊彥鵬", 24, 175) ); DataFrame df = sqlContext.createDataFrame(persons, Person.class); // 構造方法有多個,使用personsRDD的方法也是能夠的 // where age > 23 and height > 176 df.select(new Column("id"), new Column("name"), new Column("age"), new Column("height")) .where(new Column("age").gt(23).and(new Column("height").lt(179))) .show(); df.registerTempTable("person"); sqlContext.sql("select * from person where age > 23 and height < 179").show(); jsc.close(); } }
輸出結果以下:
+---+----+---+------+
| id|name|age|height| +---+----+---+------+ | 3| 郭少波| 27| 178.0| | 1| 齊彥鵬| 24| 175.0| +---+----+---+------+ +---+------+---+----+ |age|height| id|name| +---+------+---+----+ | 27| 178.0| 3| 郭少波| | 24| 175.0| 1| 齊彥鵬| +---+------+---+----+
一、經過編程接口來建立DataFrame,在Spark程序運行階段建立並保持一份最新的元數據信息,而後將此元數據信息應用到RDD上。
二、優勢在於編寫程序時根本就不知道元數據的定義和內容,只有在運行的時候纔有元數據的數據。這種方式是在動態的時候進行動態構建元數據方式。
測試代碼以下:
package cn.xpleaf.bigdata.spark.scala.sql.p1
import cn.xpleaf.bigdata.spark.java.sql.p1.Person import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} /** * SparkRDD與DataFrame之間的轉換操做 * 1.經過反射的方式,將RDD轉換爲DataFrame * 2.經過動態編程的方式將RDD轉換爲DataFrame * 這裏演示的是第2種 */ object _03SparkRDD2DataFrame { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName) // 使用kryo的序列化方式 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.registerKryoClasses(Array(classOf[Person])) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val lines = sc.textFile("D:/data/spark/sql/sql-rdd-source.txt") val rowRDD:RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).trim.toInt val name = fields(1).trim val age = fields(2).trim.toInt val height = fields(3).trim.toDouble Row(id, name, age, height) }) val scheme = StructType(List( StructField("id", DataTypes.IntegerType, false), StructField("name", DataTypes.StringType, false), StructField("age", DataTypes.IntegerType, false), StructField("height", DataTypes.DoubleType, false) )) val df = sqlContext.createDataFrame(rowRDD, scheme) df.registerTempTable("person") sqlContext.sql("select max(age) as max_age, min(age) as min_age from person").show() sc.stop() } }
輸出結果以下:
+-------+-------+ |max_age|min_age| +-------+-------+ | 18| 13| +-------+-------+
測試代碼以下:
package cn.xpleaf.bigdata.spark.java.sql.p1; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.Arrays; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; public class _02SparkRDD2DataFrame { public static void main(String[] args) { Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName(_02SparkRDD2DataFrame.class.getSimpleName()) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{Person.class}); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(jsc); List<Person> persons = Arrays.asList( new Person(1, "孫人才", 25, 179), new Person(2, "劉銀鵬", 22, 176), new Person(3, "郭少波", 27, 178), new Person(1, "齊彥鵬", 24, 175) ); Stream<Row> rowStream = persons.stream().map(new Function<Person, Row>() { @Override public Row apply(Person person) { return RowFactory.create(person.getId(), person.getName(), person.getAge(), person.getHeight()); } }); List<Row> rows = rowStream.collect(Collectors.toList()); StructType schema = new StructType(new StructField[]{ new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), new StructField("name", DataTypes.StringType, false, Metadata.empty()), new StructField("age", DataTypes.IntegerType, false, Metadata.empty()), new StructField("height", DataTypes.DoubleType, false, Metadata.empty()) }); DataFrame df = sqlContext.createDataFrame(rows, schema); df.registerTempTable("person"); sqlContext.sql("select * from person where age > 23 and height < 179").show(); jsc.close(); } }
輸出結果以下:
+---+----+---+------+
| id|name|age|height| +---+----+---+------+ | 3| 郭少波| 27| 178.0| | 1| 齊彥鵬| 24| 175.0| +---+----+---+------+
Spark SQL 能夠將數據緩存到內存中,咱們能夠見到的經過調用cache table tableName便可將一張表緩存到內存中,來極大的提升查詢效率。
sqlContext.cacheTable(tableName)
這就涉及到內存中的數據的存儲形式,咱們知道基於關係型的數據能夠存儲爲基於行存儲結構或者基於列存儲結構,或者基於行和列的混合存儲,即Row Based Storage、Column Based Storage、 PAX Storage。
Spark SQL 的內存數據是如何組織的?
Spark SQL 將數據加載到內存是以列的存儲結構。稱爲In-Memory Columnar Storage。
若直接存儲Java Object 會產生很大的內存開銷,而且這樣是基於Row的存儲結構。查詢某些列速度略慢,雖然數據以及載入內存,查詢效率仍是低於面向列的存儲結構。
內存開銷大,且容易FULL GC,按列查詢比較慢。
內存開銷小,按列查詢速度較快。
Spark SQL的In-Memory Columnar Storage是位於spark列下面org.apache.spark.sql.columnar包內:
核心的類有 ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, ColumnType.
若是列有壓縮的狀況:compression包下面有具體的build列和access列的類。
對於某些工做負載,能夠在經過在內存中緩存數據或者打開一些實驗選項來提升性能。
在內存中緩存數據
Spark SQL能夠經過調用sqlContext.cacheTable("tableName")方法來緩存使用柱狀格式的表。而後,Spark將會僅僅瀏覽須要的列而且自動地壓縮數據以減小內存的使用以及垃圾回收的 壓力。你能夠經過調用sqlContext.uncacheTable("tableName")方法在內存中刪除表。
注意,若是你調用schemaRDD.cache()而不是sqlContext.cacheTable(...),表將不會用柱狀格式來緩存。在這種狀況下,sqlContext.cacheTable(...)是強烈推薦的用法。
能夠在SQLContext上使用setConf方法或者在用SQL時運行SET key=value命令來配置內存緩存。
一、collect 和 collectAsList 將df中的數據轉化成Array和List
二、count 統計df中的總記錄數
三、first 獲取df中的第一條記錄,數據類型爲Row
四、head 獲取df的前幾條記錄
五、show 六、take 獲取df中的前幾條記錄 七、cache 對df進行緩存 八、columns 顯示全部的列的schema列名,類型爲Array[String] 九、dtypes 顯示全部的列的schema信息,類型爲Array[(String, String)] 十、explain 顯示當前df的執行計劃 十一、isLocal 當前spark sql的執行是否爲本地,true爲真,false爲非本地 十二、printSchema 1三、registerTempTable 1四、schema 1五、toDF :備註:toDF帶有參數時,參數個數必須和調用這DataFrame的列個數據是同樣的 相似於sql中的:toDF:insert into t select * from t1; 1六、intersect:返回兩個DataFrame相同的Rows
原文連接:http://blog.51cto.com/xpleaf/2114298