Spark 2.2 Official Tutorial Notes - Spark SQL, DataFrames and Datasets Guide

Overview

Spark SQL is a Spark module for structured data processing. Unlike the basic RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of the data and the computation being performed, and Spark SQL uses this extra information to perform better optimizations than before. There are several ways to interact with Spark SQL: SQL and the Dataset API. Whichever API or language you use, the same execution engine is used for the computation.

SQL

One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation; for details on configuring this, see the Hive Tables section. When running SQL from within another programming language, the results are returned as a Dataset/DataFrame. You can also interact with the SQL interface through the command line or over JDBC/ODBC.

Datasets and DataFrames

A Dataset is a distributed collection of data. Dataset is an interface added in Spark 1.6 that aims to combine the benefits of RDDs (strong typing, powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, and so on).
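For example, a minimal sketch (assuming a SparkSession named spark, created in the next section; the Word case class is defined here only for illustration):

// Build a Dataset from in-memory JVM objects and transform it functionally
import spark.implicits._

case class Word(text: String)

val words = Seq(Word("spark"), Word("sql"), Word("dataset")).toDS()
words.map(_.text.toUpperCase).filter(_.startsWith("S")).show()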

Getting Started

SparkSession

The entry point into all Spark functionality is the SparkSession class. Creating a basic SparkSession takes a single statement with SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

The full example code can be found at examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala in the Spark distribution.

In Spark 2.0, SparkSession provides built-in support for Hive features, including the ability to write queries in HiveQL, access Hive UDFs, and read data from Hive tables. To use these features, you do not need a separate Hive installation.
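Hive support is enabled when the session is built; a minimal sketch (the warehouse directory value is just an illustrative choice, not required):

// Build a session with Hive support enabled
import org.apache.spark.sql.SparkSession

val sparkWithHive = SparkSession
  .builder()
  .appName("Spark Hive example")
  .config("spark.sql.warehouse.dir", "spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()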

Creating DataFrames

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources. For example, the following creates a DataFrame from a JSON file:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Untyped Dataset Operations (aka DataFrame Operations)

DataFrames provide a domain-specific language for manipulating structured data.

As mentioned above, in Spark 2.0 DataFrames are just Datasets of Rows, so these operations are called "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Datasets. (The official wording is a little awkward to translate here; the code below is easier to follow.)

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

更全的操做請移步api文檔

除了簡單的列引用和表達式,datasets也有豐富的函數庫包括字符串操做,時間計算,經常使用的數學操做等。具體請移步DataFrame Function Reference.
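A small sketch of that functions library, run against the df used above (only names from org.apache.spark.sql.functions, which ships with Spark, are used):

import org.apache.spark.sql.functions._

df.select(
  upper($"name").as("upper_name"),        // string manipulation
  ($"age" + lit(1)).as("age_next_year"),  // column arithmetic
  current_date().as("today")              // date function
).show()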

Executing SQL Queries

The sql function on a SparkSession lets an application run SQL queries programmatically and returns the result as a DataFrame, bringing SQL directly into the language.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View

Temporary views are session-scoped and disappear when the session that created them stops. If you want a view that can be accessed by all sessions, create a global temporary view instead. Global temporary views are tied to the system-preserved database global_temp, and you must use the qualified name to refer to them, e.g. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Creating Datasets

Datasets are similar to RDDs; however, instead of Java serialization they use specialized Encoders to serialize objects.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Interoperating with RDDs

There are two ways to convert an RDD into a Dataset. The first uses reflection to infer the schema from the RDD's object type. The second goes through a programmatic interface: you construct a schema and apply it to an existing RDD, which lets you create generic Datasets whose columns and types are not known until runtime.

Inferring the Schema Using Reflection

Define the type that matches the schema first, then bring it in during the map step. Show the code:

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

Programmatically Specifying the Schema

When case classes cannot be defined ahead of time, build an RDD of Rows, create a matching StructType schema, and apply it to the RDD with createDataFrame:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

Aggregations

The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), and so on.
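A short sketch against the people DataFrame df from the earlier examples:

import org.apache.spark.sql.functions._

df.agg(
  count(lit(1)).as("rows"),
  countDistinct("name").as("distinct_names"),
  avg("age").as("avg_age"),
  max("age").as("max_age"),
  min("age").as("min_age")
).show()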

 

A local test example:

Start the shell with the MySQL driver jar added:
spark-shell --master spark://hadoop-master:7077 --jars /root/mysql-connector-java-5.1.47.jar
Read the text file from HDFS:
val userTxt = sc.textFile("hdfs://hadoop-master:9000//12306.txt")
import org.apache.spark.sql.Row
Split each record on "----":
val userRdd = userTxt.map(_.split("----")).map(parts => Row(parts(0), parts(1), parts(2), parts(3), parts(4), parts(5), parts(6)))
Prepare the table metadata:
val schemaString = "email,acount,username,sfz,password,mobile,email1"
import org.apache.spark.sql.types._
val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
Convert the RDD to a DataFrame:
val userDf = spark.createDataFrame(userRdd, schema)
Write the data to the MySQL database (the save mode is set with .mode() rather than an option):
userDf.write.format("jdbc").mode("overwrite").option("url", "jdbc:mysql://192.168.199.204:3306/aaa").option("dbtable", "aaa").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").save()
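Optionally, read the table back to verify the write; a minimal sketch reusing the same connection settings (kept on single lines so it can be pasted into the shell as-is):

val readBack = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.199.204:3306/aaa").option("dbtable", "aaa").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
readBack.show(5)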