spark SQL是一個spark結構數據處理模型。不像基本的rdd api,Spark 提供的接口能夠給spark提供更多更多關於數據的結構和正在執行的計算的信息。另外,spark sql在性能優化上比以往的有作改善。目前有更多的方式和spark sql交互:sql,dataset api。不管你是用哪一種api/語言,計算時最終使用相同的sql引擎。html
Spark Sql的一個做用是執行sql查詢。spark sql可以用來從現有的hive中讀取數據。更多關於如何配置使用的問題,請參考Hibe 表章節。當使用其餘編程語言運行sql,結果也將返回Dataset/DataFrame。你能經過使用命令行或者jdbc/odbc和sql接口交互。java
一個dataset是分佈式數據集合。數據集合是一個spark1.6開始提供的接口,想要把rdd的優點(強相似,強大的lambda函數功能)和spark sql的優化執行的優點結合到一塊兒來。dataset可以從jvm對象中建立而後使用transformations(map,flagmap,filter等等)進行轉換。mysql
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._
在spark2.0中SparkSession提供了內部對Hive功能的支持,包括查詢HiveSql,訪問Hive UDFs,以及從Hive表讀數據的能力。要使用這些特徵,你不須要單獨安裝一個hive。json
val df ="examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
非強類型/泛型 數據集操做(又叫作DataFrame操做)
// This import is needed to use the $-notation import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column"name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1$"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
除了簡單的列引用和表達式,datasets也有豐富的函數庫包括字符串操做,時間計算,經常使用的數學操做等。具體請移步DataFrame Function Reference.
SparkSession sql功能使應用能夠執行一串sql查詢而且返回一個DataFrame。把sql引入了語言。
// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
臨時視圖的生命週期在Session範圍內,隨着session的中止而釋放。若是你要讓全部的session的能夠訪問,那麼你就要搞個全局臨時視圖了。這個視圖綁定在系統保留的global_temp數據庫中,訪問前咱們必須經過引用gloabal_temp來訪問它。 e.g. SELECT * FROM global_temp.view1
// Register the DataFrame as a global temporary view df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS =[Person] // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
先建好schema對應的類型,在map過程加進去。show code
// For implicit conversions from RDDs to DataFrames import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
啓動shell,加入mysql驅動包 spark-shell --master spark://hadoop-master:7077 --jars /root/mysql-connector-java-5.1.47.jar 從hdfs取文本文件 val userTxt = sc.textFile("hdfs://hadoop-master:9000//12306.txt"); import org.apache.spark.sql.Row 根據----分割數據 val userRdd ="----")).map(parts=>Row(parts(0),parts(1),parts(2),parts(3),parts(4),parts(5),parts(6))) 表元數據準備 val schemaString="email,acount,username,sfz,password,mobile,email1" import org.apache.spark.sql.types._ val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema =StructType(fields) rdd數據轉df數據 val userDf = spark.createDataFrame(userRdd, schema) 數據寫入mysql數據庫 userDf.write.format("jdbc").option("url", "jdbc:mysql://").option("dbtable", "aaa").option("SaveMode","Overwrite").option("user", "root").option("password", "root").option("driver","com.mysql.jdbc.Driver").save()