spark SQL (五)數據源 Data Source----json hive jdbc等數據的的讀取與加載

1,JSON數據集java

 

       Spark SQL能夠自動推斷JSON數據集的模式,並將其做爲一個Dataset[Row]這個轉換能夠SparkSession.read.json()在一個Dataset[String]或者一個JSON文件上完成。mysql

     請注意,做爲json文件提供的文件不是典型的JSON文件。每行必須包含一個單獨的,獨立的有效JSON對象。有關更多信息,請參閱 JSON行文本格式,也稱爲換行符分隔的JSONsql

      對於常規的多行JSON文件,請將該multiLine選項設置true。例以下面的例子:shell

private def runJsonDatasetExample(spark: SparkSession): Unit = {

    import spark.implicits._

    //建立數據集時,經過導入這些
    //元素能夠支持原始類型(Int,String等)和Product類型(case類)編碼器。import  spark.implicits._

    // JSON數據集是經過路徑指向的。
    // 路徑能夠是單個文本文件,也能夠是存放文本文件的目錄

    val path = "examples/src/main/resources/people.json"
    val peopleDF = spark.read.json(path)

    //推斷的模式可使用printSchema()方法
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)

    //使用DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL語句可使用spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+

    //或者,也能夠爲表示的JSON數據集建立一個DataFrame
    //數據集[String]每一個字符串
    val otherPeopleDataset = spark.createDataset(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherPeople = spark.read.json(otherPeopleDataset)
    otherPeople.show()
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    // +---------------+----+
  }

2,Hive 表數據庫

       1) Spark SQL也支持讀寫存儲在Apache Hive中的數據。可是,因爲Hive具備大量依賴項,所以這些依賴項不包含在默認的Spark分發中。若是能夠在類路徑上找到Hive依賴關係,則Spark將自動加載它們。請注意,這些Hive依賴項也必須存在於全部工做節點上,由於它們須要訪問Hive序列化和反序列化庫(SerDes)才能訪問存儲在Hive中的數據。
       配置Hive是經過放置你的hive-site.xml,core-site.xml(用於安全配置)和hdfs-site.xml(用於HDFS配置)文件來完成的conf/。
       使用Hive時,必須SparkSession使用Hive支持進行實例化,包括鏈接到持續的Hive Metastore,支持Hive serdes和Hive用戶定義的函數。沒有現有Hive部署的用戶仍然能夠啓用Hive支持。當未配置時hive-site.xml,上下文metastore_db在當前目錄中自動建立,並建立一個目錄spark.sql.warehouse.dir,該目錄默認爲 spark-warehouseSpark應用程序啓動的當前目錄中的目錄。請注意,自Spark 2.0.0以來,該hive.metastore.warehouse.dir屬性hive-site.xml已被棄用。而是使用spark.sql.warehouse.dir指定倉庫中數據庫的默認位置。您可能須要向啓動Spark應用程序的用戶授予寫權限。
apache

import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object SparkHiveExample {

  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    // warehouseLocation指向託管數據庫的默認位置,表
    val warehouseLocation = new File("spark-warehouse").getAbsolutePath

    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

    //查詢以HiveQL
    sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    //聚合查詢也被支持。
    sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // SQL查詢的結果自己就是DataFrame,而且支持全部正常的函數。
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    // DataFrames中的項目類型爲Row,它容許您經過序號訪問每一個列。
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    //您也可使用DataFrame在SparkSession中建立臨時視圖。
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")

    //查詢而後能夠將DataFrame數據與存儲在Hive中的數據結合起來。
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // |  5| val_5|  5| val_5|
    // ...
    spark.stop()
  }
}

  2) 指定Hive表格的存儲格式
      當你建立一個Hive表時,你須要定義這個表應該如何從/向文件系統讀/寫數據,即「輸入格式」和「輸出格式」。您還須要定義這個表應該如何將數據反序列化爲行,或者將行序列化爲數據,即「serde」。如下選項可用於指定存儲格式(「serde」,「輸入格式」,「輸出格式」),例如CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默認狀況下,咱們將以純文本形式讀取表格文件。請注意,Hive存儲處理程序在建立表時不受支持,您可使用Hive端的存儲處理程序建立一個表,而後使用Spark SQL來讀取它。 json

屬性名稱 含義
fileFormat fileFormat是一種存儲格式規範包,包括「serde」,「輸入格式」和「輸出格式」。目前咱們支持6個fileFormats:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat 這兩個選項將相應的`InputFormat`和`OutputFormat`類的名字指定爲一個字符串,例如`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。這兩個選項必須成對出現,若是您
已經指定`fileFormat`選項,則不能指定它們。
serde 這個選項指定一個serde類的名字。當指定`fileFormat`選項時,若是給定的`fileFormat`
已經包含了serde的信息,就不要指定這個選項。目前「sequencefile」,「textfile」和「rcfile」不包
括serde信息,你可使用這個選項和這3個fileFormats。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 這些選項只能與「textfile」fileFormat一塊兒使用。他們定義瞭如何將分隔文件讀入行。

定義的全部其餘屬性OPTIONS將被視爲Hive serde屬性。安全

  3) 與不一樣版本的Hive Metastore交互服務器

        Spark SQL的Hive支持最重要的部分之一是與Hive Metastore進行交互,這使Spark SQL能夠訪問Hive表的元數據。從Spark 1.4.0開始,使用下面描述的配置,可使用Spark SQL的單個二進制版本查詢不一樣版本的Hive metastore。請注意,獨立於用於與Metastore對話的Hive版本,內部Spark SQL將針對Hive 1.2.1進行編譯,並將這些類用於內部執行(serdes,UDF,UDAF等)。
如下選項可用於配置用於檢索元數據的Hive版本:併發

屬性名稱 默認 含義
spark.sql.hive.metastore.version       1.2.1 hive  Metastore版本。可用的選項是0.12.0經過1.2.1。
spark.sql.hive.metastore.jars        builtin 應該用來實例化HiveMetastoreClient的罐子的位置。該屬性能夠是如下三個選項之一:
1,builtin
-Phive啓用 時,使用與Spark程序集捆綁在一塊兒的Hive 1.2.1 。選擇此選項時,spark.sql.hive.metastore.version
必須是1.2.1或者沒有定義。
2,maven
使用從Maven存儲庫下載的指定版本的Hive jar。一般不建議將此配置用於生產部署。
3,JVM標準格式的類路徑。這個類路徑必須包含全部Hive及其依賴項,包括正確版本的Hadoop。
這些瓶子只須要在驅動程序中出現,可是若是您正在以紗線羣集模式運行,則必須確保它們與您的應用程序一塊兒打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
應該使用在Spark SQL和特定版本的Hive之間共享的類加載程序加載的類前綴的逗號分隔列表。
應該共享的類的示例是須要與Metastore對話的JDBC驅動程序。其餘須要共享的類是那些與已
經共享的類進行交互的類。例如,由log4j使用的自定義appender。
spark.sql.hive.metastore.barrierPrefixes (empty) 一個以逗號分隔的類前綴列表,應該針對Spark SQL正在與之進行通訊的每一個Hive版本顯式
從新加載。例如,Hive UDF聲明在一般會被共享(即org.apache.spark.*)的前綴中。

2,JDBC到其餘數據庫

       1) Spark SQL還包含一個可使用JDBC從其餘數據庫讀取數據的數據源。這個功能應該比使用JdbcRDD更受歡迎。這是由於結果做爲DataFrame返回,而且能夠輕鬆地在Spark SQL中處理或者與其餘數據源結合使用。JDBC數據源也更容易從Java或Python使用,由於它不須要用戶提供ClassTag。(請注意,這與Spark SQL JDBC服務器不一樣,後者容許其餘應用程序使用Spark SQL運行查詢)。
       要開始,您將須要在Spark類路徑中爲您的特定數據庫包含JDBC驅動程序。例如,要從Spark Shell鏈接到postgres,您能夠運行如下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

       遠程數據庫中的表可使用Data Sources API做爲DataFrame或Spark SQL臨時視圖加載。用戶能夠在數據源選項中指定JDBC鏈接屬性。 user和password一般用於登陸到數據源提供爲鏈接屬性。除了鏈接屬性以外,Spark還支持如下不區分大小寫的選項:

屬性名稱 含義
url 要鏈接到的JDBC URL。源特定的鏈接屬性能夠在URL中指定。例如,jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 應該讀取的JDBC表。請注意,FROM可使用在SQL查詢的子句中有效的任何內容。例如,而不是一個完整的表,你也能夠在括號中使用子查詢。
driver 用於鏈接到此URL的JDBC驅動程序的類名。
partitionColumn, lowerBound, upperBound 若是指定了這些選項,則必須指定這些選項。另外, numPartitions必須指定。他們描述瞭如何從多個工做人員平行讀取時對錶格進行分區。 partitionColumn必須是相關表格中的數字列。請注意,lowerBound和upperBound只是用來決定分區步幅,而不是在表中過濾行。因此表中的全部行都將被分區並返回。這個選項只適用於閱讀。
numPartitions 表格讀取和寫入中可用於並行的分區的最大數目。這也決定了併發JDBC鏈接的最大數量。若是要寫入的分區數量超過此限制,則coalesce(numPartitions)在寫入以前調用它,將其減小到此限制。
fetchsize JDBC提取大小,它決定每次往返取多少行。這能夠幫助默認爲低讀取大小的JDBC驅動程序(例如,具備10行的Oracle)執行性能。這個選項只適用於閱讀。
batchsize JDBC批量大小,用於肯定每次往返要插入多少行。這能夠幫助JDBC驅動程序的性能。這個選項只適用於寫做。它默認爲1000。
isolationLevel 事務隔離級別,適用於當前鏈接。它能夠是一個NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,對應於由JDBC的鏈接對象定義,缺省值爲標準事務隔離級別READ_UNCOMMITTED。這個選項只適用於寫做。請參閱中的文檔java.sql.Connection。
truncate 這是一個JDBC編寫器相關的選項。當SaveMode.Overwrite啓用時,此選項會致使Spark截斷現有的表,而不是刪除並從新建立它。這能夠更高效,並防止表元數據(例如,索引)被刪除。可是,在某些狀況下,例如新數據具備不一樣的模式時,它將不起做用。它默認爲false。這個選項只適用於寫做。
createTableOptions 這是一個JDBC編寫器相關的選項。若是指定,則此選項容許在建立表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)時設置數據庫特定的表和分區選項。這個選項只適用於寫做。
createTableColumnTypes 建立表時使用的數據庫列數據類型,而不是默認值。數據類型信息應該使用與CREATE TABLE列語法相同的格式來指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的類型應該是有效的spark sql數據類型,該選項僅適用於寫入。


  例子以下:

private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    //注意:能夠經過load / save或jdbc方法來實現JDBC加載和保存
    //從JDBC源加載數據
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    //將數據保存到JDBC源
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    //在寫入
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()

    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

  }

 

  2) 可能遇到的異常和故障排除 

       JDBC驅動程序類必須對客戶端會話和全部執行者上的原始類加載器可見。這是由於Java的DriverManager類執行了一個安全檢查,致使它忽略了當打開一個鏈接時,原始類加載器不可見的全部驅動程序。一個方便的方法是修改全部工做節點上的compute_classpath.sh以包含驅動程序JAR。        某些數據庫(如H2)將全部名稱轉換爲大寫。您須要使用大寫字母來引用Spark SQL中的這些名稱。

相關文章
相關標籤/搜索