Spark SQL 官方文檔-中文翻譯

Spark SQL 官方文檔-中文翻譯

Spark版本:Spark 1.5.2html


轉載請註明出處:http://www.cnblogs.com/BYRans/

java

1 概述(Overview)

Spark SQL是Spark的一個組件,用於結構化數據的計算。Spark SQL提供了一個稱爲DataFrames的編程抽象,DataFrames能夠充當分佈式SQL查詢引擎。sql

2 DataFrames

DataFrame是一個分佈式的數據集合,該數據集合以命名列的方式進行整合。DataFrame能夠理解爲關係數據庫中的一張表,也能夠理解爲R/Python中的一個data frame。DataFrames能夠經過多種數據構造,例如:結構化的數據文件、hive中的表、外部數據庫、Spark計算過程當中生成的RDD等。
DataFrame的API支持4種語言:Scala、Java、Python、R。shell

2.1 入口:SQLContext(Starting Point: SQLContext)

Spark SQL程序的主入口是SQLContext類或它的子類。建立一個基本的SQLContext,你只須要SparkContext,建立代碼示例以下:數據庫

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

除了基本的SQLContext,也能夠建立HiveContext。SQLContext和HiveContext區別與聯繫爲:express

  • SQLContext如今只支持SQL語法解析器(SQL-92語法)
  • HiveContext如今支持SQL語法解析器和HiveSQL語法解析器,默認爲HiveSQL語法解析器,用戶能夠經過配置切換成SQL語法解析器,來運行HiveSQL不支持的語法。
  • 使用HiveContext可使用Hive的UDF,讀寫Hive表數據等Hive操做。SQLContext不能夠對Hive進行操做。
  • Spark SQL將來的版本會不斷豐富SQLContext的功能,作到SQLContext和HiveContext的功能容和,最終可能二者會統一成一個Context

HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,能夠在部署基本的Spark的時候就不須要Hive的依賴包,須要使用HiveContext時再把Hive的各類依賴包加進來。apache

SQL的解析器能夠經過配置spark.sql.dialect參數進行配置。在SQLContext中只能使用Spark SQL提供的」sql「解析器。在HiveContext中默認解析器爲」hiveql「,也支持」sql「解析器。編程

2.2 建立DataFrames(Creating DataFrames)

使用SQLContext,spark應用程序(Application)能夠經過RDD、Hive表、JSON格式數據等數據源建立DataFrames。下面是基於JSON文件建立DataFrame的示例:json

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();

2.3 DataFrame操做(DataFrame Operations)

DataFrames支持Scala、Java和Python的操做接口。下面是Scala和Java的幾個操做示例:api

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
  • Java
JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

詳細的DataFrame API請參考 API Documentation

除了簡單列引用和表達式,DataFrames還有豐富的library,功能包括string操做、date操做、常見數學操做等。詳細內容請參考 DataFrame Function Reference

2.4 運行SQL查詢程序(Running SQL Queries Programmatically)

Spark Application可使用SQLContext的sql()方法執行SQL查詢操做,sql()方法返回的查詢結果爲DataFrame格式。代碼以下:

  • Scala
val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
  • Java
SQLContext sqlContext = ...  // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")

2.5 DataFrames與RDDs的相互轉換(Interoperating with RDDs)

Spark SQL支持兩種RDDs轉換爲DataFrames的方式:

  • 使用反射獲取RDD內的Schema
    • 當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔並且效果也很好。
  • 經過編程接口指定Schema
    • 經過Spark SQL的接口建立RDD的Schema,這種方式會讓代碼比較冗長。
    • 這種方法的好處是,在運行時才知道數據的列以及列的類型的狀況下,能夠動態生成Schema

2.5.1 使用反射獲取Schema(Inferring the Schema Using Reflection)

Spark SQL支持將JavaBean的RDD自動轉換成DataFrame。經過反射獲取Bean的基本信息,依據Bean的信息定義Schema。當前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和複雜數據類型(如:List、Array)。建立一個實現Serializable接口包含全部屬性getters和setters的類來建立一個JavaBean。經過調用createDataFrame並提供JavaBean的Class object,指定一個Schema給一個RDD。示例以下:

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

2.5.2 經過編程接口指定Schema(Programmatically Specifying the Schema)

當JavaBean不能被預先定義的時候,編程建立DataFrame分爲三步:

  • 從原來的RDD建立一個Row格式的RDD
  • 建立與RDD 中Rows結構匹配的StructType,經過該StructType建立表示RDD 的Schema
  • 經過SQLContext提供的createDataFrame方法建立DataFrame,方法參數爲RDD 的Schema

示例以下:

import org.apache.spark.api.java.function.Function;
// Import factory methods provided by DataTypes.
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return RowFactory.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

3 數據源(Data Source)

Spark SQL的DataFrame接口支持多種數據源的操做。一個DataFrame能夠進行RDDs方式的操做,也能夠被註冊爲臨時表。把DataFrame註冊爲臨時表以後,就能夠對該DataFrame執行SQL查詢。Data Sources這部分首先描述了對Spark的數據源執行加載和保存的經常使用方法,而後對內置數據源進行深刻介紹。

3.1 通常Load/Save方法

Spark SQL的默認數據源爲Parquet格式。數據源爲Parquet文件時,Spark SQL能夠方便的執行全部的操做。修改配置項spark.sql.sources.default,可修改默認數據源格式。讀取Parquet文件示例以下:

  • Scala
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
  • Java
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

3.1.1 手動指定選項(Manually Specifying Options)

當數據源格式不是parquet格式文件時,須要手動指定數據源的格式。數據源格式須要指定全名(例如:org.apache.spark.sql.parquet),若是數據源格式爲內置格式,則只須要指定簡稱(json,parquet,jdbc)。經過指定的數據源格式名,能夠對DataFrames進行類型轉換操做。示例以下:

  • Scala
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • Java
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

3.1.2 存儲模式(Save Modes)

能夠採用SaveMode執行存儲操做,SaveMode定義了對數據的處理模式。須要注意的是,這些保存模式不使用任何鎖定,不是原子操做。此外,當使用Overwrite方式執行時,在輸出新數據以前原數據就已經被刪除。SaveMode詳細介紹以下表:

SaveModes

3.1.3 持久化到表(Saving to Persistent Tables)

當使用HiveContext時,能夠經過saveAsTable方法將DataFrames存儲到表中。與registerTempTable方法不一樣的是,saveAsTable將DataFrame中的內容持久化到表中,並在HiveMetastore中存儲元數據。存儲一個DataFrame,可使用SQLContext的table方法。table先建立一個表,方法參數爲要建立的表的表名,而後將DataFrame持久化到這個表中。

默認的saveAsTable方法將建立一個「managed table」,表示數據的位置能夠經過metastore得到。當存儲數據的表被刪除時,managed table也將自動刪除。

3.2 Parquet文件

Parquet是一種支持多種數據處理系統的柱狀的數據格式,Parquet文件中保留了原始數據的模式。Spark SQL提供了Parquet文件的讀寫功能。

3.2.1 讀取Parquet文件(Loading Data Programmatically)

讀取Parquet文件示例以下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  • Java
// sqlContext from the previous example is used in this example.

DataFrame schemaPeople = ... // The DataFrame from the previous example.

// DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write().parquet("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

3.2.2 解析分區信息(Partition Discovery)

對錶進行分區是對數據進行優化的方式之一。在分區的表內,數據經過分區列將數據存儲在不一樣的目錄下。Parquet數據源如今可以自動發現並解析分區信息。例如,對人口數據進行分區存儲,分區列爲gender和country,使用下面的目錄結構:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

經過傳遞path/to/table給 SQLContext.read.parquet或SQLContext.read.load,Spark SQL將自動解析分區信息。返回的DataFrame的Schema以下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

須要注意的是,數據的分區列的數據類型是自動解析的。當前,支持數值類型和字符串類型。自動解析分區類型的參數爲:spark.sql.sources.partitionColumnTypeInference.enabled,默認值爲true。若是想關閉該功能,直接將該參數設置爲disabled。此時,分區列數據格式將被默認設置爲string類型,再也不進行類型解析。

3.2.3 Schema合併(Schema Merging)

像ProtocolBuffer、Avro和Thrift那樣,Parquet也支持Schema evolution(Schema演變)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。如今Parquet數據源能自動檢測這種狀況,併合並這些文件的schemas。

由於Schema合併是一個高消耗的操做,在大多數狀況下並不須要,因此Spark SQL從1.5.0開始默認關閉了該功能。能夠經過下面兩種方式開啓該功能:

  • 當數據源爲Parquet文件時,將數據源選項mergeSchema設置爲true
  • 設置全局SQL選項spark.sql.parquet.mergeSchema爲true

示例以下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

3.2.4 Hive metastore Parquet錶轉換(Hive metastore Parquet table conversion)

當向Hive metastore中讀寫Parquet表時,Spark SQL將使用Spark SQL自帶的Parquet SerDe(SerDe:Serialize/Deserilize的簡稱,目的是用於序列化和反序列化),而不是用Hive的SerDe,Spark SQL自帶的SerDe擁有更好的性能。這個優化的配置參數爲spark.sql.hive.convertMetastoreParquet,默認值爲開啓。

3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)

從表Schema處理的角度對比Hive和Parquet,有兩個區別:

  • Hive區分大小寫,Parquet不區分大小寫
  • hive容許全部的列爲空,而Parquet不容許全部的列全爲空

因爲這兩個區別,當將Hive metastore Parquet錶轉換爲Spark SQL Parquet表時,須要將Hive metastore schema和Parquet schema進行一致化。一致化規則以下:

  • 這兩個schema中的同名字段必須具備相同的數據類型。一致化後的字段必須爲Parquet的字段類型。這個規則同時也解決了空值的問題。
  • 一致化後的schema只包含Hive metastore中出現的字段。
    • 忽略只出如今Parquet schema中的字段
    • 只在Hive metastore schema中出現的字段設爲nullable字段,並加到一致化後的schema中

3.2.4.2 元數據刷新(Metadata Refreshing)

Spark SQL緩存了Parquet元數據以達到良好的性能。當Hive metastore Parquet錶轉換爲enabled時,表修改後緩存的元數據並不能刷新。因此,當表被Hive或其它工具修改時,則必須手動刷新元數據,以保證元數據的一致性。示例以下:

  • Scala
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
  • Java
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")

3.2.5 配置(Configuration)

配置Parquet可使用SQLContext的setConf方法或使用SQL執行SET key=value命令。詳細參數說明以下:

Configuration

3.3 JSON數據集

Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換爲DataFrame。

須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:

  • Scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
  • Java
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method.
people.printSchema();
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlContext.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

3.4 Hive表

Spark SQL支持對Hive的讀寫操做。須要注意的是,Hive所依賴的包,沒有包含在Spark assembly包中。增長Hive時,須要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。這兩個配置將build一個新的assembly包,這個assembly包含了Hive的依賴包。注意,必須上這個心的assembly包到全部的worker節點上。由於worker節點在訪問Hive中數據時,會調用Hive的 serialization and deserialization libraries(SerDes),此時將用到Hive的依賴包。

Hive的配置文件爲conf/目錄下的hive-site.xml文件。在YARN上執行查詢命令以前,lib_managed/jars目錄下的datanucleus包和conf/目錄下的hive-site.xml必須能夠被driverhe和全部的executors所訪問。確保被訪問,最方便的方式就是在spark-submit命令中經過--jars選項和--file選項指定。

操做Hive時,必須建立一個HiveContext對象,HiveContext繼承了SQLContext,並增長了對MetaStore和HiveQL的支持。除了sql方法,HiveContext還提供了一個hql方法,hql方法能夠執行HiveQL語法的查詢語句。示例以下:

  • Scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
  • Java
// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc);

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

3.4.1 訪問不一樣版本的Hive Metastore(Interacting with Different Versions of Hive Metastore)

Spark SQL常常須要訪問Hive metastore,Spark SQL能夠經過Hive metastore獲取Hive表的元數據。從Spark 1.4.0開始,Spark SQL只需簡單的配置,就支持各版本Hive metastore的訪問。注意,涉及到metastore時Spar SQL忽略了Hive的版本。Spark SQL內部將Hive反編譯至Hive 1.2.1版本,Spark SQL的內部操做(serdes, UDFs, UDAFs, etc)都調用Hive 1.2.1版本的class。版本配置項見下面表格:

ShiveMetastore

3.5 JDBC To Other Databases

Spark SQL支持使用JDBC訪問其餘數據庫。當時用JDBC訪問其它數據庫時,最好使用JdbcRDD。使用JdbcRDD時,Spark SQL操做返回的DataFrame會很方便,也會很方便的添加其餘數據源數據。JDBC數據源由於不須要用戶提供ClassTag,因此很適合使用Java或Python進行操做。
使用JDBC訪問數據源,須要在spark classpath添加JDBC driver配置。例如,從Spark Shell鏈接postgres的配置爲:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

遠程數據庫的表,可用DataFrame或Spark SQL臨時表的方式調用數據源API。支持的參數有:

option

代碼示例以下:

  • Scala
val jdbcDF = sqlContext.read.format("jdbc").options( 
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
  • Java
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");

DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();

3.6 故障排除(Troubleshooting)

  • 在客戶端session和全部的executors上,JDBC driver必須對啓動類加載器(primordial class loader)設置爲visible。由於當建立一個connection時,Java的DriverManager類會執行安全驗證,安全驗證將忽略全部對啓動類加載器爲非visible的driver。一個很方便的解決方法是,修改全部worker節點上的compute_classpath.sh腳本,將driver JARs添加至腳本。
  • 有些數據庫(例:H2)將全部的名字轉換爲大寫,因此在這些數據庫中,Spark SQL也須要將名字所有大寫。

4 性能調優

4.1 緩存數據至內存(Caching Data In Memory)

Spark SQL能夠經過調用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),將表用一種柱狀格式( an in­memory columnar format)緩存至內存中。而後Spark SQL在執行查詢任務時,只需掃描必需的列,從而以減小掃描數據量、提升性能。經過緩存數據,Spark SQL還能夠自動調節壓縮,從而達到最小化內存使用率和下降GC壓力的目的。調用sqlContext.uncacheTable("tableName")可將緩存的數據移出內存。

可經過兩種配置方式開啓緩存數據功能:

  • 使用SQLContext的setConf方法
  • 執行SQL命令 SET key=value

Cache-In-Memory

4.2 調優參數(Other Configuration Options)

能夠經過配置下表中的參數調節Spark SQL的性能。在後續的Spark版本中將逐漸加強自動調優功能,下表中的參數在後續的版本中或許將再也不須要配置。

optionsTunningPfms

5 分佈式SQL引擎

使用Spark SQL的JDBC/ODBC或者CLI,能夠將Spark SQL做爲一個分佈式查詢引擎。終端用戶或應用不須要編寫額外的代碼,能夠直接使用Spark SQL執行SQL查詢。

5.1 運行Thrift JDBC/ODBC服務

這裏運行的Thrift JDBC/ODBC服務與Hive 1.2.1中的HiveServer2一致。能夠在Spark目錄下執行以下命令來啓動JDBC/ODBC服務:

./sbin/start-thriftserver.sh

這個命令接收全部 bin/spark-submit 命令行參數,添加一個 --hiveconf 參數來指定Hive的屬性。詳細的參數說明請執行命令 ./sbin/start-thriftserver.sh --help
服務默認監聽端口爲localhost:10000。有兩種方式修改默認監聽端口:

  • 修改環境變量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...
  • 修改系統屬性
./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

使用 beeline 來測試Thrift JDBC/ODBC服務:

./bin/beeline

鏈接到Thrift JDBC/ODBC服務

beeline> !connect jdbc:hive2://localhost:10000

在非安全模式下,只須要輸入機器上的一個用戶名便可,無需密碼。在安全模式下,beeline會要求輸入用戶名和密碼。安全模式下的詳細要求,請閱讀beeline documentation的說明。

配置Hive須要替換 conf/ 目錄下的 hive-site.xml

Thrift JDBC服務也支持經過HTTP傳輸發送thrift RPC messages。開啓HTTP模式須要將下面的配參數配置到系統屬性或 conf/: 下的 hive-site.xml

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

測試http模式,可使用beeline連接JDBC/ODBC服務:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

5.2 運行Spark SQL CLI

Spark SQL CLI能夠很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。須要注意的是,Spark SQL CLI不能與Thrift JDBC服務交互。
在Spark目錄下執行以下命令啓動Spark SQL CLI:

./bin/spark-sql

配置Hive須要替換 conf/ 下的 hive-site.xml 。執行 ./bin/spark-sql --help 可查看詳細的參數說明 。

6 Migration Guide

6.1 與Hive的兼容(Compatibility with Apache Hive)

Spark SQL與Hive Metastore、SerDes、UDFs相兼容。Spark SQL兼容Hive Metastore從0.12到1.2.1的全部版本。Spark SQL也與Hive SerDes和UDFs相兼容,當前SerDes和UDFs是基於Hive 1.2.1。

6.1.1 在Hive warehouse中部署Spark SQL

Spark SQL Thrift JDBC服務與Hive相兼容,在已存在的Hive上部署Spark SQL Thrift服務不須要對已存在的Hive Metastore作任何修改,也不須要對數據作任何改動。

6.1.2 Spark SQL支持的Hive特性

Spark SQL支持多部分的Hive特性,例如:

  • Hive查詢語句,包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部Hive運算符,包括
    • 比較操做符(=, ⇔, ==, <>, <, >, >=, <=, etc)
    • 算術運算符(+, -, *, /, %, etc)
    • 邏輯運算符(AND, &&, OR, ||, etc)
    • 複雜類型構造器
    • 數學函數(sign,ln,cos,etc)
    • 字符串函數(instr,length,printf,etc)
  • 用戶自定義函數(UDF)
  • 用戶自定義聚合函數(UDAF)
  • 用戶自定義序列化格式器(SerDes)
  • 窗口函數
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 子查詢
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • 表分區,包括動態分區插入
  • 視圖
  • 全部的Hive DDL函數,包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的Hive數據類型,包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

6.1.3 不支持的Hive功能

下面是當前不支持的Hive特性,其中大部分特性在實際的Hive使用中不多用到。

Major Hive Features

  • Tables with buckets:bucket是在一個Hive表分區內進行hash分區。Spark SQL當前不支持。

Esoteric Hive Features

  • UNION type
  • Unique join
  • Column statistics collecting:當期Spark SQL不智齒列信息統計,只支持填充Hive Metastore的sizeInBytes列。

Hive Input/Output Formats

  • File format for CLI: 這個功能用於在CLI顯示返回結果,Spark SQL只支持TextOutputFormat
  • Hadoop archive

Hive優化
部分Hive優化尚未添加到Spark中。沒有添加的Hive優化(好比索引)對Spark SQL這種in-memory計算模型來講不是特別重要。下列Hive優化將在後續Spark SQL版本中慢慢添加。

  • 塊級別位圖索引和虛擬列(用於創建索引)
  • 自動檢測joins和groupbys的reducer數量:當前Spark SQL中須要使用「 SET spark.sql.shuffle.partitions=[num_tasks]; 」控制post-shuffle的並行度,不能自動檢測。
  • 僅元數據查詢:對於能夠經過僅使用元數據就能完成的查詢,當前Spark SQL仍是須要啓動任務來計算結果。
  • 數據傾斜標記:當前Spark SQL不遵循Hive中的數據傾斜標記
  • jion中STREAMTABLE提示:當前Spark SQL不遵循STREAMTABLE提示
  • 查詢結果爲多個小文件時合併小文件:若是查詢結果包含多個小文件,Hive能合併小文件爲幾個大文件,避免HDFS metadata溢出。當前Spark SQL不支持這個功能。

7 Reference

7.1 Data Types

Spark SQL和DataFrames支持的數據格式以下:

  • 數值類型
    • ByteType: 表明1字節有符號整數. 數值範圍: -128 到 127.
    • ShortType: 表明2字節有符號整數. 數值範圍: -32768 到 32767.
    • IntegerType: 表明4字節有符號整數. 數值範圍: -2147483648 t到 2147483647.
    • LongType: 表明8字節有符號整數. 數值範圍: -9223372036854775808 到 9223372036854775807.
    • FloatType: 表明4字節單精度浮點數。
    • DoubleType: 表明8字節雙精度浮點數。
    • DecimalType: 表示任意精度的有符號十進制數。內部使用java.math.BigDecimal.A實現。
    • BigDecimal由一個任意精度的整數非標度值和一個32位的整數組成。
  • String類型
    • StringType: 表示字符串值。
  • Binary類型
    • BinaryType: 表明字節序列值。
  • Boolean類型
    • BooleanType: 表明布爾值。
  • Datetime類型
    • TimestampType: 表明包含的年、月、日、時、分和秒的時間值
    • DateType: 表明包含的年、月、日的日期值
  • 複雜類型
    • ArrayType(elementType, containsNull): 表明包含一系列類型爲elementType的元素。若是在一個將ArrayType值的元素能夠爲空值,containsNull指示是否容許爲空。
    • MapType(keyType, valueType, valueContainsNull): 表明一系列鍵值對的集合。key不容許爲空,valueContainsNull指示value是否容許爲空
    • StructType(fields): 表明帶有一個StructFields(列)描述結構數據。
      • StructField(name, dataType, nullable): 表示StructType中的一個字段。name表示列名、dataType表示數據類型、nullable指示是否容許爲空。

Spark SQL全部的數據類型在 org.apache.spark.sql.types 包內。不一樣語言訪問或建立數據類型方法不同:

  • Scala
    代碼中添加 import org.apache.spark.sql.types._,再進行數據類型訪問或建立操做。

scalaAccessDataTypes

  • Java
    可使用 org.apache.spark.sql.types.DataTypes 中的工廠方法,以下表:

javaAccessDataTypes

7.2 NaN 語義

當處理float或double類型時,若是類型不符合標準的浮點語義,則使用專門的處理方式NaN。須要注意的是:

  • NaN = NaN 返回 true
  • 能夠對NaN值進行聚合操做
  • 在join操做中,key爲NaN時,NaN值與普通的數值處理邏輯相同
  • NaN值大於全部的數值型數據,在升序排序中排在最後
相關文章
相關標籤/搜索