spark SQL是一個spark結構數據處理模型。不像基本的rdd api,Spark 提供的接口能夠給spark提供更多更多關於數據的結構和正在執行的計算的信息。另外,spark sql在性能優化上比以往的有作改善。目前有更多的方式和spark sql交互:sql,dataset api。不管你是用哪一種api/語言,計算時最終使用相同的sql引擎。html
Spark Sql的一個做用是執行sql查詢。spark sql可以用來從現有的hive中讀取數據。更多關於如何配置使用的問題,請參考Hibe 表章節。當使用其餘編程語言運行sql,結果也將返回Dataset/DataFrame。你能經過使用命令行或者jdbc/odbc和sql接口交互。java
一個dataset是分佈式數據集合。數據集合是一個spark1.6開始提供的接口,想要把rdd的優點(強相似,強大的lambda函數功能)和spark sql的優化執行的優點結合到一塊兒來。dataset可以從jvm對象中建立而後使用transformations(map,flagmap,filter等等)進行轉換。mysql
全部spark功能的入口點是SparkSession類,建立一個基本的sparksession,只要使用一句語句就話哦啊sql
SparkSession.builder()
:shell
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._
更完整的代碼能夠在spark安裝包數據庫
examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scalaapache
路徑下查看。編程
在spark2.0中SparkSession提供了內部對Hive功能的支持,包括查詢HiveSql,訪問Hive UDFs,以及從Hive表讀數據的能力。要使用這些特徵,你不須要單獨安裝一個hive。json
經過SparkSession,應用可以由現有的rdd,或者hive表,或者spark數據源建立DataFrames。舉個栗子,經過json文件建立api
val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
非強類型/泛型 數據集操做(又叫作DataFrame操做)
DataFrames提供一個專用的語言來操做結構化數據。
如上所述,在Spark2.0中,DataFrames只是包含Rows的數據集。相對「強類型轉換」這些被稱做弱類型轉換。【爛英文有時候翻譯起來本身都不知道官方文檔想表達啥。】
仍是下面的代碼好理解
// This import is needed to use the $-notation import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
更全的操做請移步api文檔
除了簡單的列引用和表達式,datasets也有豐富的函數庫包括字符串操做,時間計算,經常使用的數學操做等。具體請移步DataFrame Function Reference.
SparkSession sql功能使應用能夠執行一串sql查詢而且返回一個DataFrame。把sql引入了語言。
// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
臨時視圖的生命週期在Session範圍內,隨着session的中止而釋放。若是你要讓全部的session的能夠訪問,那麼你就要搞個全局臨時視圖了。這個視圖綁定在系統保留的global_temp數據庫中,訪問前咱們必須經過引用gloabal_temp來訪問它。 e.g. SELECT * FROM global_temp.view1
.
// Register the DataFrame as a global temporary view df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
datasets和rdds相似,和java序列化相反的是,他們使用特殊的編碼方式去序列化對象。
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
兩種從rdd對象轉換到dataset的方法。第一種是使用反射來反推一個rdd的對象類型。第二種是經過一個現有的rdd和你構建的schema,而且這種方式容許建立泛型,到運行時再來定義。
先建好schema對應的類型,在map過程加進去。show code
// For implicit conversions from RDDs to DataFrames import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
內建的dataframe函數提供經常使用的聚合函數,好比count,countDistinct,avg,max,min等等。
一個本地測試例子:
啓動shell,加入mysql驅動包 spark-shell --master spark://hadoop-master:7077 --jars /root/mysql-connector-java-5.1.47.jar 從hdfs取文本文件 val userTxt = sc.textFile("hdfs://hadoop-master:9000//12306.txt"); import org.apache.spark.sql.Row 根據----分割數據 val userRdd = userTxt.map(_.split("----")).map(parts=>Row(parts(0),parts(1),parts(2),parts(3),parts(4),parts(5),parts(6))) 表元數據準備 val schemaString="email,acount,username,sfz,password,mobile,email1" import org.apache.spark.sql.types._ val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema =StructType(fields) rdd數據轉df數據 val userDf = spark.createDataFrame(userRdd, schema) 數據寫入mysql數據庫 userDf.write.format("jdbc").option("url", "jdbc:mysql://192.168.199.204:3306/aaa").option("dbtable", "aaa").option("SaveMode","Overwrite").option("user", "root").option("password", "root").option("driver","com.mysql.jdbc.Driver").save()