1. Introduction to Spark SQL
1. Overview: Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and serves as our distributed SQL query engine.
2. Purpose: to process structured data; unstructured data must first be converted into structured data.
3. Spark SQL offers two programming models (a minimal contrast of the two is sketched below):
   1) The SQL style, e.g. select * from user;
   2) The DataFrame style (DSL)
   Hive's HQL translates SQL into MapReduce jobs, whereas Spark SQL translates SQL into RDD operations, which runs faster.
4. Features:
   1) Easy integration with Spark
   2) Unified data access
   3) Standard connectivity: supports JDBC/ODBC, so BI tools can connect
   4) Hive compatibility
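A minimal sketch contrasting the two models, assuming a SparkSession named spark and a DataFrame userDF with columns id, name and age (built as in the examples later in this section):

import org.apache.spark.sql.{DataFrame, SparkSession}

object StyleComparison {
  // Both queries return the rows with age > 18; only the way they are expressed differs.
  def compare(spark: SparkSession, userDF: DataFrame): Unit = {
    import spark.implicits._

    // 1) SQL style: register a view and query it with SQL text
    userDF.createOrReplaceTempView("user")
    spark.sql("select name, age from user where age > 18").show()

    // 2) DSL style: the same query expressed with DataFrame methods
    userDF.select($"name", $"age").where($"age" > 18).show()
  }
}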
2. Introduction to DataFrame
Like an RDD, a DataFrame is a distributed data container.
Spark SQL is a SQL parsing engine: in Spark, SQL statements are parsed into RDD operations. Note that the resulting RDD is special; it carries schema information.
An RDD with schema information attached is what we call a DataFrame.
A DataFrame is like a two-dimensional table in a database (rows plus described columns); besides the data itself, it also records the structure of that data (the schema).
Differences from an RDD:
DataFrame: stores structured data together with a description of its structure
RDD: stores text, binary data, audio, video, ...
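A minimal sketch of the "RDD plus schema" idea, assuming a local SparkSession; it only illustrates what the schema adds on top of the raw rows:

import org.apache.spark.sql.SparkSession

object SchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SchemaSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    // A plain RDD only knows it holds tuples of values.
    val rdd = spark.sparkContext.parallelize(Seq((1, "zhangsan", 18), (2, "lisi", 23)))

    // Converting it to a DataFrame attaches column names and types (the schema).
    val df = rdd.toDF("id", "name", "age")
    df.printSchema()   // id: integer, name: string, age: integer
    df.show()

    spark.stop()
  }
}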
3. SQL Style
1) SqlTest1
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Spark 2.x
 * SQL style
 */
object SqlTest1 {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession
    val sparkSession = SparkSession.builder().appName("SqlTest1")
      .master("local[2]")
      .getOrCreate()
    // 2. Create the RDD
    val dataRdd: RDD[String] = sparkSession.sparkContext
      .textFile("hdfs://192.168.146.111:9000/user.txt")
    // 3. Split the data
    val splitRdd: RDD[Array[String]] = dataRdd.map(_.split("\t"))
    // 4. Wrap the data
    val rowRdd = splitRdd.map(x => {
      val id = x(0).toInt
      val name = x(1).toString
      val age = x(2).toInt
      // Wrap one row of data
      Row(id, name, age)
    })
    // 5. Create the schema (describes the DataFrame, like a table definition in SQL)
    val schema: StructType = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // 6. Create the DataFrame
    val userDF: DataFrame = sparkSession.createDataFrame(rowRdd, schema)
    // 7. Register the table
    userDF.registerTempTable("user_t")
    // 8. Write the SQL
    val uSql: DataFrame = sparkSession.sql("select * from user_t order by age")
    // 9. View the result
    uSql.show()
    // 10. Release resources
    sparkSession.stop()
  }
}
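Note: registerTempTable is deprecated in Spark 2.x (the shell session in section 4 even prints the deprecation warning); the same step with the replacement API, assuming the userDF from the example above:

// Equivalent registration with the Spark 2.x API
userDF.createOrReplaceTempView("user_t")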
2) user.txt
1	zhangsan	18
2	lisi	23
3	tom	26
4	mary	16
5	zhangsanfeng	128
3) Result
4. Using toDF
scala> val rdd = sc.textFile("hdfs://192.168.146.111:9000/user.txt").map(_.split("\t"))
rdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24

scala> case class User(id:Int,name:String,age:Int)
defined class User

scala> val userRdd = rdd.map(x => User(x(0).toInt,x(1),x(2).toInt))
userRdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[4] at map at <console>:28

scala> val udf = userRdd.toDF
udf: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> udf.show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  4|        mary| 16|
|  5|zhangsanfeng|128|
+---+------------+---+

scala> udf.select("name","age").show()
+------------+---+
|        name|age|
+------------+---+
|    zhangsan| 18|
|        lisi| 23|
|         tom| 26|
|        mary| 16|
|zhangsanfeng|128|
+------------+---+

scala> udf.filter(col("id") <= 3).show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 18|
|  2|    lisi| 23|
|  3|     tom| 26|
+---+--------+---+

scala> udf.filter(col("id") > 3).show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  4|        mary| 16|
|  5|zhangsanfeng|128|
+---+------------+---+

scala> udf.groupBy(("name")).count.show()
+------------+-----+
|        name|count|
+------------+-----+
|zhangsanfeng|    1|
|        mary|    1|
|    zhangsan|    1|
|         tom|    1|
|        lisi|    1|
+------------+-----+

scala> udf.sort("age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  4|        mary| 16|
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  5|zhangsanfeng|128|
+---+------------+---+

scala> udf.orderBy("age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  4|        mary| 16|
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  5|zhangsanfeng|128|
+---+------------+---+

scala> udf.registerTempTable("user_t")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sqlContext.sql("select * from user_t").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  4|        mary| 16|
|  5|zhangsanfeng|128|
+---+------------+---+

scala> spark.sqlContext.sql("select name,age from user_t where age>18").show()
+------------+---+
|        name|age|
+------------+---+
|        lisi| 23|
|         tom| 26|
|zhangsanfeng|128|
+------------+---+

scala>
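In the spark-shell, spark and its implicits are already in scope, which is why toDF works directly. A minimal sketch of the same reflection-based case-class approach in a compiled program (assuming the same user.txt layout and HDFS path as above):

import org.apache.spark.sql.SparkSession

// Case class used for reflection-based schema inference; it must be defined
// outside the method that calls toDF.
case class User(id: Int, name: String, age: Int)

object ToDfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ToDfSketch").master("local[2]").getOrCreate()
    // Required for the .toDF / .toDS implicit conversions outside the shell.
    import spark.implicits._

    val userDF = spark.sparkContext
      .textFile("hdfs://192.168.146.111:9000/user.txt")
      .map(_.split("\t"))
      .map(x => User(x(0).toInt, x(1), x(2).toInt))
      .toDF()

    userDF.printSchema()
    userDF.show()
    spark.stop()
  }
}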
5. DSL Style
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * DSL style
 */
object SqlTest2 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("SqlTest2")
      .master("local[2]")
      .getOrCreate()
    // 2. Create the RDD
    val dataRDD: RDD[String] = sparkSession.sparkContext
      .textFile("hdfs://192.168.146.111:9000/user.txt")
    // 3. Split the data
    val splitRDD: RDD[Array[String]] = dataRDD.map(_.split("\t"))
    val rowRDD: RDD[Row] = splitRDD.map(x => {
      val id = x(0).toInt
      val name = x(1).toString
      val age = x(2).toInt
      // Row represents one row of data
      Row(id, name, age)
    })
    val schema: StructType = StructType(List(
      // Struct fields
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // 4. Convert the RDD to a DataFrame
    val userDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    // 5. DSL style: query the rows where age is greater than 18
    import sparkSession.implicits._
    val user1DF: Dataset[Row] = userDF.where($"age" > 18)
    user1DF.show()
    // 6. Close resources
    sparkSession.stop()
  }
}
Result:
6. WordCount
1) SqlWordCount
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SqlWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("SqlWordCount")
      .master("local[2]")
      .getOrCreate()
    // 2. Load the data as a Dataset; a Dataset is a smarter RDD
    //    and by default has a single column named "value"
    val datas: Dataset[String] = sparkSession.read
      .textFile("hdfs://192.168.146.111:9000/words.txt")
    // 3. Split each line into words
    import sparkSession.implicits._
    val word: Dataset[String] = datas.flatMap(_.split("\t"))
    // 4. Register a view
    word.createTempView("wc_t")
    // 5. Run the word-count SQL
    val r: DataFrame = sparkSession
      .sql("select value as word,count(*) sum from wc_t group by value order by sum desc")
    r.show()
    sparkSession.stop()
  }
}
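The same count can also be expressed in the DSL without registering a view. A minimal sketch, assuming the same words.txt and splitting on arbitrary whitespace instead of a tab (the delimiter must match how the file is actually separated):

import org.apache.spark.sql.{Dataset, SparkSession}

object DslWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DslWordCount").master("local[2]").getOrCreate()
    import spark.implicits._

    val lines: Dataset[String] = spark.read.textFile("hdfs://192.168.146.111:9000/words.txt")

    // Split on any whitespace, group by word, count and sort descending.
    val words = lines.flatMap(_.split("\\s+")).toDF("word")
    words.groupBy("word").count().orderBy($"count".desc).show()

    spark.stop()
  }
}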
2) words.txt
hello world
hello China
hello Beijing
haha heihei
3) Result
7、Join操做
1) JoinDemo
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * SQL-style join
 */
object JoinDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JoinDemo")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._
    // 2. Create a Dataset directly
    val datas1: Dataset[String] = sparkSession
      .createDataset(List("1 Tom 18", "2 John 22", "3 Mary 16"))
    // 3. Organize the data
    val dataDS1: Dataset[(Int, String, Int)] = datas1.map(x => {
      val fields: Array[String] = x.split(" ")
      val id = fields(0).toInt
      val name = fields(1).toString
      val age = fields(2).toInt
      // Emit a tuple
      (id, name, age)
    })
    val dataDF1: DataFrame = dataDS1.toDF("id", "name", "age")
    // 4. Create the second dataset
    val datas2: Dataset[String] = sparkSession
      .createDataset(List("18 young", "22 old"))
    val dataDS2: Dataset[(Int, String)] = datas2.map(x => {
      val fields: Array[String] = x.split(" ")
      val age = fields(0).toInt
      val desc = fields(1).toString
      // Emit a tuple
      (age, desc)
    })
    // 5. Convert to a DataFrame
    val dataDF2: DataFrame = dataDS2.toDF("dage", "desc")
    // 6. Register views
    dataDF1.createTempView("d1_t")
    dataDF2.createTempView("d2_t")
    // 7. Write the join SQL
    val r = sparkSession.sql("select name,desc from d1_t join d2_t on age = dage")
    // 8. Trigger the job
    r.show()
    // 9. Close resources
    sparkSession.stop()
  }
}
2) Result
3) JoinDemo1
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object JoinDemo1 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("JoinDemo1")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._
    // 2. Create a Dataset directly
    val datas1: Dataset[String] = sparkSession
      .createDataset(List("1 Tom 18", "2 John 22", "3 Mary 16"))
    // 3. Organize the data
    val dataDS1: Dataset[(Int, String, Int)] = datas1.map(x => {
      val fields: Array[String] = x.split(" ")
      val id = fields(0).toInt
      val name = fields(1).toString
      val age = fields(2).toInt
      // Emit a tuple
      (id, name, age)
    })
    val dataDF1: DataFrame = dataDS1.toDF("id", "name", "age")
    // 4. Create the second dataset
    val datas2: Dataset[String] = sparkSession
      .createDataset(List("18 young", "22 old"))
    val dataDS2: Dataset[(Int, String)] = datas2.map(x => {
      val fields: Array[String] = x.split(" ")
      val age = fields(0).toInt
      val desc = fields(1).toString
      // Emit a tuple
      (age, desc)
    })
    // 5. Convert to a DataFrame
    val dataDF2: DataFrame = dataDS2.toDF("dage", "desc")
    // 6. Join with the DSL; the default join type is an inner join
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage")
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "left")
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "right")
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "left_outer")
    val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "cross")
    r.show()
    // 7. Close resources
    sparkSession.stop()
  }
}
4) Result
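For reference, the third argument of join accepts the Spark 2.x join-type strings such as inner, left_outer, right_outer, full_outer, cross, left_semi and left_anti. A one-line sketch, assuming the dataDF1 and dataDF2 (and the implicits import) from JoinDemo1 above:

// Keeps every row from both sides, filling missing matches with null.
val fullJoin: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "full_outer")
fullJoin.show()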