Spark版本:Spark 1.5.2
html
轉載請註明出處:http://www.cnblogs.com/BYRans/
java
Spark SQL是Spark的一個組件,用於結構化數據的計算。Spark SQL提供了一個稱爲DataFrames的編程抽象,DataFrames能夠充當分佈式SQL查詢引擎。sql
DataFrame是一個分佈式的數據集合,該數據集合以命名列的方式進行整合。DataFrame能夠理解爲關係數據庫中的一張表,也能夠理解爲R/Python中的一個data frame。DataFrames能夠經過多種數據構造,例如:結構化的數據文件、hive中的表、外部數據庫、Spark計算過程當中生成的RDD等。
DataFrame的API支持4種語言:Scala、Java、Python、R。shell
Spark SQL程序的主入口是SQLContext類或它的子類。建立一個基本的SQLContext,你只須要SparkContext,建立代碼示例以下:數據庫
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
除了基本的SQLContext,也能夠建立HiveContext。SQLContext和HiveContext區別與聯繫爲:express
HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,能夠在部署基本的Spark的時候就不須要Hive的依賴包,須要使用HiveContext時再把Hive的各類依賴包加進來。apache
SQL的解析器能夠經過配置spark.sql.dialect參數進行配置。在SQLContext中只能使用Spark SQL提供的」sql「解析器。在HiveContext中默認解析器爲」hiveql「,也支持」sql「解析器。編程
使用SQLContext,spark應用程序(Application)能夠經過RDD、Hive表、JSON格式數據等數據源建立DataFrames。下面是基於JSON文件建立DataFrame的示例:json
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show()
JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show();
DataFrames支持Scala、Java和Python的操做接口。下面是Scala和Java的幾個操做示例:api
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1
JavaSparkContext sc // An existing SparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1
詳細的DataFrame API請參考 API Documentation。
除了簡單列引用和表達式,DataFrames還有豐富的library,功能包括string操做、date操做、常見數學操做等。詳細內容請參考 DataFrame Function Reference。
Spark Application可使用SQLContext的sql()方法執行SQL查詢操做,sql()方法返回的查詢結果爲DataFrame格式。代碼以下:
val sqlContext = ... // An existing SQLContext val df = sqlContext.sql("SELECT * FROM table")
SQLContext sqlContext = ... // An existing SQLContext DataFrame df = sqlContext.sql("SELECT * FROM table")
Spark SQL支持兩種RDDs轉換爲DataFrames的方式:
Spark SQL支持將JavaBean的RDD自動轉換成DataFrame。經過反射獲取Bean的基本信息,依據Bean的信息定義Schema。當前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和複雜數據類型(如:List、Array)。建立一個實現Serializable接口包含全部屬性getters和setters的類來建立一個JavaBean。經過調用createDataFrame並提供JavaBean的Class object,指定一個Schema給一個RDD。示例以下:
public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
// sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); // Apply a schema to an RDD of JavaBeans and register it as a table. DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();
當JavaBean不能被預先定義的時候,編程建立DataFrame分爲三步:
示例以下:
import org.apache.spark.api.java.function.Function; // Import factory methods provided by DataTypes. import org.apache.spark.sql.types.DataTypes; // Import StructType and StructField import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructField; // Import Row. import org.apache.spark.sql.Row; // Import RowFactory. import org.apache.spark.sql.RowFactory; // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt"); // The schema is encoded in a string String schemaString = "name age"; // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); for (String fieldName: schemaString.split(" ")) { fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true)); } StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows. JavaRDD<Row> rowRDD = people.map( new Function<String, Row>() { public Row call(String record) throws Exception { String[] fields = record.split(","); return RowFactory.create(fields[0], fields[1].trim()); } }); // Apply the schema to the RDD. DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); // Register the DataFrame as a table. peopleDataFrame.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame results = sqlContext.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> names = results.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();
Spark SQL的DataFrame接口支持多種數據源的操做。一個DataFrame能夠進行RDDs方式的操做,也能夠被註冊爲臨時表。把DataFrame註冊爲臨時表以後,就能夠對該DataFrame執行SQL查詢。Data Sources這部分首先描述了對Spark的數據源執行加載和保存的經常使用方法,而後對內置數據源進行深刻介紹。
Spark SQL的默認數據源爲Parquet格式。數據源爲Parquet文件時,Spark SQL能夠方便的執行全部的操做。修改配置項spark.sql.sources.default,可修改默認數據源格式。讀取Parquet文件示例以下:
val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
當數據源格式不是parquet格式文件時,須要手動指定數據源的格式。數據源格式須要指定全名(例如:org.apache.spark.sql.parquet),若是數據源格式爲內置格式,則只須要指定簡稱(json,parquet,jdbc)。經過指定的數據源格式名,能夠對DataFrames進行類型轉換操做。示例以下:
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
能夠採用SaveMode執行存儲操做,SaveMode定義了對數據的處理模式。須要注意的是,這些保存模式不使用任何鎖定,不是原子操做。此外,當使用Overwrite方式執行時,在輸出新數據以前原數據就已經被刪除。SaveMode詳細介紹以下表:
當使用HiveContext時,能夠經過saveAsTable方法將DataFrames存儲到表中。與registerTempTable方法不一樣的是,saveAsTable將DataFrame中的內容持久化到表中,並在HiveMetastore中存儲元數據。存儲一個DataFrame,可使用SQLContext的table方法。table先建立一個表,方法參數爲要建立的表的表名,而後將DataFrame持久化到這個表中。
默認的saveAsTable方法將建立一個「managed table」,表示數據的位置能夠經過metastore得到。當存儲數據的表被刪除時,managed table也將自動刪除。
Parquet是一種支持多種數據處理系統的柱狀的數據格式,Parquet文件中保留了原始數據的模式。Spark SQL提供了Parquet文件的讀寫功能。
讀取Parquet文件示例以下:
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// sqlContext from the previous example is used in this example. DataFrame schemaPeople = ... // The DataFrame from the previous example. // DataFrames can be saved as Parquet files, maintaining the schema information. schemaPeople.write().parquet("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a DataFrame. DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); // Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile"); DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();
對錶進行分區是對數據進行優化的方式之一。在分區的表內,數據經過分區列將數據存儲在不一樣的目錄下。Parquet數據源如今可以自動發現並解析分區信息。例如,對人口數據進行分區存儲,分區列爲gender和country,使用下面的目錄結構:
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
經過傳遞path/to/table給 SQLContext.read.parquet或SQLContext.read.load,Spark SQL將自動解析分區信息。返回的DataFrame的Schema以下:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
須要注意的是,數據的分區列的數據類型是自動解析的。當前,支持數值類型和字符串類型。自動解析分區類型的參數爲:spark.sql.sources.partitionColumnTypeInference.enabled,默認值爲true。若是想關閉該功能,直接將該參數設置爲disabled。此時,分區列數據格式將被默認設置爲string類型,再也不進行類型解析。
像ProtocolBuffer、Avro和Thrift那樣,Parquet也支持Schema evolution(Schema演變)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。如今Parquet數據源能自動檢測這種狀況,併合並這些文件的schemas。
由於Schema合併是一個高消耗的操做,在大多數狀況下並不須要,因此Spark SQL從1.5.0開始默認關閉了該功能。能夠經過下面兩種方式開啓該功能:
示例以下:
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)
當向Hive metastore中讀寫Parquet表時,Spark SQL將使用Spark SQL自帶的Parquet SerDe(SerDe:Serialize/Deserilize的簡稱,目的是用於序列化和反序列化),而不是用Hive的SerDe,Spark SQL自帶的SerDe擁有更好的性能。這個優化的配置參數爲spark.sql.hive.convertMetastoreParquet,默認值爲開啓。
從表Schema處理的角度對比Hive和Parquet,有兩個區別:
因爲這兩個區別,當將Hive metastore Parquet錶轉換爲Spark SQL Parquet表時,須要將Hive metastore schema和Parquet schema進行一致化。一致化規則以下:
Spark SQL緩存了Parquet元數據以達到良好的性能。當Hive metastore Parquet錶轉換爲enabled時,表修改後緩存的元數據並不能刷新。因此,當表被Hive或其它工具修改時,則必須手動刷新元數據,以保證元數據的一致性。示例以下:
// sqlContext is an existing HiveContext sqlContext.refreshTable("my_table")
// sqlContext is an existing HiveContext sqlContext.refreshTable("my_table")
配置Parquet可使用SQLContext的setConf方法或使用SQL執行SET key=value命令。詳細參數說明以下:
Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換爲DataFrame。
須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
// sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people"); // SQL statements can be run by using the sql methods provided by sqlContext. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);
Spark SQL支持對Hive的讀寫操做。須要注意的是,Hive所依賴的包,沒有包含在Spark assembly包中。增長Hive時,須要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。這兩個配置將build一個新的assembly包,這個assembly包含了Hive的依賴包。注意,必須上這個心的assembly包到全部的worker節點上。由於worker節點在訪問Hive中數據時,會調用Hive的 serialization and deserialization libraries(SerDes),此時將用到Hive的依賴包。
Hive的配置文件爲conf/目錄下的hive-site.xml文件。在YARN上執行查詢命令以前,lib_managed/jars目錄下的datanucleus包和conf/目錄下的hive-site.xml必須能夠被driverhe和全部的executors所訪問。確保被訪問,最方便的方式就是在spark-submit命令中經過--jars選項和--file選項指定。
操做Hive時,必須建立一個HiveContext對象,HiveContext繼承了SQLContext,並增長了對MetaStore和HiveQL的支持。除了sql方法,HiveContext還提供了一個hql方法,hql方法能夠執行HiveQL語法的查詢語句。示例以下:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
// sc is an existing JavaSparkContext. HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc); sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();
Spark SQL常常須要訪問Hive metastore,Spark SQL能夠經過Hive metastore獲取Hive表的元數據。從Spark 1.4.0開始,Spark SQL只需簡單的配置,就支持各版本Hive metastore的訪問。注意,涉及到metastore時Spar SQL忽略了Hive的版本。Spark SQL內部將Hive反編譯至Hive 1.2.1版本,Spark SQL的內部操做(serdes, UDFs, UDAFs, etc)都調用Hive 1.2.1版本的class。版本配置項見下面表格:
Spark SQL支持使用JDBC訪問其餘數據庫。當時用JDBC訪問其它數據庫時,最好使用JdbcRDD。使用JdbcRDD時,Spark SQL操做返回的DataFrame會很方便,也會很方便的添加其餘數據源數據。JDBC數據源由於不須要用戶提供ClassTag,因此很適合使用Java或Python進行操做。
使用JDBC訪問數據源,須要在spark classpath添加JDBC driver配置。例如,從Spark Shell鏈接postgres的配置爲:
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
遠程數據庫的表,可用DataFrame或Spark SQL臨時表的方式調用數據源API。支持的參數有:
代碼示例以下:
val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:postgresql:dbserver"); options.put("dbtable", "schema.tablename"); DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();
Spark SQL能夠經過調用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),將表用一種柱狀格式( an inmemory columnar format)緩存至內存中。而後Spark SQL在執行查詢任務時,只需掃描必需的列,從而以減小掃描數據量、提升性能。經過緩存數據,Spark SQL還能夠自動調節壓縮,從而達到最小化內存使用率和下降GC壓力的目的。調用sqlContext.uncacheTable("tableName")可將緩存的數據移出內存。
可經過兩種配置方式開啓緩存數據功能:
能夠經過配置下表中的參數調節Spark SQL的性能。在後續的Spark版本中將逐漸加強自動調優功能,下表中的參數在後續的版本中或許將再也不須要配置。
使用Spark SQL的JDBC/ODBC或者CLI,能夠將Spark SQL做爲一個分佈式查詢引擎。終端用戶或應用不須要編寫額外的代碼,能夠直接使用Spark SQL執行SQL查詢。
這裏運行的Thrift JDBC/ODBC服務與Hive 1.2.1中的HiveServer2一致。能夠在Spark目錄下執行以下命令來啓動JDBC/ODBC服務:
./sbin/start-thriftserver.sh
這個命令接收全部 bin/spark-submit
命令行參數,添加一個 --hiveconf
參數來指定Hive的屬性。詳細的參數說明請執行命令 ./sbin/start-thriftserver.sh --help
。
服務默認監聽端口爲localhost:10000。有兩種方式修改默認監聽端口:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...
./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...
使用 beeline
來測試Thrift JDBC/ODBC服務:
./bin/beeline
鏈接到Thrift JDBC/ODBC服務
beeline> !connect jdbc:hive2://localhost:10000
在非安全模式下,只須要輸入機器上的一個用戶名便可,無需密碼。在安全模式下,beeline會要求輸入用戶名和密碼。安全模式下的詳細要求,請閱讀beeline documentation的說明。
配置Hive須要替換 conf/
目錄下的 hive-site.xml
。
Thrift JDBC服務也支持經過HTTP傳輸發送thrift RPC messages。開啓HTTP模式須要將下面的配參數配置到系統屬性或 conf/:
下的 hive-site.xml
中
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice
測試http模式,可使用beeline連接JDBC/ODBC服務:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
Spark SQL CLI能夠很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。須要注意的是,Spark SQL CLI不能與Thrift JDBC服務交互。
在Spark目錄下執行以下命令啓動Spark SQL CLI:
./bin/spark-sql
配置Hive須要替換 conf/
下的 hive-site.xml
。執行 ./bin/spark-sql --help
可查看詳細的參數說明 。
Spark SQL與Hive Metastore、SerDes、UDFs相兼容。Spark SQL兼容Hive Metastore從0.12到1.2.1的全部版本。Spark SQL也與Hive SerDes和UDFs相兼容,當前SerDes和UDFs是基於Hive 1.2.1。
Spark SQL Thrift JDBC服務與Hive相兼容,在已存在的Hive上部署Spark SQL Thrift服務不須要對已存在的Hive Metastore作任何修改,也不須要對數據作任何改動。
Spark SQL支持多部分的Hive特性,例如:
下面是當前不支持的Hive特性,其中大部分特性在實際的Hive使用中不多用到。
Major Hive Features
Esoteric Hive Features
Hive Input/Output Formats
Hive優化
部分Hive優化尚未添加到Spark中。沒有添加的Hive優化(好比索引)對Spark SQL這種in-memory計算模型來講不是特別重要。下列Hive優化將在後續Spark SQL版本中慢慢添加。
SET spark.sql.shuffle.partitions=[num_tasks];
」控制post-shuffle的並行度,不能自動檢測。
Spark SQL和DataFrames支持的數據格式以下:
Spark SQL全部的數據類型在 org.apache.spark.sql.types
包內。不一樣語言訪問或建立數據類型方法不同:
import org.apache.spark.sql.types._
,再進行數據類型訪問或建立操做。org.apache.spark.sql.types.DataTypes
中的工廠方法,以下表:
當處理float或double類型時,若是類型不符合標準的浮點語義,則使用專門的處理方式NaN。須要注意的是: