==> 什麼是parquetjava
Parquet 是列式存儲的一種文件類型
mysql
==> 官網描述:sql
Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming languageshell
不管數據處理框架,數據模型或編程語言的選擇如何,Apache Parquet都是Hadoop生態系統中任何項目可用的列式存儲格式數據庫
==> 由來apache
Parquet的靈感來自於2010年Google發表的Dremel論文,文中介紹了一種支持嵌套結構的存儲格式,而且使用了列式存儲的方式提高查詢性能,在Dremel論文中還介紹了Google如何使用這種存儲格式實現並行查詢的,若是對此感興趣能夠參考論文和開源實現Apache Drill。編程
==> 特色:json
---> 能夠跳過不符合條件的數據,只讀取須要的數據,下降 IO 數據量數據結構
---> 壓縮編碼能夠下降磁盤存儲空間(因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(如 Run Length Encoding t Delta Encoding)進一步節約存儲空間)oracle
---> 只讀取須要的列,支持向量運算,可以獲取更好的掃描性能
---> Parquet 格式是 Spark SQL 的默認數據源,可經過 spark.sql.sources.default 配置
==> parquet 經常使用操做
---> load 和 save 函數
// 讀取 Parquet 文件 val usersDF = spark.read.load("/test/users.parquet") // 查詢 Schema 和數據 usersDF.printSchema usersDF.show // 查詢用戶的 name 和喜好顏色並保存 usersDF.select($"name", $"favorite_color").write.save("/test/result/parquet") // 驗證結果 可經過 printSchema 查詢數據結構,使用 show 查看數據 // 顯式指定文件格式: 加載 json 格式 val usersDF = spark.read.format("json").load("/test/people.json") // 存儲模式(Save Modes) // 能夠採用 SaveMode 執行存儲操做, SaveMode 定義 了對數據的處理模式,須要注意的是,這些保存模式不使用任何鎖定,不是原子操做 // 當使用 Overwrite 方式執行時,在輸出新數據以前,原數據就已經被刪除 usersDF.select($"name").write.save("/test/parquet1") // 若 /test/parquet1 存在會報錯 usersDF.select($"name").wirte.mode("overwrite").save("/test/parquet1") // 使用 overwrite 便可 // 將結果保存爲表, 也能夠進行分區, 分桶等操做: partitionBy bucketBy usersDF.select($"name").write.saveAsTable("table1")
---> Parquet文件
Parquet 是一個列格式並且用於多個數據處理系統中
Spark SQL 提供支持對於 Parquet 文件的讀寫,也就是自動保存原始 數據的 Schema, 當寫 Parquet 文件時,全部的列被自動轉化爲 nullable,由於兼容性的緣故
---- 讀取 Json 格式的數據,將其轉換成 parquet 格式,建立相應的表,使用 SQL 語句查詢
// 從 json 文件中讀入數據 val empJson = spark.read.json("/test/emp.json") // 將數據保存爲 parquet empJson.write.mode("overwrite").parquet("/test/parquet") // 讀取 parquet val empParquet = spark.read.parquet("/test/parquet") // 建立臨時表 emptable empParquet.createOrReplaceTempView("emptalbe") // 使用 SQL 語句執行查詢 spark.sql("select * from emptable where deptno=10 and sal>1500").show
---- Schematic 的合併: 先定義一個簡單的 Schema,而後逐漸增長列描述,用戶能夠獲取多個有多個不一樣 Schema 但相互兼容的 Parquet 文件
// 建立第一個文件 val df1 = sc.makeRDD(1 to 5).map(x=> (x, x*2)).toDF("single", "double") scala> df1.printSchema root |-- single: integer (nullable = false) |-- double: integer (nullable = false) // 建立第二個文件 scala> val df2 = sc.makeRDD(6 to 10).map(x=> (x, x*2)).toDF("single", "triple") df2: org.apache.spark.sql.DataFrame = [single: int, triple: int] scala> df2.printSchema root |-- single: integer (nullable = false) |-- triple: integer (nullable = false) scala> df2.write.parquet("/data/testtable/key=2") // 合併上面的兩個文件 scala> val df3 = spark.read.option("mergeSchema", "true").parquet("/data/testtable") df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields] scala> df3.printSchema root |-- single: integer (nullable = true) |-- double: integer (nullable = true) |-- triple: integer (nullable = true) |-- key: integer (nullable = true) scala> df3.show +------+------+------+---+ |single|double|triple|key| +------+------+------+---+ | 8| null| 16| 2| | 9| null| 18| 2| | 10| null| 20| 2| | 3| 6| null| 1| | 4| 8| null| 1| | 5| 10| null| 1| | 6| null| 12| 2| | 7| null| 14| 2| | 1| 2| null| 1| | 2| 4| null| 1| +------+------+------+---+
---> Json Datasets(兩種寫法)
// 第一種 scala> val df4 = spark.read.json("/app/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json") df4: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> df4.show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ // 第二種 scala> val df5 = spark.read.format("json").load("/app/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json") df5: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> df5.show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
---> JDBC 方式讀取關係型數據庫中的數據(須要將 JDBC 的驅動加入)
// 將 JDBC 的驅動加入 bin/spark-shell --master spark://bigdata11:7077 --jars /root/temp/ojdbc6.jar --driver-class-path /root/temp/ojdbc6.jar // 讀取 Oracle val oracleEmp = spark.read.format("jdbc") .option("url","jdbc:oracle:thin:@192.168.10.100:1521/orcl.example.com") .option("dbtable","scott.emp") .option("user","scott") .option("password","tiger").load
---> 操做 Hive 的表
---- 把 hive 和 hadoop 的配置文件拷貝到sprke 的 conf 目錄下: hive-sit.xml, core-sit.xml, hdfs-sit.xml
---- 啓動 Spark-shell 時 指定mysql 數據庫的驅動程序
./bin/spark-shell --master spark://bigdata0:7077 --jars /data/tools/mysql-connector-java-5.1.43-bin.jar --driver-class-path /data/tools/mysql-connector-java-5.1.43-bin.jar
---- 使用 Spark Shell 操做 Hive
// 建立表 spark.sql("create table ccc(key INT, value STRING) row format delimited fields terminated by ','") // 導入數據 spark.sql("load data local path '/test/data.txt' into table ccc") // 查詢數據 spark.sql("select * from ccc").show
---- 使用 Spark SQL 操做 Hive
show tables; select * from ccc;