Spark SQL Study Notes

Spark SQL is the Spark module for processing structured data. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of the data and the computation being performed. Spark SQL currently offers three different APIs: SQL statements, the DataFrame API, and the newer Dataset API.
One way to use Spark SQL is to execute SQL queries directly. You can use basic SQL syntax or HiveQL, and Spark SQL can also read data from an existing Hive installation.

A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a table in a relational database or a data frame in R/Python. A DataFrame can be constructed from a wide range of sources: structured data files, tables in Hive, external databases, or existing RDDs.

Dataset is a new API added in Spark 1.6. It aims to combine the benefits of RDDs (strong typing, the ability to use lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

Entry Points: SQLContext and SparkSession

Before version 2.0, the entry point for all Spark SQL functionality was the SQLContext class or one of its subclasses.

val sc: SparkContext // an existing SparkContext object is assumed
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

From version 2.0 onward, the entry point is SparkSession, built with SparkSession.builder().

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

Spark 2.0 introduced SparkSession with builtin support for Hive features, including writing queries in HiveQL, accessing Hive UDFs, and reading data from Hive tables, and you do not need an existing Hive setup to use them. Before that, you had to pull in the HiveContext dependency and use a HiveContext to get these features.
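
For comparison, the pre-2.0 approach looked roughly like the sketch below (the old, now superseded HiveContext API; it assumes an existing SparkContext `sc` as in the SQLContext snippet above and the spark-hive module on the classpath):

// Pre-2.0 (superseded): Hive support required a separate HiveContext
// built on top of an existing SparkContext
org.apache.spark.sql.hive.HiveContext hiveContext =
  new org.apache.spark.sql.hive.HiveContext(sc);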

DataFrame

A DataFrame can be constructed from a wide range of sources: structured data files, tables in Hive, external databases, or existing RDDs.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Since Spark 2.0, a DataFrame is simply a Dataset of Rows (in Java and Scala). DataFrames provide a domain-specific language for structured data manipulation.

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

For the complete list of operations, see the Dataset API documentation.
Datasets also provide a large library of functions for working with strings, dates, math, and more; see the functions reference in the API documentation.
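
As a quick illustration, the minimal sketch below reuses the df and the static col import from the example above and applies one string function and one aggregate function (the alias name_upper is arbitrary):

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.upper;

// Uppercase the "name" column and compute the average age
df.select(upper(col("name")).alias("name_upper")).show();
df.agg(avg(col("age"))).show();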

Running SQL Queries Programmatically

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View - the temporary views created above are tied to the SparkSession and disappear when the session ends. If you want a view shared across multiple sessions, use a Global Temporary View.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Dataset

The Dataset API is similar to RDDs; however, instead of Java serialization or Kryo, Datasets use a specialized Encoder to serialize objects for processing and for transmission over the network. While both encoders and standard serialization turn objects into bytes, encoders are generated dynamically from code and use a format that allows Spark to perform many operations, such as filtering, sorting, and hashing, without deserializing the bytes back into objects.

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer call(Integer value) throws Exception {
    return value + 1;
  }
}, integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Interoperating with RDDs

Spark SQL supports two methods for converting an RDD into a DataFrame.

  1. Use reflection to infer the schema of an RDD that contains objects of a specific type. This reflection-based approach leads to more concise code and is recommended when you already know the schema.

  2. Construct a schema programmatically and then apply it to an existing RDD. This approach is more verbose, but it is what you need when the fields are not known in advance or the schema is only available at runtime.

Inferring the Schema Using Reflection

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. Currently, Spark SQL does not support JavaBeans that contain Map fields. You can create a JavaBean that implements the Serializable interface.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(new Function<String, Person>() {
    @Override
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.<String>getAs("name");
  }
}, stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Programmatically Specifying the Schema

You can create a DataFrame programmatically in three steps:

  • Create an RDD of Row objects from the original RDD

  • Create the schema represented by a StructType matching the structure of the Rows in the RDD created in step 1

  • Apply the schema to the RDD of Rows via the SparkSession.createDataFrame method

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
  @Override
  public Row call(String record) throws Exception {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
  }
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

Data Sources

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on like an ordinary RDD, and it can also be registered as a temporary view; once registered, you can run SQL queries over it. This section describes the general methods for loading and saving data using the built-in Spark data sources.
In the simplest form, the default data source is used for all operations (Parquet, unless spark.sql.sources.default has been changed).

Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

You can also manually specify the data source along with any extra options you want to pass to it. Data sources are specified by their fully qualified name (e.g., org.apache.spark.sql.parquet), but built-in sources can use a short name (json, parquet, jdbc). A DataFrame loaded from any data source type can be converted to another format with this syntax.

Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

Run SQL on files directly: Spark SQL also supports querying a file directly with SQL, without first loading it via the read methods.

Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

Save Modes

Save操做有一個可選參數SaveMode,用這個參數能夠指定如何處理數據已經存在的狀況。很重要的一點是,這些保存模式都沒有加鎖,因此其操做也不是原子性的。另外,若是使用Overwrite模式,實際操做是,先刪除數據,再寫新數據。

  • SaveMode.ErrorIfExists (default) "error" — when saving a DataFrame to a data source, if data already exists, an exception is thrown.

  • SaveMode.Append "append" — if data or the table already exists, the contents of the DataFrame are appended to the existing data.

  • SaveMode.Overwrite "overwrite" — if data or the table already exists, the existing data is overwritten by the contents of the DataFrame.

  • SaveMode.Ignore "ignore" — if data already exists, the save operation leaves the existing data unchanged and does not write the DataFrame, similar to CREATE TABLE IF NOT EXISTS in SQL.
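
A minimal sketch of passing a mode to the writer, reusing the usersDF from the load/save example above (the output path is only an illustration):

import org.apache.spark.sql.SaveMode;

// Overwrite whatever already exists at the target path
usersDF.select("name", "favorite_color")
  .write()
  .mode(SaveMode.Overwrite)
  .save("namesAndFavColors.parquet");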

Saving to Persistent Tables

A DataFrame can be persisted into a table in the Hive metastore with the saveAsTable method. By default, saveAsTable creates a "managed table", meaning the location of the data is controlled by the metastore; accordingly, dropping the table also deletes its data.
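
For example (a sketch; the table name people_table is hypothetical), a DataFrame created earlier can be saved as a table and read back by name:

// Persist the DataFrame as a managed table in the metastore
peopleDF.write().saveAsTable("people_table");

// The table outlives the session and can be read back by name later
Dataset<Row> peopleTableDF = spark.table("people_table");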

Parquet Files

Parquet is a popular columnar storage format. Spark SQL provides support for both reading and writing Parquet files, and Parquet files automatically preserve the schema of the original data. When writing Parquet files, all columns are automatically converted to nullable for compatibility reasons.

Loading Data Programmatically

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Other key features: see the official documentation

  • Partition discovery

  • Schema merging

  • Hive metastore Parquet table conversion

  • Metadata refreshing

  • Configuration

JSON Datasets

Spark SQL can automatically infer the schema of a JSON dataset when loading it and return a Dataset<Row>. This conversion is done with SparkSession.read().json() on either an RDD of Strings or a JSON file.

Note that what is commonly called a JSON file is not necessarily what is expected here: each line must be a separate, self-contained, valid JSON object. As a consequence, a regular multi-line JSON file will usually fail to load.
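
For reference, a file in the expected one-object-per-line layout looks like this (these three records match the example output shown earlier):

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}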

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
        new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

Hive Tables

Spark SQL supports reading and writing data stored in Hive. However, Hive has a large number of dependencies, so it is not included in the default Spark distribution. To enable Hive support, the relevant jars must be on the classpath of all nodes.
Place the configuration files hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) in conf/.
Start by instantiating a SparkSession; you still get Hive support even without an existing Hive deployment.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = "spark-warehouse";
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Key: " + row.get(0) + ", Value: " + row.get(1);
  }
}, Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...

Interacting with different versions of the Hive metastore: omitted here; see the official documentation.

JDBC To Other Databases

Spark SQL can also access other databases via JDBC. This functionality should be preferred over JdbcRDD because it returns a DataFrame, which is easier to work with in Spark SQL and joins easily with data from other sources.
To get started, include the JDBC driver for your database on the spark classpath. For example, the following command includes the driver needed to access PostgreSQL:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

Tables from a remote database can be loaded as a DataFrame or Spark SQL temporary view using the Data Sources API. The available options are:

  • url: the ordinary JDBC URL to connect to

  • dbtable: the JDBC table to read. Note that anything valid in a FROM clause of a SQL query can be used here, so you can supply either a full table name or a subquery in parentheses.

  • driver: the class name of the JDBC driver. This class must be available on both master and worker nodes so that each can register the driver with JDBC's subsystem.

  • fetchsize: the JDBC fetch size, which determines how many rows are fetched per round trip. Defaults to 1000.

  • isolationLevel: one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE; defaults to READ_UNCOMMITTED.

  • truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.

  • createTableOptions This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing.

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

Troubleshooting
The JDBC driver class must be visible to the primordial class loader on the client session and on all executors, because Java's DriverManager performs a security check before opening a connection and ignores any driver that is not visible to the primordial class loader. One convenient way to handle this is to modify compute_classpath.sh on all worker nodes to include your driver JARs. Also, some databases, such as H2, convert all names to upper case; for those databases you must use upper case in Spark SQL as well.

Performance Tuning

Caching Data in Memory
Spark SQL can cache tables in an in-memory columnar format by calling SQLContext.cacheTable("tableName") or DataFrame.cache(). Spark SQL will then scan only the required columns and automatically tune compression to reduce memory usage and GC pressure. You can call SQLContext.uncacheTable("tableName") to remove the table from memory.
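
In Spark 2.x the same thing can be done through the SparkSession catalog API; a minimal sketch using the people temporary view registered earlier:

// Cache the "people" temporary view in the in-memory columnar format
spark.catalog().cacheTable("people");

// Queries against the view now read from the columnar cache
spark.sql("SELECT name FROM people WHERE age IS NOT NULL").show();

// Release the cached data when it is no longer needed
spark.catalog().uncacheTable("people");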

Configuration of in-memory caching can be done with SQLContext.setConf or by running SET key=value commands in SQL.

  • spark.sql.inMemoryColumnarStorage.compressed true — when set to true, Spark SQL automatically selects a compression codec for each column based on statistics of the data.

  • spark.sql.inMemoryColumnarStorage.batchSize 10000 — controls the batch size for columnar caching. Larger batch sizes can improve memory utilization and compression, but also risk OOM (Out Of Memory) errors when caching data.

Other Configuration Options

The following options can also be used to tune query performance. They may be deprecated in future releases as more optimizations are performed automatically.

  • spark.sql.files.maxPartitionBytes 134217728 (128 MB) The maximum number of bytes to pack into a single partition when reading files.

  • spark.sql.files.openCostInBytes 4194304 (4 MB) The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first).

  • spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins

  • spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. Note that currently statistics are only supported for Hive metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.

  • spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

Distributed SQL Engine

Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. In this mode, end users or applications can run SQL queries against Spark SQL directly, without writing any code.
Details omitted here.

Running the Spark SQL CLI

The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute queries entered on the command line. Note that the Spark SQL CLI currently cannot talk to the Thrift JDBC server.

To start the Spark SQL CLI, run the following in the Spark directory:

./bin/spark-sql

Hive configuration is done by placing hive-site.xml, core-site.xml, and hdfs-site.xml in the conf/ directory. You can run ./bin/spark-sql --help for a complete list of all available options.

Aggregations

The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), and min(); users can also define their own aggregate functions.

Untyped User-Defined Aggregate Functions

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Type-Safe User-Defined Aggregate Functions

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...

}

public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...

}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

References:

http://spark.apache.org/docs/...
http://ifeve.com/apache-spark/
