Spark 支持如下六個核心數據源,同時 Spark 社區還提供了多達上百種數據源的讀取方式,可以知足絕大部分使用場景。html
注:如下全部測試文件都可從本倉庫的resources 目錄進行下載java
全部讀取 API 遵循如下調用格式:mysql
// 格式 DataFrameReader.format(...).option("key", "value").schema(...).load() // 示例 spark.read.format("csv") .option("mode", "FAILFAST") // 讀取模式 .option("inferSchema", "true") // 是否自動推斷 schema .option("path", "path/to/file(s)") // 文件路徑 .schema(someSchema) // 使用預約義的 schema .load()
讀取模式有如下三種可選項:git
讀模式 | 描述 |
---|---|
permissive |
當遇到損壞的記錄時,將其全部字段設置爲 null,並將全部損壞的記錄放在名爲 _corruption t_record 的字符串列中 |
dropMalformed |
刪除格式不正確的行 |
failFast |
遇到格式不正確的數據時當即失敗 |
// 格式 DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save() //示例 dataframe.write.format("csv") .option("mode", "OVERWRITE") //寫模式 .option("dateFormat", "yyyy-MM-dd") //日期格式 .option("path", "path/to/file(s)") .save()
寫數據模式有如下四種可選項:github
Scala/Java | 描述 |
---|---|
SaveMode.ErrorIfExists |
若是給定的路徑已經存在文件,則拋出異常,這是寫數據默認的模式 |
SaveMode.Append |
數據以追加的方式寫入 |
SaveMode.Overwrite |
數據以覆蓋的方式寫入 |
SaveMode.Ignore |
若是給定的路徑已經存在文件,則不作任何操做 |
CSV 是一種常見的文本文件格式,其中每一行表示一條記錄,記錄中的每一個字段用逗號分隔。sql
自動推斷類型讀取讀取示例:數據庫
spark.read.format("csv") .option("header", "false") // 文件中的第一行是否爲列的名稱 .option("mode", "FAILFAST") // 是否快速失敗 .option("inferSchema", "true") // 是否自動推斷 schema .load("/usr/file/csv/dept.csv") .show()
使用預約義類型:apache
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType} //預約義數據格式 val myManualSchema = new StructType(Array( StructField("deptno", LongType, nullable = false), StructField("dname", StringType,nullable = true), StructField("loc", StringType,nullable = true) )) spark.read.format("csv") .option("mode", "FAILFAST") .schema(myManualSchema) .load("/usr/file/csv/dept.csv") .show()
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
也能夠指定具體的分隔符:json
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
爲節省主文篇幅,全部讀寫配置項見文末 9.1 小節。api
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
須要注意的是:默認不支持一條數據記錄跨越多行 (以下),能夠經過配置 multiLine
爲 true
來進行更改,其默認值爲 false
。
// 默認支持單行 {"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"} //默認不支持多行 { "DEPTNO": 10, "DNAME": "ACCOUNTING", "LOC": "NEW YORK" }
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
爲節省主文篇幅,全部讀寫配置項見文末 9.2 小節。
Parquet 是一個開源的面向列的數據存儲,它提供了多種存儲優化,容許讀取單獨的列非整個文件,這不只節省了存儲空間並且提高了讀取效率,它是 Spark 是默認的文件格式。
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
Parquet 文件有着本身的存儲規則,所以其可選配置項比較少,經常使用的有以下兩個:
讀寫操做 | 配置項 | 可選值 | 默認值 | 描述 |
---|---|---|---|---|
Write | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
None | 壓縮文件格式 |
Read | mergeSchema | true, false | 取決於配置項 spark.sql.parquet.mergeSchema |
當爲真時,Parquet 數據源將全部數據文件收集的 Schema 合併在一塊兒,不然將從摘要文件中選擇 Schema,若是沒有可用的摘要文件,則從隨機數據文件中選擇 Schema。 |
更多可選配置能夠參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
ORC 是一種自描述的、類型感知的列文件格式,它針對大型數據的讀寫進行了優化,也是大數據中經常使用的文件格式。
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
Spark 一樣支持與傳統的關係型數據庫進行數據讀寫。可是 Spark 程序默認是沒有提供數據庫驅動的,因此在使用前須要將對應的數據庫驅動上傳到安裝目錄下的 jars
目錄中。下面示例使用的是 Mysql 數據庫,使用前須要將對應的 mysql-connector-java-x.x.x.jar
上傳到 jars
目錄下。
讀取全表數據示例以下,這裏的 help_keyword
是 mysql 內置的字典表,只有 help_keyword_id
和 name
兩個字段。
spark.read .format("jdbc") .option("driver", "com.mysql.jdbc.Driver") //驅動 .option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //數據庫地址 .option("dbtable", "help_keyword") //表名 .option("user", "root").option("password","root").load().show(10)
從查詢結果讀取數據:
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords""" spark.read.format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/mysql") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root").option("password", "root") .option("dbtable", pushDownQuery) .load().show() //輸出 +---------------+-----------+ |help_keyword_id| name| +---------------+-----------+ | 0| <>| | 1| ACTION| | 2| ADD| | 3|AES_DECRYPT| | 4|AES_ENCRYPT| | 5| AFTER| | 6| AGAINST| | 7| AGGREGATE| | 8| ALGORITHM| | 9| ALL| | 10| ALTER| | 11| ANALYSE| | 12| ANALYZE| | 13| AND| | 14| ARCHIVE| | 15| AREA| | 16| AS| | 17| ASBINARY| | 18| ASC| | 19| ASTEXT| +---------------+-----------+
也可使用以下的寫法進行數據的過濾:
val props = new java.util.Properties props.setProperty("driver", "com.mysql.jdbc.Driver") props.setProperty("user", "root") props.setProperty("password", "root") val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定數據過濾條件 spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() //輸出: +---------------+-----------+ |help_keyword_id| name| +---------------+-----------+ | 0| <>| | 1| ACTION| | 2| ADD| | 3|AES_DECRYPT| | 4|AES_ENCRYPT| | 5| AFTER| | 6| AGAINST| | 7| AGGREGATE| | 8| ALGORITHM| | 9| ALL| | 604| WHEN| +---------------+-----------+
可使用 numPartitions
指定讀取數據的並行度:
option("numPartitions", 10)
在這裏,除了能夠指定分區外,還能夠設置上界和下界,任何小於下界的值都會被分配在第一個分區中,任何大於上界的值都會被分配在最後一個分區中。
val colName = "help_keyword_id" //用於判斷上下界的列 val lowerBound = 300L //下界 val upperBound = 500L //上界 val numPartitions = 10 //分區綜述 val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword", colName,lowerBound,upperBound,numPartitions,props)
想要驗證分區內容,可使用 mapPartitionsWithIndex
這個算子,代碼以下:
jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => { val buffer = new ListBuffer[String] while (iterator.hasNext) { buffer.append(index + "分區:" + iterator.next()) } buffer.toIterator }).foreach(println)
執行結果以下:help_keyword
這張表只有 600 條左右的數據,原本數據應該均勻分佈在 10 個分區,可是 0 分區裏面卻有 319 條數據,這是由於設置了下限,全部小於 300 的數據都會被限制在第一個分區,即 0 分區。同理全部大於 500 的數據被分配在 9 分區,即最後一個分區。
val df = spark.read.format("json").load("/usr/file/json/emp.json") df.write .format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/mysql") .option("user", "root").option("password", "root") .option("dbtable", "emp") .save()
Text 文件在讀寫性能方面並無任何優點,且不能表達明確的數據結構,因此其使用的比較少,讀寫操做以下:
spark.read.textFile("/usr/file/txt/dept.txt").show()
df.write.text("/tmp/spark/txt/dept")
多個 Executors 不能同時讀取同一個文件,但它們能夠同時讀取不一樣的文件。這意味着當您從一個包含多個文件的文件夾中讀取數據時,這些文件中的每個都將成爲 DataFrame 中的一個分區,並由可用的 Executors 並行讀取。
寫入的文件或數據的數量取決於寫入數據時 DataFrame 擁有的分區數量。默認狀況下,每一個數據分區寫一個文件。
分區和分桶這兩個概念和 Hive 中分區表和分桶表是一致的。都是將數據按照必定規則進行拆分存儲。須要注意的是 partitionBy
指定的分區和 RDD 中分區不是一個概念:這裏的分區表現爲輸出目錄的子目錄,數據分別存儲在對應的子目錄中。
val df = spark.read.format("json").load("/usr/file/json/emp.json") df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
輸出結果以下:能夠看到輸出被按照部門編號分爲三個子目錄,子目錄中才是對應的輸出文件。
分桶寫入就是將數據按照指定的列和桶數進行散列,目前分桶寫入只支持保存爲表,實際上這就是 Hive 的分桶表。
val numberBuckets = 10 val columnToBucketBy = "empno" df.write.format("parquet").mode("overwrite") .bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
若是寫入產生小文件數量過多,這時會產生大量的元數據開銷。Spark 和 HDFS 同樣,都不能很好的處理這個問題,這被稱爲「small file problem」。同時數據文件也不能過大,不然在查詢時會有沒必要要的性能開銷,所以要把文件大小控制在一個合理的範圍內。
在上文咱們已經介紹過能夠經過分區數量來控制生成文件的數量,從而間接控制文件大小。Spark 2.2 引入了一種新的方法,以更自動化的方式控制文件大小,這就是 maxRecordsPerFile
參數,它容許你經過控制寫入文件的記錄數來控制文件大小。
// Spark 將確保文件最多包含 5000 條記錄 df.write.option(「maxRecordsPerFile」, 5000)
讀\寫操做 | 配置項 | 可選值 | 默認值 | 描述 |
---|---|---|---|---|
Both | seq | 任意字符 | , (逗號) |
分隔符 |
Both | header | true, false | false | 文件中的第一行是否爲列的名稱。 |
Read | escape | 任意字符 | \ | 轉義字符 |
Read | inferSchema | true, false | false | 是否自動推斷列類型 |
Read | ignoreLeadingWhiteSpace | true, false | false | 是否跳過值前面的空格 |
Both | ignoreTrailingWhiteSpace | true, false | false | 是否跳過值後面的空格 |
Both | nullValue | 任意字符 | 「」 | 聲明文件中哪一個字符表示空值 |
Both | nanValue | 任意字符 | NaN | 聲明哪一個值表示 NaN 或者缺省值 |
Both | positiveInf | 任意字符 | Inf | 正無窮 |
Both | negativeInf | 任意字符 | -Inf | 負無窮 |
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
none | 文件壓縮格式 |
Both | dateFormat | 任何能轉換爲 Java 的 SimpleDataFormat 的字符串 |
yyyy-MM-dd | 日期格式 |
Both | timestampFormat | 任何能轉換爲 Java 的 SimpleDataFormat 的字符串 |
yyyy-MMdd’T’HH:mm:ss.SSSZZ | 時間戳格式 |
Read | maxColumns | 任意整數 | 20480 | 聲明文件中的最大列數 |
Read | maxCharsPerColumn | 任意整數 | 1000000 | 聲明一個列中的最大字符數。 |
Read | escapeQuotes | true, false | true | 是否應該轉義行中的引號。 |
Read | maxMalformedLogPerPartition | 任意整數 | 10 | 聲明每一個分區中最多容許多少條格式錯誤的數據,超過這個值後格式錯誤的數據將不會被讀取 |
Write | quoteAll | true, false | false | 指定是否應該將全部值都括在引號中,而不僅是轉義具備引號字符的值。 |
Read | multiLine | true, false | false | 是否容許每條完整記錄跨域多行 |
讀\寫操做 | 配置項 | 可選值 | 默認值 |
---|---|---|---|
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
none |
Both | dateFormat | 任何能轉換爲 Java 的 SimpleDataFormat 的字符串 | yyyy-MM-dd |
Both | timestampFormat | 任何能轉換爲 Java 的 SimpleDataFormat 的字符串 | yyyy-MMdd’T’HH:mm:ss.SSSZZ |
Read | primitiveAsString | true, false | false |
Read | allowComments | true, false | false |
Read | allowUnquotedFieldNames | true, false | false |
Read | allowSingleQuotes | true, false | true |
Read | allowNumericLeadingZeros | true, false | false |
Read | allowBackslashEscapingAnyCharacter | true, false | false |
Read | columnNameOfCorruptRecord | true, false | Value of spark.sql.column&NameOf |
Read | multiLine | true, false | false |
屬性名稱 | 含義 |
---|---|
url | 數據庫地址 |
dbtable | 表名稱 |
driver | 數據庫驅動 |
partitionColumn, lowerBound, upperBoun |
分區總數,上界,下界 |
numPartitions | 可用於表讀寫並行性的最大分區數。若是要寫的分區數量超過這個限制,那麼能夠調用 coalesce(numpartition) 重置分區數。 |
fetchsize | 每次往返要獲取多少行數據。此選項僅適用於讀取數據。 |
batchsize | 每次往返插入多少行數據,這個選項只適用於寫入數據。默認值是 1000。 |
isolationLevel | 事務隔離級別:能夠是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即標準事務隔離級別。 默認值是 READ_UNCOMMITTED。這個選項只適用於數據讀取。 |
createTableOptions | 寫入數據時自定義建立表的相關配置 |
createTableColumnTypes | 寫入數據時自定義建立列的列類型 |
數據庫讀寫更多配置能夠參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南