Let's start with a diagram from the official website, which may help us better understand the Parquet file format and its contents.
Parquet's design gives it better compression ratios and faster filtering.
A file contains multiple row groups, a row group contains multiple column chunks, and a column chunk contains multiple pages.
Metadata is kept at three levels: FileMetaData, row group metadata, and column (chunk) metadata.
Units of parallelization: MapReduce works per file/row group, I/O per column chunk, and encoding/compression per page.
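To make this hierarchy concrete, here is a minimal sketch that walks the footer of a Parquet file and prints each row group and its column chunks. The class name InspectParquetLayout and the file path are illustrative only; the path assumes the book.parquet file written later in this post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class InspectParquetLayout {
    public static void main(String[] args) throws Exception {
        // illustrative path; point it at any existing parquet file
        Path path = new Path("file:///D:\\tmp\\parquet\\book.parquet");
        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
        try (ParquetFileReader reader = ParquetFileReader.open(inputFile, ParquetReadOptions.builder().build())) {
            // each BlockMetaData in the footer is one row group
            for (BlockMetaData rowGroup : reader.getFooter().getBlocks()) {
                System.out.println("row group: rows=" + rowGroup.getRowCount()
                        + ", totalByteSize=" + rowGroup.getTotalByteSize());
                // each row group holds one column chunk per column
                for (ColumnChunkMetaData column : rowGroup.getColumns()) {
                    System.out.println("  column " + column.getPath()
                            + " codec=" + column.getCodec()
                            + " compressedSize=" + column.getTotalSize());
                }
            }
        }
    }
}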
Each schema has a root called message, and a message contains multiple fields.
Each field has three attributes: repetition, type, and name.
repetition can be one of the following three values: required (exactly one occurrence), optional (zero or one), or repeated (zero or more).
type can be: int32, int64, int96, float, double, boolean, binary, or group.
Except for group, these are all called primitive types; a group composes primitive types.
message Book {
    required binary bookName (UTF8);
    required boolean market;
    required double price;
    repeated group author {
        required binary name (UTF8);
        required int32 age;
    }
}
public static MessageType getMessageTypeFromString() {
    String schemaString = "message Book {\n" +
            "  required binary bookName (UTF8);\n" +
            "  required boolean market;\n" +
            "  required double price;\n" +
            "  repeated group author {\n" +
            "    required binary name (UTF8);\n" +
            "    required int32 age;\n" +
            "  }\n" +
            "}";
    MessageType schema = MessageTypeParser.parseMessageType(schemaString);
    return schema;
}
public static MessageType getMessageTypeFromCode() {
    MessageType messageType = Types.buildMessage()
            .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("bookName")
            .required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("market")
            .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("price")
            .repeatedGroup() // author is a repeated group, matching the string schema above
                .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
                .required(PrimitiveType.PrimitiveTypeName.INT32).named("age")
            .named("author")
            .named("Book");
    System.out.println(messageType.toString());
    return messageType;
}
public static MessageType getMessageType(Path path, Configuration configuration) throws IOException {
    HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, configuration);
    ParquetFileReader parquetFileReader = ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build());
    ParquetMetadata metaData = parquetFileReader.getFooter();
    MessageType schema = metaData.getFileMetaData().getSchema();
    // remember to close the reader
    parquetFileReader.close();
    return schema;
}
Besides reading it from the file metadata as above, you can also obtain the schema directly from a Group object that has been read from the file:
// group can be either of these two types:
// org.apache.parquet.example.data.Group
// org.apache.parquet.example.data.simple.SimpleGroup
System.out.println("schema:" + group.getType().toString());
Remember to close the reader after reading the schema; otherwise you may run into a java.net.SocketException: Too many open files error.
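Since ParquetFileReader implements Closeable, try-with-resources is an easy way to guarantee the reader is closed even when an exception is thrown. A minimal variant of the footer lookup, written that way (the method name readSchema is just illustrative; it uses the same imports as the class below):

public static MessageType readSchema(Path path, Configuration configuration) throws IOException {
    HadoopInputFile inputFile = HadoopInputFile.fromPath(path, configuration);
    // try-with-resources closes the reader automatically
    try (ParquetFileReader reader = ParquetFileReader.open(inputFile, ParquetReadOptions.builder().build())) {
        return reader.getFooter().getFileMetaData().getSchema();
    }
}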
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.*;

import java.io.IOException;

public class GetParquetSchema {

    public static void main(String[] args) throws Exception {
        // local file
        String localPath = "file:///D:\\tmp\\parquet\\book.parquet";
        // hdfs file
        String hdfsPath = "/tmp/parquet/book.parquet";
        Configuration localConfiguration = new Configuration();
        Configuration hdfsConfiguration = new Configuration();
        hdfsConfiguration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://192.168.8.206:9000");

        MessageType messageType = getMessageType(new Path(localPath), localConfiguration);
        System.out.println(messageType);
        System.out.println("--------------");
        messageType = getMessageType(new Path(hdfsPath), hdfsConfiguration);
        System.out.println(messageType);
//        getMessageTypeFromCode();
//        getMessageTypeFromString();
    }

    public static MessageType getMessageType(Path path, Configuration configuration) throws IOException {
        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, configuration);
        ParquetFileReader parquetFileReader = ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build());
        ParquetMetadata metaData = parquetFileReader.getFooter();
        MessageType schema = metaData.getFileMetaData().getSchema();
        // remember to close the reader
        parquetFileReader.close();
        return schema;
    }

    public static MessageType getMessageTypeFromCode() {
        MessageType messageType = Types.buildMessage()
                .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("bookName")
                .required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("market")
                .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("price")
                .repeatedGroup() // author is a repeated group, matching the string schema
                    .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
                    .required(PrimitiveType.PrimitiveTypeName.INT32).named("age")
                .named("author")
                .named("Book");
        System.out.println(messageType.toString());
        return messageType;
    }

    public static MessageType getMessageTypeFromString() {
        String schemaString = "message Book {\n" +
                "  required binary bookName (UTF8);\n" +
                "  required boolean market;\n" +
                "  required double price;\n" +
                "  repeated group author {\n" +
                "    required binary name (UTF8);\n" +
                "    required int32 age;\n" +
                "  }\n" +
                "}";
        MessageType schema = MessageTypeParser.parseMessageType(schemaString);
        return schema;
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;
import java.util.Random;

public class LocalParquetWritRead {

    public static final String DATA_PATH = "file:///D:\\tmp\\parquet\\book.parquet";

    private static String schemaStr = "message Book {\n" +
            "  required binary bookName (UTF8);\n" +
            "  required boolean market;\n" +
            "  required double price;\n" +
            "  repeated group author {\n" +
            "    required binary name (UTF8);\n" +
            "    required int32 age;\n" +
            "  }\n" +
            "}";

    private final static MessageType schema = MessageTypeParser.parseMessageType(schemaStr);

    public static void main(String[] args) throws IOException {
//        write();
        read();
    }

    public static void write() throws IOException {
        Path path = new Path(DATA_PATH);
        Configuration configuration = new Configuration();
        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(path).withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withConf(configuration)
                .withType(schema);
        ParquetWriter<Group> writer = builder.build();
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            Group group = groupFactory.newGroup();
            group.append("bookName", "bookName" + i)
                    .append("market", random.nextBoolean())
                    .append("price", random.nextDouble())
                    .addGroup("author")
                    .append("name", "aname" + i)
                    .append("age", 18 + random.nextInt(72));
            writer.write(group);
        }
        writer.close();
    }

    public static void read() throws IOException {
        Path path = new Path(DATA_PATH);
        ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), path);
        ParquetReader<Group> reader = builder.build();
        Group group;
        while ((group = reader.read()) != null) {
            System.out.println("schema:" + group.getType().toString());
            System.out.println(group.getString("bookName", 0));
            System.out.println(group.getBoolean("market", 0));
            System.out.println(group.getDouble("price", 0));
            Group author = group.getGroup("author", 0);
            System.out.println(author.getString("name", 0));
            System.out.println(author.getInteger("age", 0));
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;
import java.util.Random;

public class HdfsParquetWritRead {

    public static final String DATA_PATH = "/tmp/parquet/book.parquet";

    private static String schemaStr = "message Book {\n" +
            "  required binary bookName (UTF8);\n" +
            "  required boolean market;\n" +
            "  required double price;\n" +
            "  repeated group author {\n" +
            "    required binary name (UTF8);\n" +
            "    required int32 age;\n" +
            "  }\n" +
            "}";

    private final static MessageType schema = MessageTypeParser.parseMessageType(schemaStr);

    public static void main(String[] args) throws IOException {
        // avoid Permission denied: user=xxx, access=WRITE, inode="/tmp/parquet":root:supergroup:drwxr-xr-x
        System.setProperty("HADOOP_USER_NAME", "root");
        write();
//        read();
    }

    public static void write() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://192.168.8.206:9000");
        Path path = new Path(DATA_PATH);
        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(path).withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withConf(configuration)
                .withType(schema);
        ParquetWriter<Group> writer = builder.build();
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            Group group = groupFactory.newGroup();
            group.append("bookName", "bookName" + i)
                    .append("market", random.nextBoolean())
                    .append("price", random.nextDouble())
                    .addGroup("author")
                    .append("name", "aname" + i)
                    .append("age", 18 + random.nextInt(72));
            writer.write(group);
        }
        writer.close();
    }

    public static void read() throws IOException {
        Path path = new Path(DATA_PATH);
        Configuration configuration = new Configuration();
        configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://192.168.8.206:9000");
        GroupReadSupport groupReadSupport = new GroupReadSupport();
        ParquetReader.Builder<Group> builder = ParquetReader.builder(groupReadSupport, path).withConf(configuration);
        ParquetReader<Group> reader = builder.build();
        Group group;
        while ((group = reader.read()) != null) {
            System.out.println("schema:" + group.getType().toString());
            System.out.println(group.getString("bookName", 0));
            System.out.println(group.getBoolean("market", 0));
            System.out.println(group.getDouble("price", 0));
            Group author = group.getGroup("author", 0);
            System.out.println(author.getString("name", 0));
            System.out.println(author.getInteger("age", 0));
        }
    }
}
The second parameter 0 in group.getString("bookName", 0) is there for repeated fields: it selects which occurrence to read, starting at 0. For required fields it is therefore always 0.
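For a field that really is repeated, such as author in the schema above, you can ask the Group how many occurrences it holds and loop over them. A short sketch, building on the read() loop above:

// iterate over every occurrence of the repeated "author" group in one record
int authorCount = group.getFieldRepetitionCount("author");
for (int i = 0; i < authorCount; i++) {
    Group author = group.getGroup("author", i);
    System.out.println(author.getString("name", 0) + " / " + author.getInteger("age", 0));
}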
Merging Parquet files is a very practical operation, because when we collect data through a messaging system such as Kafka, we often end up with a large number of small files.
Too many small files slow an application down, so we need to merge them.
package org.curitis.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class MergeHdfsParquetFile {

    private static FileSystem fileSystem;

    static {
        System.setProperty("HADOOP_USER_NAME", "root");
        try {
            fileSystem = FileSystem.get(getConfiguration());
        } catch (IOException e) {
            System.exit(1);
        }
    }

    public static void main(String[] args) throws Exception {
        mergeParquet("/tmp/merge");
    }

    private static void mergeParquet(String dir) throws Exception {
        MessageType messageType = checkSchemaSame(dir);
        if (messageType == null) { // the MessageTypes are not all the same
            return;
        }
        List<Path> parquetPaths = getParquetPaths(dir);
        String dest = dir + "/merge-" + System.currentTimeMillis() + ".parquet";
        Path destPath = new Path(dest);
        ParquetWriter parquetWriter = getParquetWriter(messageType, destPath);
        ParquetReader<Group> parquetReader;
        Group book;
        for (Path path : parquetPaths) {
            parquetReader = getParquetReader(path);
            while ((book = parquetReader.read()) != null) {
                parquetWriter.write(book);
            }
        }
        parquetWriter.close();
        if (fileSystem.exists(destPath)) {
            FileStatus fileStatus = fileSystem.getFileStatus(destPath);
            if (fileStatus.getLen() <= 1024) {
                System.err.println(dir + ": merged file is too small, please check before deleting the source files");
            } else {
                for (Path path : parquetPaths) {
                    fileSystem.delete(path, false);
                }
            }
        }
    }

    public static List<Path> getParquetPaths(String dir) throws Exception {
        Path dirPath = new Path(dir);
        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(dirPath, false);
        List<Path> fileList = new ArrayList<Path>();
        while (locatedFileStatusRemoteIterator.hasNext()) {
            LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
            Path path = next.getPath();
            FileStatus fileStatus = fileSystem.getFileStatus(path);
            if (fileStatus.isFile() && path.getName().endsWith(".parquet")) { // only keep parquet files
                fileList.add(path);
            }
        }
        return fileList;
    }

    private static MessageType checkSchemaSame(String dir) throws Exception {
        List<MessageType> groupTypes = getMessageType(dir);
        int size = groupTypes.size();
        if (size == 0 || size == 1) { // nothing to merge with 0 or 1 files
            return null;
        }
        MessageType groupType = groupTypes.get(0);
        for (MessageType gt : groupTypes) {
            if (!groupType.equals(gt)) {
                return null;
            }
        }
        return groupType;
    }

    private static List<MessageType> getMessageType(String dir) throws Exception {
        List<Path> parquetPaths = getParquetPaths(dir);
        LinkedList<MessageType> groupTypes = new LinkedList<>();
        for (Path path : parquetPaths) {
            groupTypes.add(getMessageType(path));
        }
        return groupTypes;
    }

    public static MessageType getMessageType(Path path) throws IOException {
        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, getConfiguration());
        ParquetFileReader parquetFileReader = ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build());
        ParquetMetadata metaData = parquetFileReader.getFooter();
        MessageType schema = metaData.getFileMetaData().getSchema();
        parquetFileReader.close();
        return schema;
    }

    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        // with fs.defaultFS set here, paths do not need the hdfs://127.0.0.1:9000 prefix
        configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:9000");
        return configuration;
    }

    public static ParquetReader getParquetReader(Path path) throws IOException {
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader.Builder<Group> builder = ParquetReader.builder(readSupport, path);
        builder.withConf(getConfiguration());
        ParquetReader<Group> parquetReader = builder.build();
        return parquetReader;
    }

    public static ParquetWriter getParquetWriter(MessageType schema, Path path) throws IOException {
        ExampleParquetWriter.Builder writebuilder = ExampleParquetWriter.builder(path);
        writebuilder.withWriteMode(ParquetFileWriter.Mode.CREATE);
        writebuilder.withCompressionCodec(CompressionCodecName.SNAPPY);
        writebuilder.withConf(getConfiguration());
        writebuilder.withType(schema);
        ParquetWriter writer = writebuilder.build();
        return writer;
    }
}
The code above first checks that all Parquet files under the directory share the same schema, and only merges them when they do.
If the schemas differ, you cannot write the Group objects through directly; you have to copy them field by field. Without nested groups this is fairly easy to handle; with nested groups it gets a bit more involved. A sketch of the flat case follows.
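As an illustration only, here is a minimal sketch of the flat (non-nested) case. The helper name copyFlatGroup and the choice to skip fields the target schema does not contain are assumptions for this example, not part of the merge code above; nested groups and repeated primitive fields are deliberately not handled.

import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

// hypothetical helper: copy the primitive fields of a flat source Group into a new Group
// built against the target schema, skipping fields the target schema does not know
public static Group copyFlatGroup(Group source, MessageType targetSchema) {
    Group target = new SimpleGroupFactory(targetSchema).newGroup();
    GroupType sourceType = source.getType();
    for (Type field : sourceType.getFields()) {
        String name = field.getName();
        // skip nested groups, unknown fields, and fields without a value
        if (!field.isPrimitive() || !targetSchema.containsField(name)
                || source.getFieldRepetitionCount(name) == 0) {
            continue;
        }
        switch (field.asPrimitiveType().getPrimitiveTypeName()) {
            case BINARY:  target.append(name, source.getString(name, 0));  break;
            case BOOLEAN: target.append(name, source.getBoolean(name, 0)); break;
            case DOUBLE:  target.append(name, source.getDouble(name, 0));  break;
            case INT32:   target.append(name, source.getInteger(name, 0)); break;
            case INT64:   target.append(name, source.getLong(name, 0));    break;
            default: break; // other primitive types omitted for brevity
        }
    }
    return target;
}

In the merge loop you would then call parquetWriter.write(copyFlatGroup(book, targetSchema)) instead of writing the source group directly.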
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.curitis</groupId>
    <artifactId>hadoop-learn</artifactId>
    <version>1.0.0</version>

    <properties>
        <spring.version>5.1.3.RELEASE</spring.version>
        <junit.version>4.11</junit.version>
        <hadoop.version>3.0.2</hadoop.version>
        <parquet.version>1.10.1</parquet.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- parquet -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-encoding</artifactId>
            <version>${parquet.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>

        <!-- log -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>

        <!-- test -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.25.Final</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>