Big Data Series: Spark Study Notes on Spark SQL

1. What is Spark SQL?

  • A Spark module for processing structured data
  • It provides a programming abstraction called DataFrame and serves as a distributed SQL query engine

2. Features of Spark SQL

  • Multi-language API support (Java, Python, Scala)
  • Unified data access (see the sketch after this list)
  • Fully compatible with Hive
  • Standard connectivity
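A minimal sketch of the unified-data-access point, assuming a local SparkSession and placeholder paths (UnifiedAccessDemo and the file names are hypothetical): the same DataFrameReader API reads different formats, and each result can be queried through the same SQL interface.

import org.apache.spark.sql.SparkSession

object UnifiedAccessDemo {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("UnifiedAccessDemo").master("local[3]").getOrCreate()
        // The same DataFrameReader API covers different formats (paths are placeholders)
        val jsonDF = spark.read.json("hdfs://uplooking02:8020/demo/people.json")
        val parquetDF = spark.read.parquet("hdfs://uplooking02:8020/demo/people.parquet")
        // Either DataFrame can be queried through the same SQL interface
        jsonDF.createOrReplaceTempView("people_json")
        spark.sql("select count(*) from people_json").show()
        parquetDF.show()
        spark.stop()
    }
}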

3. Why learn Spark SQL?

We have already learned Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster. That greatly simplifies writing MapReduce programs, but the MapReduce computing model itself executes slowly. This is why Spark SQL came about: it translates SQL into RDD operations and submits them to the cluster, so execution is much faster.

4. DataFrame

  • Like an RDD, a DataFrame is a distributed data container
  • However, a DataFrame is more like a two-dimensional table in a traditional database: besides the data it also records the structure of the data, i.e. the schema
  • In essence, a DataFrame is an RDD with schema information attached (see the sketch below)
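To make the last point concrete, a small sketch (hypothetical names, using the Spark 2.x SparkSession introduced in section 6) showing that a DataFrame carries schema information on top of distributed rows:

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, sex: String)

object SchemaDemo {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SchemaDemo").master("local[3]").getOrCreate()
        import spark.implicits._
        // A DataFrame is conceptually an RDD plus schema information
        val df = Seq(Person("admin1", 14, "man"), Person("admin2", 16, "man")).toDF()
        df.printSchema()         // the schema recorded alongside the data
        df.rdd.foreach(println)  // the underlying distributed rows, still accessible as an RDD
        spark.stop()
    }
}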

5. Programming with the Spark SQL 1.x API

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

5.1 Creating a DataFrame with SQLContext (for testing)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object Ops3 {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        // Build a DataFrame directly from an RDD of case class instances
        val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
        val df1: DataFrame = sqlContext.createDataFrame(rdd1)
        df1.show(1) // show only the first row
    }
}

case class Person(name: String, age: Int, sex: String)

5.2 Using the implicit conversions provided by SQLContext (for testing)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
// The implicit conversions add a toDF method to RDDs of case class instances
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()

5.3 Creating a DataFrame with SQLContext (commonly used)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Define the schema explicitly, then map each text line to a Row matching it
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val lineSplit: Array[String] = line.split(",")
  Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()
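In the 1.x API the resulting DataFrame can then be registered as a temporary table and queried with SQL; a short sketch continuing from the code above (registerTempTable is the Spark 1.x counterpart of the createOrReplaceTempView used in section 7.1, and the table name "person" is only illustrative):

// Register the DataFrame as a temporary table and query it with SQL (Spark 1.x API)
rowDF.registerTempTable("person")
val resultDF: DataFrame = sqlContext.sql("select name, age from person where age > 10")
resultDF.show()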

6. Using the newer 2.x API

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleaning: split each line into fields and build Rows
val rowRDD: RDD[Row] = linesRDD.map(line => {
    val splits: Array[String] = line.split(",")
    Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()

7. Ways to work with Spark SQL

7.1 Operating on a DataFrame with SQL statements

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
// SparkSession is the new Spark 2.x entry point, the counterpart of SQLContext in Spark 1.x
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleaning: split each line into fields and build Rows
val rowRDD: RDD[Row] = linesRDD.map(line => {
    val splits: Array[String] = line.split(",")
    Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

// New Spark 2.x API, the counterpart of registerTempTable() in Spark 1.x
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()

7.2 Operating on a DataFrame with the DSL

DSL (domain-specific language)

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleaning: split each line into fields and build Rows
val rowRDD: RDD[Row] = linesRDD.map(line => {
    val splits: Array[String] = line.split(",")
    Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
// The implicits enable the $"col" column syntax used below
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.show()
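For comparison, a sketch of the same query written with Column expressions rather than string conditions; it relies on the same import sparkSession.implicits._ as above plus org.apache.spark.sql.functions.col:

import org.apache.spark.sql.functions.col

// Equivalent DSL query using Column expressions instead of string conditions
val df3: DataFrame = rowDF
    .select(col("name"), col("age"))
    .where($"age" > 10)
    .orderBy($"age".desc)
df3.show()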

8. Spark SQL output

8.1 Writing to a JSON file

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// Data cleaning: split each line into fields and build Rows
val rowRDD: RDD[Row] = linesRDD.map(line => {
    val splits: Array[String] = line.split(",")
    Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
// Write the result out as JSON files under the target directory
df.write.json("hdfs://uplooking02:8020/sparktest1")
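As a quick check (assuming the write above succeeded), the JSON output directory can be read back into a DataFrame with the same session:

// Read the JSON output back to verify it
val jsonDF: DataFrame = sparkSession.read.json("hdfs://uplooking02:8020/sparktest1")
jsonDF.printSchema()
jsonDF.show()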

8.2 Writing to a relational database (MySQL)

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// Data cleaning: split each line into fields and build Rows
val rowRDD: RDD[Row] = linesRDD.map(line => {
    val splits: Array[String] = line.split(",")
    Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
val url = "jdbc:mysql://localhost:3306/test"
// The table is created automatically if it does not exist
val tbName = "person1"
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
// SaveMode defaults to ErrorIfExists; Append adds rows to an existing table
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)
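To confirm the rows landed in MySQL, the table can be read back through the JDBC reader, reusing the url, table name, and properties from above (the MySQL JDBC driver must be on the classpath):

// Read the table back from MySQL to confirm the write (same connection settings as above)
val mysqlDF: DataFrame = sparkSession.read.jdbc(url, tbName, prop)
mysqlDF.show()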

9. Assignments

9.1 Big data project

1. Online learning log analysis

   xxx.log

2. Data cleaning: convert the logs into an RDD

3. Use Spark SQL for offline computation and persist the results to MySQL

9.2 Web project for data visualization

Spring Boot + ECharts
