Spark SQL in Practice with Spark 1.3.1 (Part 01)


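The overall SQLContext execution flow is as follows:

  1. The SQL statement is parsed by SqlParser into an unresolved LogicalPlan;

  2. The analyzer binds it against the data dictionary (catalog), producing a resolved LogicalPlan;

  3. The optimizer optimizes the resolved LogicalPlan, producing an optimized LogicalPlan;

  4. The planner converts the LogicalPlan into a PhysicalPlan (SparkPlan);

  5. prepareForExecution() turns the PhysicalPlan into an executable physical plan;

  6. execute() runs the executable physical plan;

  7. A SchemaRDD is produced (SchemaRDD was renamed DataFrame in Spark 1.3).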
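To watch these stages for a concrete query, the plans can be printed with explain(true). The following is only a minimal sketch: it assumes a standalone run with a local master, and the app name `plan-demo`, the table name `people`, and the two sample records are made up for illustration. In spark-shell, reuse the existing `sc` instead of creating a new SparkContext.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: standalone run with a local master; in spark-shell `sc` already exists.
val conf = new SparkConf().setAppName("plan-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Illustrative in-memory data, one JSON object per string.
val rows = sc.parallelize(Seq(
  """{"name":"Michael","age":29}""",
  """{"name":"Justin","age":19}"""))
sqlContext.jsonRDD(rows).registerTempTable("people")

val adults = sqlContext.sql("SELECT name FROM people WHERE age > 21")

// Prints the parsed, analyzed, and optimized logical plans plus the physical plan.
adults.explain(true)

// Nothing is executed until an action such as collect() is called.
val names = adults.collect().map(_.getString(0))
sc.stop()
```

The printed sections correspond roughly to steps 1 through 4 above; steps 5 and 6 happen inside the collect() call.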



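The overall HiveContext execution flow is similar:

  1. The SQL statement is parsed by HiveQl.parseSql into an unresolved LogicalPlan; during parsing, getAst() is called on the HiveQL statement to obtain the AST, which is then parsed further;

  2. The analyzer binds it against the Hive Metastore metadata (the new catalog), producing a resolved LogicalPlan;

  3. The optimizer optimizes the resolved LogicalPlan, producing an optimized LogicalPlan; before optimization, the plan is preprocessed with ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)));

  4. hivePlanner converts the LogicalPlan into a PhysicalPlan;

  5. prepareForExecution() turns the PhysicalPlan into an executable physical plan;

  6. execute() runs the executable physical plan;

  7. After execution, map(_.copy) is used to load the results into a SchemaRDD.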


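Spark SQL provides three core capabilities:

1. It can load data from a variety of structured sources (e.g., JSON, Hive, and Parquet).

2. It lets you query the data using SQL, both from inside a Spark program and from external tools that connect to Spark through standard database connectors (JDBC/ODBC), such as the business intelligence tool Tableau.

3. When used within a Spark program, Spark SQL provides rich integration between SQL and regular Python/Java/Scala code, including joining RDDs with SQL tables and exposing user-defined functions in SQL.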
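As a rough illustration of the third point, the sketch below exposes an ordinary RDD to SQL and calls a Scala function from a query. It assumes a standalone run with a local master; the function name `strLen`, the table name `people`, and the sample tuples are hypothetical and not taken from the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: standalone run with a local master; in spark-shell `sc` already exists.
val conf = new SparkConf().setAppName("udf-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// An ordinary RDD of tuples, converted to a DataFrame and exposed to SQL.
val people = sc.parallelize(Seq(("Michael", 29), ("Andy", 30))).toDF("name", "age")
people.registerTempTable("people")

// Register a Scala function so SQL queries can call it (strLen is a made-up name).
sqlContext.udf.register("strLen", (s: String) => s.length)

// Back in regular Scala code: collect the rows and turn them into tuples.
val lengths = sqlContext.sql("SELECT name, strLen(name) FROM people")
  .collect()
  .map(r => (r.getString(0), r.getInt(1)))
sc.stop()
```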

DataFrames

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

SQLContext:
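A minimal sketch of creating the entry point, assuming a standalone run with a local master (the app name `sqlcontext-demo` is made up). In spark-shell, `sc` is already provided and only the SQLContext line is needed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: standalone run with a local master; in spark-shell `sc` already exists.
val conf = new SparkConf().setAppName("sqlcontext-demo").setMaster("local[2]")
val sc = new SparkContext(conf)

// The SQLContext wraps an existing SparkContext.
val sqlContext = new SQLContext(sc)

// Brings in the implicit conversions that turn RDDs into DataFrames.
import sqlContext.implicits._
```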


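Besides SQLContext, a HiveContext can also be created. HiveContext is a superset of SQLContext and is more powerful: it can run HiveQL and work with UDFs. From Spark 1.3.1 onward HiveContext is the recommended choice, but it depends on the Hive jars.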

Creating DataFrames

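With a SQLContext, you can create DataFrames from an existing RDD, from a Hive table, or from data sources.

Because this Hadoop installation uses LZO compression, the Hadoop LZO jar must also be made available to Spark; otherwise it fails with "Compression codec com.hadoop.compression.lzo.LzoCodec not found."

Add the following configuration to $SPARK_HOME/conf/spark-env.sh: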

export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/home/wangyue/opt/hadoop/hadoop-2.3.0-cdh5.1.0/lib/native/Linux-amd64-64/*:/home/wangyue/opt/hadoop/hadoop-2.3.0-cdh5.1.0/share/hadoop/common/hadoop-lzo-0.4.15-cdh5.1.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/wangyue/opt/hadoop/hadoop-2.3.0-cdh5.1.0/share/hadoop/common/hadoop-lzo-0.4.15-cdh5.1.0.jar

After restarting the Spark cluster, the LZO jar is visible on the classpath.

Load a JSON file, then query the DataFrame's data with the show method.
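The two steps can be sketched as follows. This is an illustration only: it assumes a standalone run with a local master, and it writes its own small sample file (the records are made up) so that it does not depend on a file from the original setup.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: standalone run with a local master; in spark-shell `sc` already exists.
val conf = new SparkConf().setAppName("json-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Write a small sample file; jsonFile expects one JSON object per line.
val sample = Seq("""{"name":"Michael"}""", """{"name":"Andy","age":30}""").mkString("\n")
val jsonPath = Files.createTempFile("people", ".json")
Files.write(jsonPath, sample.getBytes("UTF-8"))

// Load the file as a DataFrame; the schema is inferred from the data.
val df = sqlContext.jsonFile(jsonPath.toUri.toString)

// Print the rows as a text table.
df.show()

val rowCount = df.count()
sc.stop()
```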



DataFrame Operations




The operations covered in this section are: selecting everybody while incrementing the age by 1, selecting people older than 21, and counting people by age.
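A sketch of these DataFrame operations, assuming a standalone run with a local master. The three sample rows are made up for illustration; they are not the data used in the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: standalone run with a local master; in spark-shell `sc` already exists.
val conf = new SparkConf().setAppName("df-ops-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Illustrative data.
val df = sc.parallelize(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)))
  .toDF("name", "age")

// Select everybody, but increment the age by 1
val older = df.select(df("name"), df("age") + 1)
older.show()

// Select people older than 21
val over21 = df.filter(df("age") > 21)
over21.show()

// Count people by age
val byAge = df.groupBy("age").count()
byAge.show()

// Pull the results back to the driver for inspection.
val olderAges = older.collect().map(_.getInt(1)).toSet
val over21Names = over21.collect().map(_.getString(0)).toSet
val distinctAges = byAge.count()
sc.stop()
```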



Running SQL Queries Programmatically

The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.

val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")

As of version 1.3.1, SQL can be run through SQLContext, and the result comes back as a DataFrame.

There are currently two ways to convert an RDD into a DataFrame:

1. Inferring the Schema Using Reflection

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)



2. Programmatically Specifying the Schema

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. Create an RDD of Rows from the original RDD;

  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.

  3. Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

For example:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)


Data Sources

1. Generic Load/Save Functions

val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")

2. Manually Specifying Options

val df = sqlContext.load("people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")

Save Modes
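Save operations can take a SaveMode that says what to do when the target already exists. The sketch below assumes a standalone run with a local master; the sample rows and the temporary output location are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

// Assumption: standalone run with a local master; in spark-shell `sc` already exists.
val conf = new SparkConf().setAppName("save-mode-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Illustrative data.
val df = sc.parallelize(Seq(("Michael", 29), ("Andy", 30))).toDF("name", "age")

// Hypothetical output location inside a fresh temporary directory.
val out = Files.createTempDirectory("save-modes").resolve("people.parquet").toUri.toString

// ErrorIfExists is the default: fail if the target already exists (it does not yet).
df.save(out, "parquet", SaveMode.ErrorIfExists)
// Overwrite replaces whatever is already stored at the target.
df.save(out, "parquet", SaveMode.Overwrite)
// Ignore leaves existing data untouched, so this call writes nothing.
df.save(out, "parquet", SaveMode.Ignore)
// SaveMode.Append (not run here) would add the rows to the existing data.

val saved = sqlContext.load(out, "parquet").count()
sc.stop()
```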


Saving to Persistent Tables

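Under HiveContext, a DataFrame can be saved with the saveAsTable command, which stores the data and its metadata in the Hive metastore, so the table can still be read after Spark SQL is restarted.

By default, saveAsTable creates a managed table: the location of the data is controlled by the metastore, and the data is deleted when the table is dropped.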

Parquet Files

1. Loading Data Programmatically

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// An RDD of case class objects, from the previous example.
val people = sc.textFile("file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.parquetFile("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)



Schema merging

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.saveAsParquetFile("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.saveAsParquetFile("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)


Configuration
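Parquet behaviour can be tuned with the setConf method on SQLContext, or by running SET key=value commands in SQL. A small sketch, assuming a standalone run with a local master; the two properties shown are taken from the Spark SQL Parquet options, and the chosen values are only examples.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: standalone run with a local master; in spark-shell `sc` already exists.
val conf = new SparkConf().setAppName("parquet-conf-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Interpret Parquet binary columns as strings, for files written by
// systems that do not distinguish binary data from strings.
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

// Choose the compression codec used when writing Parquet files.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// Read a setting back to confirm it took effect.
val codec = sqlContext.getConf("spark.sql.parquet.compression.codec")
sc.stop()
```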


JSON Datasets

Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using one of two methods in a SQLContext:

  • jsonFile - loads data from a directory of JSON files where each line of the files is a JSON object.

  • jsonRDD - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

Note that the file that is offered as jsonFile is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "file:///home/wangyue/opt/spark/spark-1.3.1-bin-hadoop2.3/examples/src/main/resources/people.json"
// Create a DataFrame from the file(s) pointed to by path
val people = sqlContext.jsonFile(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)



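Full query (the DataFrame has to be registered as a temporary table before SQL can refer to it):

scala> anotherPeople.registerTempTable("anotherPeople")

scala> val anotherPeopleSql = sqlContext.sql("select name,address.city  from anotherPeople")

scala> anotherPeopleSql.map(t => "Name: " + t(0)+ " city:"+t(1)).collect().foreach(println)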



Please respect the original work; do not repost without permission: http://blog.csdn.net/stark_summer/article/details/45825177
