spark SQL Parquet 文件的讀取與加載
sql
是由許多其餘數據處理系統支持的柱狀格式。Spark SQL支持閱讀和編寫自動保留原始數據模式的Parquet文件。在編寫Parquet文件時,出於兼容性緣由,全部列都會自動轉換爲空。編程
1, 以編程方式加載數據json
這裏使用上一節的例子中的數據:常規數據加載緩存
private def runBasicParquetExample(spark: SparkSession): Unit = { import spark.implicits._ // val peopleDF = spark.read.json("examples/src/main/resources/people.json") //DataFrames能夠保存爲Parquet文件,維護模式信息 peopleDF.write.parquet("people.parquet") //在上面建立的parquet文件中讀取 // Parquet文件是自描述的,因此模式被保存 //加載Parquet文件的結果也是一個DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet文件也能夠用來建立臨時視圖,而後在SQL語句 parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ }2,分區操做
表分區是像Hive這樣的系統中經常使用的優化方法。在分區表中,數據一般存儲在不一樣的目錄中,分區列值在每一個分區目錄的路徑中編碼。如今,Parquet數據源可以自動發現和推斷分區信息。例如,咱們可使用如下目錄結構,兩個額外的列gender和country分區列將全部之前使用的人口數據存儲到分區表中:
架構
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...經過傳遞path/to/table給SparkSession.read.parquet或者SparkSession.read.load,Spark SQL將自動從路徑中提取分區信息。如今,返回的DataFrame的模式變成:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)請注意,分區列的數據類型是自動推斷的。目前支持數字數據類型和字符串類型。有時用戶可能不但願自動推斷分區列的數據類型。對於這些用例,可使用spark.sql.sources.partitionColumnTypeInference.enabled默認 的自動類型推斷來配置true。當禁用類型推斷時,字符串類型將用於分區列。
像ProtocolBuffer,Avro和Thrift同樣,Parquet也支持模式演變。用戶能夠從簡單的模式開始,並根據須要逐漸向模式添加更多的列。經過這種方式,用戶可能會以不一樣的可是 相互兼容的模式結束多個Parquet文件。Parquet數據源如今能夠自動檢測這種狀況併合並全部這些文件的模式。
因爲模式合併是一個相對昂貴的操做,而且在大多數狀況下不是必需的,因此咱們從1.5.0開始默認關閉它。你能夠經過app
1) 將數據源選項設置mergeSchema爲true讀取Parquet文件(以下面的示例所示)工具
2)設置全局SQL選項spark.sql.parquet.mergeSchema來true。
性能
例子以下:優化
private def runParquetSchemaMergingExample(spark: SparkSession): Unit = { import spark.implicits._ // 建立一個簡單的DataFrame,存儲到一個分區目錄 val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") //在新的分區目錄中建立另外一個DataFrame, //添加一個新的列並刪除一個現存的列 val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") //讀取分區表 val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() //最終的模式由Parquet文件中的全部3列組成 //分區列出如今分區目錄路徑中 // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) // $example off:schema_merging$ }4, Hive metastore Parquet
Hive / Parquet Schema調解
Hive和Parquet從表模式處理的角度來看,有兩個關鍵的區別。
1)hive 是不區分大小寫的,而Parquet不是 編碼
2) Hive認爲全部列都是能夠空的,而Parquet的可空性是顯着的
因爲這個緣由,在將Hive metastore Parquet錶轉換爲Spark SQL Parquet表時,咱們必須將Hive Metastore模式與Parquet模式協調一致。協調規則是:
在兩個模式中具備相同名稱的字段必須具備相同的數據類型,而不論是否爲空。協調字段應該具備Parquet方面的數據類型,以保證可空性。
協調的模式剛好包含在Hive Metastore模式中定義的那些字段。
1)僅出如今Parquet模式中的任何字段將被放置在協調的模式中。
2) 僅在Hive Metastore模式中出現的任何字段纔會做爲可協調字段添加到協調模式中。
元數據刷新
Spark SQL緩存Parquet元數據以得到更好的性能。當Hive Metastore Parquet錶轉換啓用時,這些轉換表的元數據也被緩存。若是這些表由Hive或其餘外部工具更新,則須要手動刷新以確保一致的元數據。
spark.catalog.refreshTable("my_table")5,Configuration配置
Parquet的結構能夠用作setConf方法上SparkSession或經過運行 SET key=value使用SQL命令
Property Name |
Default | Meaning |
spark.sql.parquet.binaryAsString | false | 一些其餘派奎斯生產系統,特別是Impala,Hive和舊版本的Spark SQL, 在寫出Parquet架構時不會區分二進制數據和字符串。該標誌告訴Spark SQL 將二進制數據解釋爲字符串以提供與這些系統的兼容性。 |
spark.sql.parquet.int96AsTimestamp | true | 一些Parquet生產系統,特別是Impala和Hive,將時間戳存儲到INT96中。 該標誌告訴Spark SQL將INT96數據解釋爲一個時間戳,以提供與這些系統的兼容性。 |
spark.sql.parquet.cacheMetadata | true | 打開Parquet模式元數據的緩存。能夠加快查詢靜態數據。 |
spark.sql.parquet.compression.codec | snappy | 設置寫入Parquet文件時使用的壓縮編解碼器。可接受的值包括:未壓縮,快速, gzip,lzo。 |
spark.sql.parquet.filterPushdown | true | 設置爲true時啓用Parquet過濾器下推優化。 |
spark.sql.hive.convertMetastoreParquet | true | 當設置爲false時,Spark SQL將使用Hive SerDe來替代內置支持的Parquet表。 |
spark.sql.parquet.mergeSchema | false | 若是爲true,則Parquet數據源合併從全部數據文件收集的模式,不然若是 沒有摘要文件可用,則從摘要文件或隨機數據文件中選取模式。 |
spark.sql.optimizer.metadataOnly | true | 若是爲true,則啓用使用表元數據的僅限元數據查詢優化來生成分區列,而 不是表掃描。當掃描的全部列都是分區列時,該查詢將適用,而且查詢具備 知足不一樣語義的聚合運算符。 |