Spark SQL數據源

背景

Spark SQL是Spark的一個模塊,用於結構化數據的處理。git

++++++++++++++  +++++++++++++++++++++
|     SQL    |  |    Dataset API    |
++++++++++++++  +++++++++++++++++++++

+++++++++++++++++++++++++++++++++++++
|              Spark SQL            |
+++++++++++++++++++++++++++++++++++++

使用Spark SQL的方式有2種,能夠經過SQL或者Dataset API,這兩種使用方式在本文都會涉及。github

其中,經過SQL接口使用的方法具體又可分爲3種:sql

  • 在程序中執行
  • 使用命令行
  • Jdbc/ODBC

這裏只會介紹第一種方式。數據庫

Spark關於分佈式數據集的抽象本來是RDD,Dataset是其升級版本。DataFrame是特殊的Dataset,它限定元素是按照命名的列來組織的,從這一點看至關於關係型數據庫中的表。DataFrame等價於Dataset[Row],並且DataFrame是本文內容的核心。apache

DataFrame支持豐富的數據源:json

+++++++++++++++++++
| 結構數據文件     |
|                 |
|  +++++++++++++  |   ++++++++++++++++
|  |  parquet  |  |   |  Hive table  |
|  +++++++++++++  |   ++++++++++++++++
|                 |
|  +++++++++++++  |   ++++++++++++++++
|  |    csv    |  |   |  關係數據庫   |
|  +++++++++++++  |   ++++++++++++++++
|                 |
|  +++++++++++++  |   ++++++++++++++++
|  |   json    |  |   |     RDD      |
|  +++++++++++++  |   ++++++++++++++++
|                 |
+++++++++++++++++++

這裏的每一種數據源咱們都會進行介紹。數組

本文主要介紹DataFrame和各數據源的IO操做,後面再寫一篇文章介紹基於DataFrame的使用操做。即:本文關注如何獲得一個DataFrame,如何將一個DataFrame進行持久化;後面要寫的文章則關注如何使用DataFrame。session

相關的開源項目demo-spark在github上。app

數據源

這個圖概述了本文介紹的主要內容,它也能夠做爲後續的備忘和參考。

這個圖中包含兩種箭頭,寬箭頭表示數據的流向,細箭頭表示提供構造實例的方法。

好比DataFrame - DataFrameWriter - 存儲的粗箭頭,表示數據從內存經過DataFrameWriter流向存儲;SparkSession - DataFrameReader的細箭頭,表示能夠從SparkSession對象建立DataFrameReader對象。

SparkSession

使用Spark SQL必須先構造SparkSession實例,時候以後須要調用其stop方法釋放資源。模板以下:

val spark = org.apache.spark.sql.SparkSession
  .builder()
  .appName("Spark SQL basic demo")
  .master("local")
  .getOrCreate()

// work with spark

spark.stop()

下文中出現的全部的spark,如無特殊說明,都是指按照上述代碼建立的SparkSession對象。

parquet

parquet文件是怎麼樣的?

PAR1 "&,   @   Alyssa   Ben ,   
0      red 88,
      @                \Hexample.avro.User % name%  %favorite_color%  5 favorite_numbers %array <&% nameDH&  &P5 favorite_color<@&P  &?% (favorite_numbersarray
ZZ&?  ? avro.schema?{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_color","type":["string","null"]},{"name":"favorite_numbers","type":{"type":"array","items":"int"}}]} parquet-mr version 1.4.3 ?  PAR1

它不是一個單純的文本文件,包含了一些沒法渲染的特殊字符。

parquet是默認的格式。從一個parquet文件讀取數據的代碼以下:

val usersDF = spark.read.load("src/main/resources/users.parquet")

spark.read返回DataFrameReader對象,其load方法加載文件中的數據並返回DataFrame對象。這個能夠參照上文的閉環圖理解。

咱們能夠調用Dataset#show()方法查看其內容:

usersDF.show()

輸出結果:

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

將一個DataFrame寫到parquet文件的代碼以下:

usersDF.write.save("output/parquet/")

DataFrame#write()方法返回DataFrameWriter對象實例,save方法將數據持久化爲parquet格式的文件。save的參數是一個目錄,並且要求最底層的目錄是不存在的,下文類同。

另一種寫的方式是:

peopleDF.write.parquet("output/parquet/")

這兩種方式的本質相同。

csv

csv是什麼樣的?

csv又稱爲逗號分隔符,即:使用逗號分隔一條數據中各字段的值。csv文件能夠被excel解析,可是其本質只是一個文本文件。好比下面是一份csv文件的內容:

age,name
,Michael
30,Andy
19,Justin

第一行是表頭,可是它和下面的數據並無什麼區別。因此在讀取的時候,必須告訴讀入器這個文件是有表頭的,它(第一行)纔會被解析成表頭,不然就會被當成數據。

好比,解析表頭的讀:

spark.read.option("header", true).format("csv").load("output/csv/").show()

其中的option("header", true)就是告訴讀入器這個文件是有表頭的。

輸出爲:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

不解析表頭的讀:

spark.read.format("csv").load("output/csv/").show()

輸出爲:

+----+-------+
| _c0|    _c1|
+----+-------+
| age|   name|
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

spark自動構建了兩個字段:_c0_c1,而把agename當成了一行數據。

另一種簡化的讀法:

spark.read.option("header", true).csv("output/csv/")

其原理和上文中介紹的其餘格式的文件相同。

將DataFrame寫入到csv文件時也須要注意表頭,將表頭也寫入文件的方式:

peopleDF.write.option("header", true).format("csv").save("output/csv/")

不寫表頭,只寫數據的方式:

peopleDF.write.format("csv").save("output/csv/")

另一種簡化的寫法是:

peopleDF.write.csv("output/csv/")

json

json文件是怎麼樣的?

上文中說過,DataFrame至關於關係數據庫中的表,那麼每一條數據至關於一行記錄。關係數據庫表又能夠至關於一個類,每一行數據至關於具體的對象,因此DataFrame的每一條數據至關於一個對象。

DataFrame對要讀取的json有特殊的要求:即每一條數據做爲一行,總體不能包裝成數組。好比:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

而一個標準的json應該是下面這樣:

[{
        "name": "Michael"
    }, {
        "name": "Andy",
        "age": 30
    }, {
        "name": "Justin",
        "age": 19
    }
]

使用下面的方式讀取json文件內容:

val peopleDF = spark.read.format("json").load(path)

這種讀取的方式和上文parquet的讀取方式一致,最終都是調用load方法。只是多了一段format("json"),這是由於parquet是默認的格式,而json不是,因此必須明確聲明。

還有一種簡化的方式,其本質仍是上述的代碼:

val peopleDF = spark.read.json(path)

將一個DataFrame寫到json文件的方式:

peopleDF.write.format("json").save("output/json/")

一樣的道理,和保存爲parquet格式文件相比,這裏多了一段format("json")代碼。

另一種簡略的寫法:

peopleDF.write.json("output/json/")

二者的本質是相同的。

jdbc

spark能夠直接經過jdbc讀取關係型數據庫中指定的表。有兩種讀取的方式,一種是將全部的參數都做爲option一條條設置:

val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "vulcanus_ljl.data_dict")
  .option("user", "vulcanus_ljl")
  .option("password", "mypassword")
  .load()

另外一種是預先將參數封裝到Properties對象裏:

val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"

val connectionProperties = new Properties()
connectionProperties.put("user", "vulcanus_ljl")
connectionProperties.put("password", "mypassword")

val jdbcDF2 = spark.read
  .jdbc(url, "vulcanus_ljl.data_dict", connectionProperties)

spark還能夠經過jdbc將DataFrame寫入到一張新表(表必須不存在),寫入的方式一樣分爲兩種:

jdbcDF.write
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "vulcanus_ljl.data_dict_temp1")
  .option("user", "vulcanus_ljl")
  .option("password", "mypassword")
  .option("createTableColumnTypes", "dict_name varchar(60), dict_type varchar(60)") // 沒有指定的字段使用默認的類型
  .save()

jdbcDF2.write
  .jdbc(url, "vulcanus_ljl.data_dict_temp2", connectionProperties)

其中,urlconnectionProperties的內容同上文讀取時的設置。

寫入時能夠經過createTableColumnTypes設置指定多個字段的類型,其餘沒有指定的字段會使用默認的類型。

table

準備table

Spark SQL不須要依賴於一個已經存在的Hive,能夠經過下面的代碼生成本地的倉庫:

import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")

CREATE TABLE...用來建立表,LOAD DATA用來將數據加載到表中。kv1.txt的文件內容以下:

238val_238
86val_86
311val_311
27val_27
165val_165
409val_409
255val_255
278val_278
98val_98
484val_484

讀取

使用下面的代碼讀取指定表,並打印前5條數據:

spark.read.table("src").show(5)

輸出:

+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
+---+-------+

寫入

使用下面的代碼,將DataFrame的數據寫入到一張新表:

tableDF.write.saveAsTable("src_bak")

若是要寫入一張已經存在的表,須要按照下面的方式:

tableDF.write.mode(SaveMode.Append).saveAsTable("src_bak")

鏈接一個已存在的Hive

hive-site.xml放到項目的src/main/resources目錄下,spark會自動識別該配置文件,以後全部針對Hive table的讀寫都是根據配置做用於一個已存在的Hive的。

text

text文件是不包含格式信息的,將text讀取爲DataFrame須要額外補充格式信息,具體又細分爲兩種狀況:一種是格式是提早約定好的,另外一種是在運行時才能肯定格式。

下面針對這兩種不一樣的狀況分別介紹如何讀寫text的文件,text文件的內容以下:

Michael, 29
Andy, 30
Justin, 19

格式提早肯定

讀入text文件:

case class Person(name: String, age: Long)

private def runInferSchemaExample(spark: SparkSession): Unit = {
  // $example on:schema_inferring$
  // For implicit conversions from RDDs to DataFrames
  import spark.implicits._

  // Create an RDD of Person objects from a text file, convert it to a Dataframe
  val peopleDF = spark.read.textFile("src/main/resources/people.txt")
    .map(_.split(","))
    .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
    .toDF()
  peopleDF.show()
}

case class Person就是提早約定的text文件的格式,spark.read.textFile返回的是Dataset[String]類型,text的每一行做爲一條數據。

import spark.implicits._是必要的,不然會報異常(我尚未對這塊進行研究,沒法給出詳細的解釋)。

寫DataFrame到text文件,必須先把DataFrame轉換成只有一列的數據集。好比對於上面的peopleDF,它的元素類型是Person,含有name和age兩列,直接寫就會拋出下面的異常:

Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/E:/projects/shouzheng/demo-spark/output/text already exists.;

寫的方式以下:

peopleDF.map(person => person.getAs[String]("name") + "," + person.getAs[String]("age")).write.text("output/text")

格式在運行時肯定

格式在運行時肯定,是說咱們不是在編碼階段預知數據的格式,因此沒法預先定義好對應的case class。多是由於咱們須要解析不少的數據格式,每一種格式都定義case class不合適;多是由於咱們須要支持格式的動態擴展,能支持新的格式;多是由於咱們要處理的格式不穩定,可能發生變化...無論什麼緣由,其結果一致:咱們只能經過更加動態的方式來解析數據的格式。

在這種狀況下,咱們依然須要獲取數據的格式。初步獲取的結果多是常見的形式,好比字符串,而後解析並構造特定的類型StructType來表示數據的格式。

而後咱們讀取text文件,將內容轉換爲RDD[Row]類型,其中每個元素的屬性和StructType類型中聲明的field是一一對應的。

準備好了表明schema的StructType和表明數據的RDD[Row],咱們就能夠建立DataFrame對象了:

import spark.implicits._
    val peopleRDD = spark.sparkContext.textFile("src/main/resources/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))

    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

寫的方式同上,再也不贅述。

總結

  1. 最核心的思想都在上面的那張閉環圖上。
  2. 大部分數據源都有兩種讀寫的方式:一種是指定format,一種是直接以格式名做爲方法名。
相關文章
相關標籤/搜索