1. Spark SQL Overview
1.1 What is Spark SQL
1.2 Why learn Spark SQL
2. DataFrames
2.1 What are DataFrames
2.2 Creating DataFrames
3.1 DSL-style syntax
3.2 SQL-style syntax
4.1 Preparation
4.2 Inferring the schema via reflection
4.3 Specifying the schema directly with StructType
4.4 Working with data through the DataFrame API
4.5 The SQL API in Spark 2.x
4.6 Spark Dataset
Spark SQL is Spark's module for processing structured data. It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine.
We have already studied Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster, greatly reducing the complexity of writing MapReduce programs; however, the MapReduce computing model is slow to execute. Spark SQL arose to address this: it translates SQL into RDD operations and submits them to the cluster, so execution is very fast.
1. Easy to integrate
2. Unified data access (see the sketch after this list)
3. Compatible with Hive
4. Standard data connectivity
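A minimal sketch of what "unified data access" and "standard data connectivity" look like in practice: the same DataFrame reader API loads JSON, Parquet or JDBC sources, and any of the results can be queried with SQL. This uses the Spark 2.x SparkSession entry point covered in section 4.5; the file paths, JDBC URL and table name below are hypothetical.

import java.util.Properties
import org.apache.spark.sql.SparkSession

object DataSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataSourceSketch").master("local[2]").getOrCreate()

    // The same reader API works across formats (paths are hypothetical)
    val jsonDF    = spark.read.json("hdfs://hd1:9000/data/people.json")
    val parquetDF = spark.read.parquet("hdfs://hd1:9000/data/people.parquet")
    val jdbcDF    = spark.read.jdbc("jdbc:mysql://hd1:3306/test", "t_person", new Properties())

    // Any of them can be registered as a view and queried with SQL
    jsonDF.createOrReplaceTempView("people")
    spark.sql("select * from people where age >= 18").show()

    spark.stop()
  }
}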
Like an RDD, a DataFrame is a distributed data container. A DataFrame, however, is more like a two-dimensional table in a traditional database: besides the data itself, it also records the structure of the data, i.e. the schema. Like Hive, a DataFrame also supports nested data types (struct, array and map), as the sketch below illustrates. In terms of API usability, the DataFrame API offers a set of high-level relational operations that are friendlier and have a lower barrier to entry than the functional RDD API. Because it resembles the DataFrames of R and Pandas, the Spark DataFrame carries over much of the development experience of traditional single-machine data analysis.
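A small sketch of how a DataFrame records its schema, including nested array and map columns. The records here are made-up sample data, and toDF comes from the session's implicits.

import org.apache.spark.sql.SparkSession

object SchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SchemaSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    // Hypothetical records with nested types: an array of tags and a map of scores
    val df = Seq(
      (1, "laoduan", Seq("spark", "hive"), Map("math" -> 90)),
      (2, "laoyang", Seq("hadoop"), Map("math" -> 85))
    ).toDF("id", "name", "tags", "scores")

    // The schema (column names and types) travels with the data
    df.printSchema()
    df.show()

    spark.stop()
  }
}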
//1. Create a local file with three columns (id, name, age) separated by spaces, then upload it to HDFS
hdfs dfs -put person.txt /
//2. Run the following in the spark shell to read the data and split each line on the column separator
val lineRDD = sc.textFile("hdfs://node1.xiaoniu.com:9000/person.txt").map(_.split(" "))
//3. Define a case class (it serves as the table's schema)
case class Person(id: Int, name: String, age: Int)
//4. Associate the RDD with the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
//5. Convert the RDD to a DataFrame
val personDF = personRDD.toDF
//6. Work with the DataFrame
personDF.show
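Besides the case-class route, a DataFrame can also be built from an RDD of tuples, naming the columns in toDF. A minimal sketch against the same lineRDD (outside the shell you would first import sqlContext.implicits._):

//Alternative: an RDD of tuples plus explicit column names
val personDF2 = lineRDD.map(x => (x(0).toInt, x(1), x(2).toInt)).toDF("id", "name", "age")
personDF2.show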
//View the contents of the DataFrame
personDF.show
//View only some of the columns
personDF.select(personDF.col("name")).show
personDF.select(col("name"), col("age")).show
personDF.select("name").show
//Print the DataFrame's schema
personDF.printSchema
//Query all names and ages, adding 1 to each age
personDF.select(col("id"), col("name"), col("age") + 1).show
personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show
//Filter for age greater than or equal to 18
personDF.filter(col("age") >= 18).show
//Group by age and count how many people have each age
personDF.groupBy("age").count().show()
//To use SQL-style syntax, first register the DataFrame as a table
personDF.registerTempTable("t_person")
//Query the two oldest people
sqlContext.sql("select * from t_person order by age desc limit 2").show
//Show the table's schema
sqlContext.sql("desc t_person").show
In the previous sections we ran SQL queries in the Spark shell; now let's write a Spark SQL query program in a standalone application. First add the Spark SQL dependency to the maven project's pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
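The programs below all read a comma-separated file from HDFS (the path hdfs://hd1:9000/sparkSQL used in the code) with four columns: id, name, age and fv. The rows below are made-up samples, shown only to illustrate the format the parsing code expects:

1,laoduan,30,99.99
2,laoyang,28,88.88
3,laozhao,18,9999.99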
Create an object as follows:
package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object spark1XDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("spark1XDemo1").setMaster("local[2]")
    val sc = new SparkContext(conf)
    //Wrap the SparkContext to enhance it with SQL capabilities
    val sQLContext = new SQLContext(sc)
    val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkSQL")
    val boyRDD: RDD[Boy] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val id: Long = fields(0).toLong
      val name: String = fields(1)
      val age: Int = fields(2).toInt
      val fv: Double = fields(3).toDouble
      Boy(id, name, age, fv)
    })
    //This RDD holds Boy objects, so it carries schema information, but it is still just an RDD
    //Convert the RDD to a DataFrame
    //Import the implicit conversions
    import sQLContext.implicits._
    val bdf: DataFrame = boyRDD.toDF
    //Once it is a DataFrame, both styles of API are available
    //First register the DataFrame as a temporary table
    bdf.registerTempTable("t_boy")
    //Write the SQL (the sql method is in effect a Transformation)
    val result: DataFrame = sQLContext.sql("select * from t_boy order by fv desc, age asc")
    //View the result (this triggers an Action)
    result.show()
    sc.stop()
  }
}

//The case class gives the parsed fields a structured description,
//i.e. it describes the table
case class Boy(id: Long, name: String, age: Int, fv: Double)
Create the following object:
package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

object spark1XDemo2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("spark1XDemo2").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkSQL/")
    //Parse the data
    val rowRDD: RDD[Row] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val id: Long = fields(0).toLong
      val name: String = fields(1)
      val age: Int = fields(2).toInt
      val fv: Double = fields(3).toDouble
      //Produce the Row RDD
      Row(id, name, age, fv)
    })
    //The result type: effectively the table header that describes the DataFrame
    val schema = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", DoubleType, true)
    ))
    //Associate the RowRDD with the schema
    val bdf: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    bdf.registerTempTable("t_boy")
    val result: DataFrame = sqlContext.sql("select * from t_boy order by fv desc, age asc")
    result.show()
    sc.stop()
  }
}
package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object spark1DataFrameOperate {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("spark1DataFrameOperate").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkSQL/")
    val rowRDD: RDD[Row] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val id: Long = fields(0).toLong
      val name: String = fields(1)
      val age: Int = fields(2).toInt
      val fv: Double = fields(3).toDouble
      //Produce the Row RDD
      Row(id, name, age, fv)
    })
    //The result type: effectively the table header that describes the DataFrame
    val schema = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", DoubleType, true)
    ))
    val bdf: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //With the DataFrame API there is no need to register a temporary table
    val df1: DataFrame = bdf.select("name", "age", "fv")
    df1.show()
    //The $ column syntax requires the implicits to be imported
    import sqlContext.implicits._
    val df2: DataFrame = bdf.orderBy($"fv".desc, $"age".asc)
    df2.show()
    sc.stop()
  }
}
package cn.edu360.spark06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object spark2XDemo1 {
  def main(args: Array[String]): Unit = {
    //SparkSession is the Spark 2.x SQL programming API
    //and the entry point for Spark 2.x SQL execution
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("SQLTest1")
      .master("local[2]")
      .getOrCreate()
    //Create the RDD
    val lines: RDD[String] = sparkSession.sparkContext.textFile("hdfs://hd1:9000/sparkSQL")
    val rowRDD: RDD[Row] = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val age = fields(2).toInt
      val fv = fields(3).toDouble
      Row(id, name, age, fv)
    })
    val schema = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", DoubleType, true)
    ))
    val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    //In Spark 2.x the temporary table is registered as a view
    df.createOrReplaceTempView("t_boy")
    //SQL style
    val result: DataFrame = sparkSession.sql("select * from t_boy")
    //DSL style
    val result2: DataFrame = df.select("name", "age", "fv")
    import sparkSession.implicits._
    val result3: Dataset[Row] = df.orderBy($"fv".desc, $"age".asc)
    result.show()
    result2.show()
    result3.show()
    sparkSession.stop()
  }
}
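Spark 2.x can also use the reflection-based approach from section 4.2 directly: with the session's implicits, an RDD of case-class objects converts to a DataFrame, or to a strongly typed Dataset, which leads into the next section. A minimal sketch reusing the Boy case class and the same HDFS path as above (the filter threshold is just an illustrative value):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object spark2XCaseClassSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark2XCaseClassSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    //Parse the same comma-separated file into Boy objects
    val boys = spark.sparkContext.textFile("hdfs://hd1:9000/sparkSQL").map(line => {
      val f = line.split(",")
      Boy(f(0).toLong, f(1), f(2).toInt, f(3).toDouble)
    })

    //Reflection supplies the schema: column names come from the case class fields
    val df: DataFrame = boys.toDF()
    //Or keep the static types and work with a Dataset[Boy]
    val ds: Dataset[Boy] = df.as[Boy]

    ds.filter(_.fv > 90).orderBy($"fv".desc).show()
    spark.stop()
  }
}

//Same case class as in section 4.2
case class Boy(id: Long, name: String, age: Int, fv: Double)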
A Dataset is a distributed collection of data. It is a further encapsulation of the RDD, a smarter RDD. Here is a word-count example:
package cn.edu360.spark06

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object sparkDateSet1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkDateSet1")
      .master("local[2]")
      .getOrCreate()
    val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
    import spark.implicits._
    val words: Dataset[String] = lines.flatMap(_.split(" "))
    //Work with the data through the Dataset API
    val result: DataFrame = words.select("value")
    result.show()
    //Register a view and use the SQL form
    words.createTempView("v_wc")
    val result2: DataFrame = spark.sql("select value, count(*) from v_wc group by value")
    result2.show()
    spark.stop()
  }
}
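For comparison, here is a sketch of the same word count expressed purely through the Dataset/DataFrame API, without registering a view; groupBy("value").count() performs the aggregation that the SQL above expresses. The input path is the same one used in the example above.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object sparkDataSetWordCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparkDataSetWordCountSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
    val words: Dataset[String] = lines.flatMap(_.split(" "))

    //Untyped API: group by the single "value" column and count each word
    val counts: DataFrame = words.groupBy("value").count()
    counts.orderBy($"count".desc).show()

    spark.stop()
  }
}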