With the arrival of the big data era, Hadoop became hugely popular. To let engineers who knew RDBMSs well but did not understand MapReduce develop on big data quickly, Hive was born; at the time it was the only SQL-on-Hadoop tool running on Hadoop.
However, the large amounts of intermediate data that MapReduce spills to disk during computation consume a great deal of I/O and lower execution efficiency. To make SQL-on-Hadoop faster, a wave of new SQL-on-Hadoop tools appeared, and one of the most prominent was Shark.
Shark was a component of the Berkeley lab's Spark ecosystem. It modified three modules of the Hive driver (memory management, physical planning, and execution) so that they could run on the Spark engine, speeding up SQL queries by a factor of 10-100.
But Shark's heavy dependence on Hive (it reused Hive's SQL parser, query optimizer, and so on) conflicted with Spark's established "One Stack to Rule Them All" strategy and hindered the integration of Spark's components with one another, so the Spark SQL project was proposed.
Spark SQL abandoned Shark's codebase but kept some of its strengths, such as in-memory columnar storage and Hive compatibility, and its code was rewritten from scratch. Freed from the dependence on Hive, Spark SQL improved enormously in three areas: data compatibility, performance optimization, and component extensibility.
Data compatibility: Spark SQL is not only compatible with Hive; it can also load data from RDDs, Parquet files, and JSON files, and it supports reading from RDBMSs as well as NoSQL stores such as Cassandra.
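As a hedged illustration of those entry points (the paths, the JDBC URL, and the table name below are placeholders, and a SparkSession named spark is assumed to exist already), reading from a few of these sources looks roughly like this in Scala:

// Built-in file sources: Parquet and JSON
val parquetDF = spark.read.parquet("examples/src/main/resources/users.parquet")
val jsonDF    = spark.read.json("examples/src/main/resources/people.json")

// RDBMS access over JDBC (URL, table, and credentials are placeholders)
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

// Existing RDDs can be converted too (with spark.implicits._ in scope): rdd.toDF()
// NoSQL stores such as Cassandra are reached through external connector packages.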
Performance optimization: besides techniques such as in-memory columnar storage and byte-code generation, Spark SQL introduced a cost model that evaluates queries dynamically and picks the best physical plan.
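You rarely interact with these optimizations directly, but you can observe them. As a small sketch (assuming a DataFrame df like the one built later in this article, with spark.implicits._ imported), explain(true) prints the parsed, analyzed, and optimized logical plans along with the physical plan that was selected, and cache() persists the data in the in-memory columnar format:

// Show the logical plans and the chosen physical plan for a query
df.filter($"age" > 21).explain(true)

// Persist the DataFrame using the in-memory columnar format
df.cache()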
Component extensibility: the SQL parser, analyzer, and optimizer can all be redefined and extended.
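For example, since Spark 2.x extra optimizer rules can be plugged in at runtime through the experimental API. The sketch below registers a do-nothing rule purely to show the extension point (the rule name is invented for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A no-op optimizer rule; a real rule would pattern-match on the plan and rewrite it
object MyNoOpRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder().appName("extension sketch").getOrCreate()
// spark.experimental is, as the name suggests, an experimental API
spark.experimental.extraOptimizations = Seq(MyNoOpRule)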
In 2014 development of Shark was halted and the team moved all of its resources to the Spark SQL project. That brought Shark's story to an end, but it also gave rise to two lines of development: Spark SQL and Hive on Spark.
Spark SQL continues to evolve as a member of the Spark ecosystem; it is no longer constrained by Hive and merely stays compatible with it. Hive on Spark, on the other hand, is a Hive roadmap project that makes Spark one of Hive's underlying execution engines, meaning Hive is no longer tied to a single engine and can run on MapReduce, Tez, Spark, and others.
Spark SQL is a module for structured data processing. It attaches schema information to the data being processed, and you can interact with it through SQL statements or the Dataset API.
Spark SQL can read and write data in Hive using SQL; you can also issue SQL from inside a program and get the result back as a Dataset/DataFrame.
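As a minimal sketch of the Hive path (assuming a Hive metastore is reachable, e.g. via hive-site.xml, and using a hypothetical table named src purely for illustration), Hive support is enabled when the session is built:

import org.apache.spark.sql.SparkSession

// enableHiveSupport() connects the session to the Hive metastore and warehouse
val spark = SparkSession
  .builder()
  .appName("Hive read/write sketch")
  .enableHiveSupport()
  .getOrCreate()

// "src" is a hypothetical table name used only for this example
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("SELECT key, value FROM src").show() // the result comes back as a DataFrame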
A Dataset is a distributed collection of data that combines the advantages of RDDs with the Spark SQL execution engine. A Dataset can be constructed from JVM objects and then manipulated with functional operators. The Dataset API is available in Java and Scala; Python and R do not have a dedicated Dataset API, but because of their dynamic nature many of its benefits are already available there.
A DataFrame is a Dataset organized into named columns, the equivalent of a table in an RDBMS. A DataFrame can be constructed in many ways, for example from structured data files, Hive tables, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R.
import org.apache.spark.sql.SparkSession

// Construct a SparkSession (Scala)
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// Create a DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame operations
// This import is needed to use the $-notation
import spark.implicits._

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Construct a SparkSession (Java)
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

// Create a DataFrame
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame operations
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
from pyspark.sql import SparkSession

# Construct a SparkSession (Python)
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Create a DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# DataFrame operations
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
Datasets are similar to RDDs, but instead of the Java serialization or Kryo that RDDs use, they rely on a specialized Encoder to serialize the objects that must be sent over the network. Encoders are dynamically generated code that let Spark perform many operations directly on the serialized format without deserializing the objects first.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
// df.createGlobalTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
// df.createGlobalTempView("people")

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# df.createGlobalTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
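The commented-out createGlobalTempView line above is the cross-session variant: a view created with createOrReplaceTempView disappears when its session ends, whereas a global temporary view lives in the system database global_temp and stays visible to other sessions of the same application. A small Scala sketch:

// Global temporary views are registered in the "global_temp" database
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()

// They remain visible from a different session of the same application
spark.newSession().sql("SELECT * FROM global_temp.people").show()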