1,JSON數據集java
Spark SQL能夠自動推斷JSON數據集的模式,並將其做爲一個Dataset[Row]
。這個轉換能夠SparkSession.read.json()
在一個Dataset[String]
或者一個JSON文件上完成。mysql
請注意,做爲json文件提供的文件不是典型的JSON文件。每行必須包含一個單獨的,獨立的有效JSON對象。有關更多信息,請參閱 JSON行文本格式,也稱爲換行符分隔的JSON。sql
對於常規的多行JSON文件,請將該multiLine
選項設置爲true
。例以下面的例子:shell
private def runJsonDatasetExample(spark: SparkSession): Unit = { import spark.implicits._ //建立數據集時,經過導入這些 //元素能夠支持原始類型(Int,String等)和Product類型(case類)編碼器。import spark.implicits._ // JSON數據集是經過路徑指向的。 // 路徑能夠是單個文本文件,也能夠是存放文本文件的目錄 val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) //推斷的模式可使用printSchema()方法 peopleDF.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) //使用DataFrame peopleDF.createOrReplaceTempView("people") // SQL語句可使用spark val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // | name| // +------+ // |Justin| // +------+ //或者,也能夠爲表示的JSON數據集建立一個DataFrame //數據集[String]每一個字符串 val otherPeopleDataset = spark.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleDataset) otherPeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+ }
2,Hive 表數據庫
1) Spark SQL也支持讀寫存儲在Apache Hive中的數據。可是,因爲Hive具備大量依賴項,所以這些依賴項不包含在默認的Spark分發中。若是能夠在類路徑上找到Hive依賴關係,則Spark將自動加載它們。請注意,這些Hive依賴項也必須存在於全部工做節點上,由於它們須要訪問Hive序列化和反序列化庫(SerDes)才能訪問存儲在Hive中的數據。
配置Hive是經過放置你的hive-site.xml,core-site.xml(用於安全配置)和hdfs-site.xml(用於HDFS配置)文件來完成的conf/。
使用Hive時,必須SparkSession使用Hive支持進行實例化,包括鏈接到持續的Hive Metastore,支持Hive serdes和Hive用戶定義的函數。沒有現有Hive部署的用戶仍然能夠啓用Hive支持。當未配置時hive-site.xml,上下文metastore_db在當前目錄中自動建立,並建立一個目錄spark.sql.warehouse.dir,該目錄默認爲 spark-warehouseSpark應用程序啓動的當前目錄中的目錄。請注意,自Spark 2.0.0以來,該hive.metastore.warehouse.dir屬性hive-site.xml已被棄用。而是使用spark.sql.warehouse.dir指定倉庫中數據庫的默認位置。您可能須要向啓動Spark應用程序的用戶授予寫權限。apache
import java.io.File import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object SparkHiveExample { case class Record(key: Int, value: String) def main(args: Array[String]) { // warehouseLocation指向託管數據庫的默認位置,表 val warehouseLocation = new File("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") //查詢以HiveQL sql("SELECT * FROM src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... //聚合查詢也被支持。 sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // SQL查詢的結果自己就是DataFrame,而且支持全部正常的函數。 val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") // DataFrames中的項目類型爲Row,它容許您經過序號訪問每一個列。 val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... //您也可使用DataFrame在SparkSession中建立臨時視圖。 val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") //查詢而後能夠將DataFrame數據與存儲在Hive中的數據結合起來。 sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // | 5| val_5| 5| val_5| // ... spark.stop() } }
2) 指定Hive表格的存儲格式
當你建立一個Hive表時,你須要定義這個表應該如何從/向文件系統讀/寫數據,即「輸入格式」和「輸出格式」。您還須要定義這個表應該如何將數據反序列化爲行,或者將行序列化爲數據,即「serde」。如下選項可用於指定存儲格式(「serde」,「輸入格式」,「輸出格式」),例如CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默認狀況下,咱們將以純文本形式讀取表格文件。請注意,Hive存儲處理程序在建立表時不受支持,您可使用Hive端的存儲處理程序建立一個表,而後使用Spark SQL來讀取它。 json
屬性名稱 | 含義 |
fileFormat | fileFormat是一種存儲格式規範包,包括「serde」,「輸入格式」和「輸出格式」。目前咱們支持6個fileFormats:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。 |
inputFormat, outputFormat | 這兩個選項將相應的`InputFormat`和`OutputFormat`類的名字指定爲一個字符串,例如`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。這兩個選項必須成對出現,若是您 已經指定`fileFormat`選項,則不能指定它們。 |
serde | 這個選項指定一個serde類的名字。當指定`fileFormat`選項時,若是給定的`fileFormat` 已經包含了serde的信息,就不要指定這個選項。目前「sequencefile」,「textfile」和「rcfile」不包 括serde信息,你可使用這個選項和這3個fileFormats。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim | 這些選項只能與「textfile」fileFormat一塊兒使用。他們定義瞭如何將分隔文件讀入行。 |
定義的全部其餘屬性OPTIONS將被視爲Hive serde屬性。安全
3) 與不一樣版本的Hive Metastore交互服務器
Spark SQL的Hive支持最重要的部分之一是與Hive Metastore進行交互,這使Spark SQL能夠訪問Hive表的元數據。從Spark 1.4.0開始,使用下面描述的配置,可使用Spark SQL的單個二進制版本查詢不一樣版本的Hive metastore。請注意,獨立於用於與Metastore對話的Hive版本,內部Spark SQL將針對Hive 1.2.1進行編譯,並將這些類用於內部執行(serdes,UDF,UDAF等)。
如下選項可用於配置用於檢索元數據的Hive版本:併發
屬性名稱 | 默認 | 含義 |
spark.sql.hive.metastore.version | 1.2.1 | hive Metastore版本。可用的選項是0.12.0經過1.2.1。 |
spark.sql.hive.metastore.jars | builtin | 應該用來實例化HiveMetastoreClient的罐子的位置。該屬性能夠是如下三個選項之一: 1,builtin -Phive啓用 時,使用與Spark程序集捆綁在一塊兒的Hive 1.2.1 。選擇此選項時,spark.sql.hive.metastore.version 必須是1.2.1或者沒有定義。 2,maven 使用從Maven存儲庫下載的指定版本的Hive jar。一般不建議將此配置用於生產部署。 3,JVM標準格式的類路徑。這個類路徑必須包含全部Hive及其依賴項,包括正確版本的Hadoop。 這些瓶子只須要在驅動程序中出現,可是若是您正在以紗線羣集模式運行,則必須確保它們與您的應用程序一塊兒打包。 |
spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc |
應該使用在Spark SQL和特定版本的Hive之間共享的類加載程序加載的類前綴的逗號分隔列表。 應該共享的類的示例是須要與Metastore對話的JDBC驅動程序。其餘須要共享的類是那些與已 經共享的類進行交互的類。例如,由log4j使用的自定義appender。 |
spark.sql.hive.metastore.barrierPrefixes | (empty) | 一個以逗號分隔的類前綴列表,應該針對Spark SQL正在與之進行通訊的每一個Hive版本顯式 從新加載。例如,Hive UDF聲明在一般會被共享(即org.apache.spark.*)的前綴中。 |
2,JDBC到其餘數據庫
1) Spark SQL還包含一個可使用JDBC從其餘數據庫讀取數據的數據源。這個功能應該比使用JdbcRDD更受歡迎。這是由於結果做爲DataFrame返回,而且能夠輕鬆地在Spark SQL中處理或者與其餘數據源結合使用。JDBC數據源也更容易從Java或Python使用,由於它不須要用戶提供ClassTag。(請注意,這與Spark SQL JDBC服務器不一樣,後者容許其餘應用程序使用Spark SQL運行查詢)。
要開始,您將須要在Spark類路徑中爲您的特定數據庫包含JDBC驅動程序。例如,要從Spark Shell鏈接到postgres,您能夠運行如下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
遠程數據庫中的表可使用Data Sources API做爲DataFrame或Spark SQL臨時視圖加載。用戶能夠在數據源選項中指定JDBC鏈接屬性。 user和password一般用於登陸到數據源提供爲鏈接屬性。除了鏈接屬性以外,Spark還支持如下不區分大小寫的選項:
屬性名稱 | 含義 |
url | 要鏈接到的JDBC URL。源特定的鏈接屬性能夠在URL中指定。例如,jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 應該讀取的JDBC表。請注意,FROM可使用在SQL查詢的子句中有效的任何內容。例如,而不是一個完整的表,你也能夠在括號中使用子查詢。 |
driver | 用於鏈接到此URL的JDBC驅動程序的類名。 |
partitionColumn, lowerBound, upperBound | 若是指定了這些選項,則必須指定這些選項。另外, numPartitions必須指定。他們描述瞭如何從多個工做人員平行讀取時對錶格進行分區。 partitionColumn必須是相關表格中的數字列。請注意,lowerBound和upperBound只是用來決定分區步幅,而不是在表中過濾行。因此表中的全部行都將被分區並返回。這個選項只適用於閱讀。 |
numPartitions | 表格讀取和寫入中可用於並行的分區的最大數目。這也決定了併發JDBC鏈接的最大數量。若是要寫入的分區數量超過此限制,則coalesce(numPartitions)在寫入以前調用它,將其減小到此限制。 |
fetchsize | JDBC提取大小,它決定每次往返取多少行。這能夠幫助默認爲低讀取大小的JDBC驅動程序(例如,具備10行的Oracle)執行性能。這個選項只適用於閱讀。 |
batchsize | JDBC批量大小,用於肯定每次往返要插入多少行。這能夠幫助JDBC驅動程序的性能。這個選項只適用於寫做。它默認爲1000。 |
isolationLevel | 事務隔離級別,適用於當前鏈接。它能夠是一個NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,對應於由JDBC的鏈接對象定義,缺省值爲標準事務隔離級別READ_UNCOMMITTED。這個選項只適用於寫做。請參閱中的文檔java.sql.Connection。 |
truncate | 這是一個JDBC編寫器相關的選項。當SaveMode.Overwrite啓用時,此選項會致使Spark截斷現有的表,而不是刪除並從新建立它。這能夠更高效,並防止表元數據(例如,索引)被刪除。可是,在某些狀況下,例如新數據具備不一樣的模式時,它將不起做用。它默認爲false。這個選項只適用於寫做。 |
createTableOptions | 這是一個JDBC編寫器相關的選項。若是指定,則此選項容許在建立表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)時設置數據庫特定的表和分區選項。這個選項只適用於寫做。 |
createTableColumnTypes | 建立表時使用的數據庫列數據類型,而不是默認值。數據類型信息應該使用與CREATE TABLE列語法相同的格式來指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的類型應該是有效的spark sql數據類型,該選項僅適用於寫入。 |
例子以下:
private def runJdbcDatasetExample(spark: SparkSession): Unit = { //注意:能夠經過load / save或jdbc方法來實現JDBC加載和保存 //從JDBC源加載數據 val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) //將數據保存到JDBC源 connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING") val jdbcDF3 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) //在寫入 jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) }
2) 可能遇到的異常和故障排除
JDBC驅動程序類必須對客戶端會話和全部執行者上的原始類加載器可見。這是由於Java的DriverManager類執行了一個安全檢查,致使它忽略了當打開一個鏈接時,原始類加載器不可見的全部驅動程序。一個方便的方法是修改全部工做節點上的compute_classpath.sh以包含驅動程序JAR。 某些數據庫(如H2)將全部名稱轉換爲大寫。您須要使用大寫字母來引用Spark SQL中的這些名稱。