目錄mysql
Spark SQL是Spark的一個模塊,用於結構化數據的處理。git
++++++++++++++ +++++++++++++++++++++ | SQL | | Dataset API | ++++++++++++++ +++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++ | Spark SQL | +++++++++++++++++++++++++++++++++++++
使用Spark SQL的方式有2種,能夠經過SQL或者Dataset API,這兩種使用方式在本文都會涉及。github
其中,經過SQL接口使用的方法具體又可分爲3種:sql
這裏只會介紹第一種方式。數據庫
Spark關於分佈式數據集的抽象本來是RDD,Dataset是其升級版本。DataFrame是特殊的Dataset,它限定元素是按照命名的列來組織的,從這一點看至關於關係型數據庫中的表。DataFrame等價於Dataset[Row],並且DataFrame是本文內容的核心。apache
DataFrame支持豐富的數據源:json
+++++++++++++++++++ | 結構數據文件 | | | | +++++++++++++ | ++++++++++++++++ | | parquet | | | Hive table | | +++++++++++++ | ++++++++++++++++ | | | +++++++++++++ | ++++++++++++++++ | | csv | | | 關係數據庫 | | +++++++++++++ | ++++++++++++++++ | | | +++++++++++++ | ++++++++++++++++ | | json | | | RDD | | +++++++++++++ | ++++++++++++++++ | | +++++++++++++++++++
這裏的每一種數據源咱們都會進行介紹。數組
本文主要介紹DataFrame和各數據源的IO操做,後面再寫一篇文章介紹基於DataFrame的使用操做。即:本文關注如何獲得一個DataFrame,如何將一個DataFrame進行持久化;後面要寫的文章則關注如何使用DataFrame。session
相關的開源項目demo-spark在github上。app
這個圖概述了本文介紹的主要內容,它也能夠做爲後續的備忘和參考。
這個圖中包含兩種箭頭,寬箭頭表示數據的流向,細箭頭表示提供構造實例的方法。
好比DataFrame - DataFrameWriter - 存儲
的粗箭頭,表示數據從內存經過DataFrameWriter流向存儲;SparkSession - DataFrameReader
的細箭頭,表示能夠從SparkSession對象建立DataFrameReader對象。
使用Spark SQL必須先構造SparkSession實例,時候以後須要調用其stop
方法釋放資源。模板以下:
val spark = org.apache.spark.sql.SparkSession .builder() .appName("Spark SQL basic demo") .master("local") .getOrCreate() // work with spark spark.stop()
下文中出現的全部的spark
,如無特殊說明,都是指按照上述代碼建立的SparkSession
對象。
parquet文件是怎麼樣的?
PAR1 "&, @ Alyssa Ben , 0 red 88, @ \Hexample.avro.User % name% %favorite_color% 5 favorite_numbers %array <&% nameDH& &P5 favorite_color<@&P &?% (favorite_numbersarray ZZ&? ? avro.schema?{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_color","type":["string","null"]},{"name":"favorite_numbers","type":{"type":"array","items":"int"}}]} parquet-mr version 1.4.3 ? PAR1
它不是一個單純的文本文件,包含了一些沒法渲染的特殊字符。
parquet是默認的格式。從一個parquet文件讀取數據的代碼以下:
val usersDF = spark.read.load("src/main/resources/users.parquet")
spark.read
返回DataFrameReader
對象,其load方法加載文件中的數據並返回DataFrame
對象。這個能夠參照上文的閉環圖理解。
咱們能夠調用Dataset#show()
方法查看其內容:
usersDF.show()
輸出結果:
+------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
將一個DataFrame寫到parquet文件的代碼以下:
usersDF.write.save("output/parquet/")
DataFrame#write()
方法返回DataFrameWriter
對象實例,save
方法將數據持久化爲parquet格式的文件。save
的參數是一個目錄,並且要求最底層的目錄是不存在的,下文類同。
另一種寫的方式是:
peopleDF.write.parquet("output/parquet/")
這兩種方式的本質相同。
csv是什麼樣的?
csv又稱爲逗號分隔符,即:使用逗號分隔一條數據中各字段的值。csv文件能夠被excel解析,可是其本質只是一個文本文件。好比下面是一份csv文件的內容:
age,name ,Michael 30,Andy 19,Justin
第一行是表頭,可是它和下面的數據並無什麼區別。因此在讀取的時候,必須告訴讀入器這個文件是有表頭的,它(第一行)纔會被解析成表頭,不然就會被當成數據。
好比,解析表頭的讀:
spark.read.option("header", true).format("csv").load("output/csv/").show()
其中的option("header", true)
就是告訴讀入器這個文件是有表頭的。
輸出爲:
+----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
不解析表頭的讀:
spark.read.format("csv").load("output/csv/").show()
輸出爲:
+----+-------+ | _c0| _c1| +----+-------+ | age| name| |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
spark自動構建了兩個字段:_c0
和_c1
,而把age
和name
當成了一行數據。
另一種簡化的讀法:
spark.read.option("header", true).csv("output/csv/")
其原理和上文中介紹的其餘格式的文件相同。
將DataFrame寫入到csv文件時也須要注意表頭,將表頭也寫入文件的方式:
peopleDF.write.option("header", true).format("csv").save("output/csv/")
不寫表頭,只寫數據的方式:
peopleDF.write.format("csv").save("output/csv/")
另一種簡化的寫法是:
peopleDF.write.csv("output/csv/")
json文件是怎麼樣的?
上文中說過,DataFrame至關於關係數據庫中的表,那麼每一條數據至關於一行記錄。關係數據庫表又能夠至關於一個類,每一行數據至關於具體的對象,因此DataFrame的每一條數據至關於一個對象。
DataFrame對要讀取的json有特殊的要求:即每一條數據做爲一行,總體不能包裝成數組。好比:
{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
而一個標準的json應該是下面這樣:
[{ "name": "Michael" }, { "name": "Andy", "age": 30 }, { "name": "Justin", "age": 19 } ]
使用下面的方式讀取json文件內容:
val peopleDF = spark.read.format("json").load(path)
這種讀取的方式和上文parquet的讀取方式一致,最終都是調用load
方法。只是多了一段format("json")
,這是由於parquet是默認的格式,而json不是,因此必須明確聲明。
還有一種簡化的方式,其本質仍是上述的代碼:
val peopleDF = spark.read.json(path)
將一個DataFrame寫到json文件的方式:
peopleDF.write.format("json").save("output/json/")
一樣的道理,和保存爲parquet格式文件相比,這裏多了一段format("json")
代碼。
另一種簡略的寫法:
peopleDF.write.json("output/json/")
二者的本質是相同的。
spark能夠直接經過jdbc讀取關係型數據庫中指定的表。有兩種讀取的方式,一種是將全部的參數都做爲option一條條設置:
val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true" val jdbcDF = spark.read .format("jdbc") .option("url", url) .option("dbtable", "vulcanus_ljl.data_dict") .option("user", "vulcanus_ljl") .option("password", "mypassword") .load()
另外一種是預先將參數封裝到Properties
對象裏:
val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true" val connectionProperties = new Properties() connectionProperties.put("user", "vulcanus_ljl") connectionProperties.put("password", "mypassword") val jdbcDF2 = spark.read .jdbc(url, "vulcanus_ljl.data_dict", connectionProperties)
spark還能夠經過jdbc將DataFrame寫入到一張新表(表必須不存在),寫入的方式一樣分爲兩種:
jdbcDF.write .format("jdbc") .option("url", url) .option("dbtable", "vulcanus_ljl.data_dict_temp1") .option("user", "vulcanus_ljl") .option("password", "mypassword") .option("createTableColumnTypes", "dict_name varchar(60), dict_type varchar(60)") // 沒有指定的字段使用默認的類型 .save()
和
jdbcDF2.write .jdbc(url, "vulcanus_ljl.data_dict_temp2", connectionProperties)
其中,url
和connectionProperties
的內容同上文讀取時的設置。
寫入時能夠經過createTableColumnTypes
設置指定多個字段的類型,其餘沒有指定的字段會使用默認的類型。
Spark SQL不須要依賴於一個已經存在的Hive,能夠經過下面的代碼生成本地的倉庫:
import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")
CREATE TABLE...
用來建立表,LOAD DATA
用來將數據加載到表中。kv1.txt
的文件內容以下:
238val_238 86val_86 311val_311 27val_27 165val_165 409val_409 255val_255 278val_278 98val_98 484val_484
使用下面的代碼讀取指定表,並打印前5條數據:
spark.read.table("src").show(5)
輸出:
+---+-------+ |key| value| +---+-------+ |238|val_238| | 86| val_86| |311|val_311| | 27| val_27| |165|val_165| +---+-------+
使用下面的代碼,將DataFrame的數據寫入到一張新表:
tableDF.write.saveAsTable("src_bak")
若是要寫入一張已經存在的表,須要按照下面的方式:
tableDF.write.mode(SaveMode.Append).saveAsTable("src_bak")
將hive-site.xml
放到項目的src/main/resources
目錄下,spark會自動識別該配置文件,以後全部針對Hive table的讀寫都是根據配置做用於一個已存在的Hive的。
text文件是不包含格式信息的,將text讀取爲DataFrame須要額外補充格式信息,具體又細分爲兩種狀況:一種是格式是提早約定好的,另外一種是在運行時才能肯定格式。
下面針對這兩種不一樣的狀況分別介紹如何讀寫text的文件,text文件的內容以下:
Michael, 29 Andy, 30 Justin, 19
讀入text文件:
case class Person(name: String, age: Long) private def runInferSchemaExample(spark: SparkSession): Unit = { // $example on:schema_inferring$ // For implicit conversions from RDDs to DataFrames import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.read.textFile("src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() peopleDF.show() }
case class Person
就是提早約定的text文件的格式,spark.read.textFile
返回的是Dataset[String]
類型,text的每一行做爲一條數據。
import spark.implicits._
是必要的,不然會報異常(我尚未對這塊進行研究,沒法給出詳細的解釋)。
寫DataFrame到text文件,必須先把DataFrame轉換成只有一列的數據集。好比對於上面的peopleDF
,它的元素類型是Person
,含有name和age兩列,直接寫就會拋出下面的異常:
Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/E:/projects/shouzheng/demo-spark/output/text already exists.;
寫的方式以下:
peopleDF.map(person => person.getAs[String]("name") + "," + person.getAs[String]("age")).write.text("output/text")
格式在運行時肯定,是說咱們不是在編碼階段預知數據的格式,因此沒法預先定義好對應的case class。多是由於咱們須要解析不少的數據格式,每一種格式都定義case class不合適;多是由於咱們須要支持格式的動態擴展,能支持新的格式;多是由於咱們要處理的格式不穩定,可能發生變化...無論什麼緣由,其結果一致:咱們只能經過更加動態的方式來解析數據的格式。
在這種狀況下,咱們依然須要獲取數據的格式。初步獲取的結果多是常見的形式,好比字符串,而後解析並構造特定的類型StructType
來表示數據的格式。
而後咱們讀取text文件,將內容轉換爲RDD[Row]
類型,其中每個元素的屬性和StructType
類型中聲明的field是一一對應的。
準備好了表明schema的StructType
和表明數據的RDD[Row]
,咱們就能夠建立DataFrame
對象了:
import spark.implicits._ val peopleRDD = spark.sparkContext.textFile("src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema)
寫的方式同上,再也不贅述。