Apache Parquet是Hadoop生態系統中的列式存儲格式,面向分析型業務,與數據處理框架、數據模型、編程語言無關。html
● 優點
下降存儲空間:按列存,可以更好地壓縮數據,由於一列的數據通常都是同質的(homogenous)
提升IO效率:掃描(遍歷/scan)的時候,能夠只讀其中部分列. 並且因爲數據壓縮的更好的緣故,IO所需帶寬也會減少
下降上層應用延遲sql
查詢引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
計算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
數據模型: Avro, Thrift, Protocol Buffers, POJOsshell
當時Twitter的日增數據量達到壓縮以後的100TB+,存儲在HDFS上,工程師會使用多種計算框架(例如MapReduce, Hive, Pig等)對這些數據作分析和挖掘;
日誌結構是複雜的嵌套數據類型,例如一個典型的日誌的schema有87列,嵌套了7層。因此須要一種列式存儲格式,既能支持關係型數據(簡單數據類型),又能支持複雜的嵌套類型的數據,同時可以適配多種數據處理框架。編程
● 難點
處理嵌套的數據結構纔是真正的挑戰。
多個 field 能夠造成一個 group,一個 field 能夠重複出現(叫作 repeated field)。用 group 和 repeated field 的各類組合來描述。json
● Definition Level
知道究竟是從哪一級開始沒定義的,這是還原整條記錄所必須知道的。
不要讓 definition level 太大,這很重要,目標是所用的比特越少越好。數據結構
● Repetition level
標示着新 List 出現的層級。
repetition level 告訴咱們,在從列式表達,還原嵌套結構的時候,是在哪一級插入新值。架構
示意圖:http://lastorder.me/tag/parquet.html
每個基本類型的列,都要建立三個子列(R, D, Value)。
三個子列的總開銷其實並不大. 由於兩種 Levels的最大值,是由 schema 的深度決定的,而且一般只用幾個 bit 就夠用了(1個bit 就可表達1層嵌套,2個bit就能夠表達3層嵌套了,3個bit就可以表達7層嵌套。
爲了經過列是存儲,還原重建這條嵌套結構的記錄,寫一個循環讀列中的值。
R=0, D=2, Value = 「555 987 6543」:
R = 0 這是一個新的 record. 從根開始按照schema 重建結構,直到 repetition level 達到 2
D = 2 是最大值,值是有定義的,因此此時將值插入.
R=1, D=1:
R = 1 level1 的 contact list 中一條新記錄
D = 1 contacts 有定義,但 phoneNumber 沒定義,所建一個空的 contacts 便可.
R=0, D=0:
R = 0 一條新 record. 能夠重建嵌套結構,直到達到 definition level 的值.
D = 0 => contacts 是 null,因此最後拼裝出來的是一個空的 Address Book框架
提升查詢性能、存儲壓縮編程語言
● 《Spark SQL下的Parquet使用最佳實踐和代碼實戰》http://blog.csdn.net/sundujing/article/details/51438306oop
● 《操做技巧:將 Spark 中的文本轉換爲 Parquet 以提高性能》http://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html
● 《[Big Data]從Hadoop到Spark的架構實踐》http://www.cnblogs.com/losbyday/p/5854618.html
● 《Spark SQL 下DateFrame的初步認識(3)》http://blog.csdn.net/erfucun/article/details/52086858
業界的主流公司在作大數據分析的時候,基本上都以Parquet方式存儲數據。
Parquet文件是基於列式存儲的,列之間是分開的,能夠進行各類高效的優化。Spark底層通常都會接 Parquet文件。
Parquet是列式存儲格式的一種文件類型,列式存儲有如下的核心優點:
a.能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。
b.壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
c.只讀取須要的列,支持向量運算,可以獲取更好的掃描性能。
● 離線數據挖掘:HDFS(JSON) -> Spark/Hive(ETL) -> HDFS Parquet -> Spark SQL / ML / GraphX
● 實時即時分析:Kafka -> Spark Streaming -> HDFS(JSON) -> HDFS Parquet -> Spark SQL / ML / GraphX
● 實時流式計算:Kafka -> Spark Streaming -> Redis
1. 原始日誌的保存。將Kafka中的原始日誌以JSON格式無損的保存在HDFS中
2. 數據清洗和轉換,清洗和標準化以後,轉變爲Parquet格式,存儲在HDFS中,方便後續的各類數據計算任務
3. 定義好的流式計算任務,好比基於頻次規則的標籤加工等等,計算結果直接存儲在MongoDB中
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
# 上傳行爲日誌文件至HDFS
hdfs dfs -mkdir /user/hadoop/test
hdfs dfs -put /home/hadoop/test/user_behavior.txt /user/hadoop/test/
# 啓動Spark
cd /usr/local/spark
./bin/spark-shell
// 讀取HDFS上的JSON文件
val df = sqlContext.read.json("hdfs://localhost:9000/user/hadoop/test/user_behavior.txt")
// 保存爲parquet文件
df.saveAsParquetFile("hdfs://localhost:9000/user/hadoop/test/userBehavior.parquet")
// 讀取parquet文件
val parquetData = sqlContext.parquetFile("hdfs://localhost:9000/user/hadoop/test/userBehavior.parquet")
parquetData.show()
parquetData.printSchema()
// 執行計算df.groupBy("Behavior").count().show()parquetData.groupBy("Behavior").count().show()