Reading and Writing Parquet Files and Merging Small Parquet Files

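1. Introduction

Let's start with a diagram from the official site, which may help us better understand the Parquet file format and its contents.

[Figure: Parquet format]

Parquet is designed for better compression ratios and faster filtering.

A file has multiple row groups, a row group has one column chunk per column, and a column chunk has multiple pages.

Metadata comes in three kinds: FileMetaData, row group metadata, and column metadata.

Units of parallelism: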

  1. MapReduce - File/Row Group
  2. IO - Column chunk
  3. Encoding/Compression - Page
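
To make the nesting above concrete, here is a minimal sketch in plain Java. It is only a toy model, not the Parquet API: the class names (ParquetLayoutSketch, RowGroup, ColumnChunk, Page) and the build helper are made up for illustration, and real files do not have a fixed number of pages per chunk.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the layout: file -> row groups -> column chunks -> pages (hypothetical names). */
class ParquetLayoutSketch {

    /** Smallest unit: encoding and compression are applied per page. */
    static class Page {
        final int valueCount;
        Page(int valueCount) { this.valueCount = valueCount; }
    }

    /** All values of one column inside one row group: the unit of IO. */
    static class ColumnChunk {
        final String column;
        final List<Page> pages = new ArrayList<>();
        ColumnChunk(String column) { this.column = column; }
    }

    /** A horizontal slice of rows: the unit a MapReduce task works on. */
    static class RowGroup {
        final List<ColumnChunk> chunks = new ArrayList<>();
    }

    /** Builds a file layout with a fixed shape (an assumption made to keep the sketch short). */
    static List<RowGroup> build(int rowGroups, String[] columns, int pagesPerChunk, int valuesPerPage) {
        List<RowGroup> file = new ArrayList<>();
        for (int g = 0; g < rowGroups; g++) {
            RowGroup rowGroup = new RowGroup();
            for (String column : columns) {
                ColumnChunk chunk = new ColumnChunk(column);
                for (int p = 0; p < pagesPerChunk; p++) {
                    chunk.pages.add(new Page(valuesPerPage));
                }
                rowGroup.chunks.add(chunk);
            }
            file.add(rowGroup);
        }
        return file;
    }

    /** Number of IO units in the file. */
    static int countChunks(List<RowGroup> file) {
        int n = 0;
        for (RowGroup rowGroup : file) n += rowGroup.chunks.size();
        return n;
    }

    /** Number of encoding/compression units in the file. */
    static int countPages(List<RowGroup> file) {
        int n = 0;
        for (RowGroup rowGroup : file) {
            for (ColumnChunk chunk : rowGroup.chunks) n += chunk.pages.size();
        }
        return n;
    }

    public static void main(String[] args) {
        List<RowGroup> file = build(2, new String[]{"bookName", "market", "price"}, 4, 100);
        System.out.println("row groups (MapReduce units): " + file.size());
        System.out.println("column chunks (IO units): " + countChunks(file));
        System.out.println("pages (encoding/compression units): " + countPages(file));
    }
}
```

With 2 row groups, 3 columns and 4 pages per chunk, the sketch reports 2 row groups, 6 column chunks and 24 pages.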

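2. Schema (MessageType)

Every schema has a root called message, and a message contains multiple fields.

Each field has three attributes: repetition, type, and name.

The repetition can be one of the following three:

  1. required: exactly once
  2. optional: zero or one time (at most once)
  3. repeated: zero or more times

The type can be int32, int64, int96, float, double, boolean, binary, or group.

Everything except group is called a primitive type; a group combines primitive types.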

message Book {
  required binary bookName (UTF8);
  required boolean market;
  required double price;
  repeated group author {
    required binary name (UTF8);
    required int32 age;
  }
}
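
The three repetition values can be read as bounds on how many times a field may occur in one record. The following plain-Java sketch expresses that rule. The enum here is a hypothetical stand-in written for illustration; it is not the enum that ships with Parquet, and the allows method is an assumption of this sketch.

```java
/** Hypothetical illustration of the repetition rules as occurrence bounds. */
class RepetitionSketch {

    enum Repetition {
        REQUIRED(1, 1),                  // exactly once
        OPTIONAL(0, 1),                  // zero or one time
        REPEATED(0, Integer.MAX_VALUE);  // zero or more times

        final int min;
        final int max;

        Repetition(int min, int max) {
            this.min = min;
            this.max = max;
        }

        /** True if a field with this repetition may occur the given number of times. */
        boolean allows(int occurrences) {
            return occurrences >= min && occurrences <= max;
        }
    }

    public static void main(String[] args) {
        for (Repetition r : Repetition.values()) {
            for (int n = 0; n <= 2; n++) {
                System.out.println(r + " allows " + n + " occurrence(s): " + r.allows(n));
            }
        }
    }
}
```

In the Book schema above, bookName, market and price must each appear exactly once, while author may appear any number of times, including zero.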

3. Obtaining a MessageType

3.1 Building from a string

public static MessageType getMessageTypeFromString (){
    String schemaString = "message Book {\n" +
            "  required binary bookName (UTF8);\n" +
            "  required boolean market;\n" +
            "  required double price;\n" +
            "  repeated group author {\n" +
            "    required binary name (UTF8);\n" +
            "    required int32 age;\n" +
            "  }\n" +
            "}";
    MessageType schema = MessageTypeParser.parseMessageType(schemaString);
    return schema;
}

3.2 Building in code

public static MessageType getMessageTypeFromCode(){
    MessageType messageType = Types.buildMessage()
            .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("bookName")
            .required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("market")
            .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("price")
            .repeatedGroup() // author is declared as "repeated group" in the schema above
            .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
            .required(PrimitiveType.PrimitiveTypeName.INT32).named("age")
            .named("author")
            .named("Book");
    System.out.println(messageType.toString());
    return messageType;
}

3.3 Reading from a Parquet file

public static MessageType getMessageType(Path path,Configuration configuration) throws IOException {
    HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, configuration);
    // try-with-resources closes the reader even if reading the footer fails
    try (ParquetFileReader parquetFileReader = ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build())) {
        ParquetMetadata metaData = parquetFileReader.getFooter();
        return metaData.getFileMetaData().getSchema();
    }
}

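Besides reading it from the file metadata, the schema can also be obtained from a record read out of the file:

org.apache.parquet.example.data.Group
org.apache.parquet.example.data.simple.SimpleGroup
// group can be an instance of either of the two classes above
System.out.println("schema:" + group.getType().toString());

Remember to close the reader after reading the schema; otherwise you may run into a java.net.SocketException: Too many open files error.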
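
The usual way to guarantee that a reader gets closed is try-with-resources, which works for anything that implements Closeable. The sketch below shows the pattern using only the JDK. TrackedReader is a hypothetical stand-in for a file reader that counts how many instances are still open; it is not a Parquet class.

```java
import java.io.Closeable;
import java.io.IOException;

/** Shows that try-with-resources closes a reader on both the success and the failure path. */
class CloseSketch {

    /** Hypothetical stand-in for a file reader; tracks how many instances are still open. */
    static class TrackedReader implements Closeable {
        static int open = 0;

        TrackedReader() {
            open++;
        }

        String readSchema(boolean fail) throws IOException {
            if (fail) {
                throw new IOException("simulated read failure");
            }
            return "schema-of-Book";
        }

        @Override
        public void close() {
            open--;
        }
    }

    /** Returns the schema text, or null if reading failed; the reader is closed either way. */
    static String schemaOrNull(boolean fail) {
        try (TrackedReader reader = new TrackedReader()) {
            return reader.readSchema(fail);
        } catch (IOException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            schemaOrNull(i % 2 == 0); // every other call fails
        }
        System.out.println("readers still open: " + TrackedReader.open);
    }
}
```

A manual close() at the end of the method is skipped when an exception is thrown before it, so handles leak on the failure path. With try-with-resources that path is covered as well.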

3.4 Complete example

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.schema.*;

import java.io.IOException;

public class GetParquetSchema {

    public static void main(String[] args) throws Exception{
        // local file
        String localPath = "file:///D:\\tmp\\parquet\\book.parquet";
        // HDFS file
        String hdfsPath = "/tmp/parquet/book.parquet";

        Configuration localConfiguration = new Configuration();

        Configuration hdfsConfiguration = new Configuration();
        hdfsConfiguration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://192.168.8.206:9000");

        // getMessageType takes a Path, so wrap the string paths
        MessageType newMessageType = getMessageType(new Path(localPath),localConfiguration);
        System.out.println(newMessageType);
        System.out.println("--------------");
        newMessageType = getMessageType(new Path(hdfsPath),hdfsConfiguration);
        System.out.println(newMessageType);
//        getMessageTypeFromCode();
//        getMessageTypeFromString();
    }

    public static MessageType getMessageType(Path path,Configuration configuration) throws IOException {
        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, configuration);
        // try-with-resources closes the reader even if reading the footer fails
        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build())) {
            ParquetMetadata metaData = parquetFileReader.getFooter();
            return metaData.getFileMetaData().getSchema();
        }
    }

    public static MessageType getMessageTypeFromCode(){
        MessageType messageType = Types.buildMessage()
                .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("bookName")
                .required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("market")
                .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("price")
                .repeatedGroup() // author is declared as "repeated group" in the string schema
                .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
                .required(PrimitiveType.PrimitiveTypeName.INT32).named("age")
                .named("author")
                .named("Book");
        System.out.println(messageType.toString());
        return messageType;
    }

    public static MessageType getMessageTypeFromString (){
        String schemaString = "message Book {\n" +
                "  required binary bookName (UTF8);\n" +
                "  required boolean market;\n" +
                "  required double price;\n" +
                "  repeated group author {\n" +
                "    required binary name (UTF8);\n" +
                "    required int32 age;\n" +
                "  }\n" +
                "}";
        MessageType schema = MessageTypeParser.parseMessageType(schemaString);
        return schema;
    }
}

4. Reading and writing Parquet

4.1 Reading and writing a local file

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;
import java.util.Random;

public class LocalParquetWritRead {

    public static final String DATA_PATH = "file:///D:\\tmp\\parquet\\book.parquet";

    private static String schemaStr = "message Book {\n" +
            "  required binary bookName (UTF8);\n" +
            "  required boolean market;\n" +
            "  required double price;\n" +
            "  repeated group author {\n" +
            "    required binary name (UTF8);\n" +
            "    required int32 age;\n" +
            "  }\n" +
            "}";

    private final static MessageType schema = MessageTypeParser.parseMessageType(schemaStr);


    public static void main(String[] args) throws IOException {
//        write();
        read();
    }

    public static void write() throws IOException {
        Path path = new Path(DATA_PATH);
        Configuration configuration = new Configuration();
        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(path).withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withConf(configuration)
                .withType(schema);

        ParquetWriter<Group> writer = builder.build();
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);

        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            Group group = groupFactory.newGroup();
            group.append("bookName","bookName" + i)
                    .append("market",random.nextBoolean())
                    .append("price",random.nextDouble())
                    .addGroup("author")
                    .append("name","aname" + i)
                    .append("age",18 + random.nextInt(72));
            writer.write(group);
        }
        writer.close();
    }


    public static void read() throws IOException {
        Path path = new Path(DATA_PATH);
        ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), path);
        // try-with-resources closes the reader once all records have been read
        try (ParquetReader<Group> reader = builder.build()) {
            Group group;
            while ((group = reader.read()) != null){
                System.out.println("schema:" + group.getType().toString());
                System.out.println(group.getString("bookName",0));
                System.out.println(group.getBoolean("market",0));
                System.out.println(group.getDouble("price",0));

                Group author = group.getGroup("author", 0);
                System.out.println(author.getString("name",0));
                System.out.println(author.getInteger("age",0));
            }
        }
    }
}

4.2 Reading and writing an HDFS file

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;
import java.util.Random;

public class HdfsParquetWritRead {

    public static final String DATA_PATH = "/tmp/parquet/book.parquet";

    private static String schemaStr = "message Book {\n" +
            "  required binary bookName (UTF8);\n" +
            "  required boolean market;\n" +
            "  required double price;\n" +
            "  repeated group author {\n" +
            "    required binary name (UTF8);\n" +
            "    required int32 age;\n" +
            "  }\n" +
            "}";

    private final static MessageType schema = MessageTypeParser.parseMessageType(schemaStr);


    public static void main(String[] args) throws IOException {
        // avoids Permission denied: user=xxx, access=WRITE, inode="/tmp/parquet":root:supergroup:drwxr-xr-x
        System.setProperty("HADOOP_USER_NAME","root");
        write();
//        read();
    }

    public static void write() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://192.168.8.206:9000");
        Path path = new Path(DATA_PATH);
        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(path).withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withConf(configuration)
                .withType(schema);

        ParquetWriter<Group> writer = builder.build();
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);

        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            Group group = groupFactory.newGroup();
            group.append("bookName","bookName" + i)
                    .append("market",random.nextBoolean())
                    .append("price",random.nextDouble())
                    .addGroup("author")
                    .append("name","aname" + i)
                    .append("age",18 + random.nextInt(72));
            writer.write(group);
        }
        writer.close();
    }


    public static void read() throws IOException {
        Path path = new Path(DATA_PATH);
        Configuration configuration = new Configuration();
        configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://192.168.8.206:9000");
        GroupReadSupport groupReadSupport = new GroupReadSupport();
        ParquetReader.Builder<Group> builder = ParquetReader.builder(groupReadSupport, path).withConf(configuration);
        // try-with-resources closes the reader once all records have been read
        try (ParquetReader<Group> reader = builder.build()) {
            Group group;
            while ((group = reader.read()) != null){
                System.out.println("schema:" + group.getType().toString());
                System.out.println(group.getString("bookName",0));
                System.out.println(group.getBoolean("market",0));
                System.out.println(group.getDouble("price",0));

                Group author = group.getGroup("author", 0);
                System.out.println(author.getString("name",0));
                System.out.println(author.getInteger("age",0));
            }
        }
    }
}

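The second argument, 0, in group.getString("bookName",0) exists for repeated fields: it selects which value to fetch, counting from 0. A required field has exactly one value, so its index is always 0.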
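
The role of that index is easier to see with a miniature record that stores a list of values per field. The sketch below is plain Java and purely illustrative: MiniGroup and its methods are hypothetical, and for brevity the repeated author field holds plain strings rather than nested groups as in the Book schema.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrates why value getters take an index: every field holds a list of values. */
class IndexSketch {

    /** Hypothetical miniature record: each field name maps to the list of its values. */
    static class MiniGroup {
        private final Map<String, List<Object>> values = new HashMap<>();

        /** Adds one more value to the field and returns this record for chaining. */
        MiniGroup append(String field, Object value) {
            values.computeIfAbsent(field, k -> new ArrayList<>()).add(value);
            return this;
        }

        /** How many values the field currently holds (0 if it was never set). */
        int count(String field) {
            List<Object> list = values.get(field);
            return list == null ? 0 : list.size();
        }

        /** Returns the index-th value of the field, counting from 0. */
        Object get(String field, int index) {
            return values.get(field).get(index);
        }
    }

    public static void main(String[] args) {
        MiniGroup book = new MiniGroup()
                .append("bookName", "bookName0") // required: exactly one value, always index 0
                .append("author", "alice")       // repeated: first value, index 0
                .append("author", "bob");        // repeated: second value, index 1

        System.out.println(book.get("bookName", 0));
        for (int i = 0; i < book.count("author"); i++) {
            System.out.println("author[" + i + "] = " + book.get("author", i));
        }
    }
}
```

Iterating from 0 up to the value count, as the loop does here, is the general way to visit every occurrence of a repeated field.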

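5. Merging small Parquet files

Merging Parquet files is a very practical operation: when data is collected through a messaging system such as Kafka, the result is often a large number of small files.

Too many small files slow applications down, so we need to merge them.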

package org.curitis.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class MergeHdfsParquetFile {

    private static FileSystem fileSystem;

    static {
        System.setProperty("HADOOP_USER_NAME","root");
        try {
            fileSystem = FileSystem.get(getConfiguration());
        } catch (IOException e) {
            e.printStackTrace(); // report why the file system could not be opened
            System.exit(1);
        }
    }

    public static void main(String[] args) throws Exception {
        mergeParquet("/tmp/merge");
    }


    private static void mergeParquet(String dir) throws Exception {
        MessageType messageType = checkSchemaSame(dir);
        if(messageType == null){// fewer than two files, or the schemas differ
            return;
        }
        List<Path> parquetPaths = getParquetPaths(dir);

        String dest = dir + "/merge-" + System.currentTimeMillis() + ".parquet";
        Path destPath = new Path(dest);
        ParquetWriter<Group> parquetWriter = getParquetWriter(messageType, destPath);
        Group book;
        for(Path path : parquetPaths) {
            // close each source reader once it is drained to avoid leaking file handles
            try (ParquetReader<Group> parquetReader = getParquetReader(path)) {
                while ((book = parquetReader.read()) != null) {
                    parquetWriter.write(book);
                }
            }
        }
        parquetWriter.close();
        if(fileSystem.exists(destPath)){
            FileStatus fileStatus = fileSystem.getFileStatus(destPath);
            if(fileStatus.getLen() <= 1024){
                // a suspiciously small result: keep the source files and ask for a manual check
                System.err.println(dir + ": merged file is too small, please check whether the source files should be deleted");
            }else {
                for(Path path : parquetPaths){
                    fileSystem.delete(path,false);
                }
            }
        }
    }

    public static List<Path> getParquetPaths(String dir) throws Exception {
        Path dirPath = new Path(dir);
        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(dirPath, false);
        List<Path> fileList = new ArrayList<Path>();
        while (locatedFileStatusRemoteIterator.hasNext()) {
            LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
            Path path = next.getPath();
            FileStatus fileStatus = fileSystem.getFileStatus(path);
            if(fileStatus.isFile() && path.getName().endsWith(".parquet")) {// keep only regular files named *.parquet
                fileList.add(path);
            }
        }
        return fileList;
    }

    private static MessageType checkSchemaSame(String dir) throws Exception {
        List<MessageType> groupTypes = getMessageType(dir);
        int size = groupTypes.size();
        if(size == 0 || size == 1){// nothing to merge with zero or one file
            return null;
        }
        MessageType groupType = groupTypes.get(0);
        for(MessageType gt : groupTypes){
            if(!groupType.equals(gt)){
                return null;
            }
        }
        return groupType;
    }

    private static List<MessageType> getMessageType(String dir) throws Exception {
        List<Path> parquetPaths = getParquetPaths(dir);
        LinkedList<MessageType> groupTypes = new LinkedList<>();
        for(Path path : parquetPaths){
            groupTypes.add(getMessageType(path));
        }
        return groupTypes;
    }

    public static MessageType getMessageType(Path path) throws IOException {
        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(path, getConfiguration());
        // try-with-resources closes the reader even if reading the footer fails
        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(hadoopInputFile, ParquetReadOptions.builder().build())) {
            ParquetMetadata metaData = parquetFileReader.getFooter();
            return metaData.getFileMetaData().getSchema();
        }
    }

    private static Configuration getConfiguration(){
        Configuration configuration = new Configuration();
        // with the default file system set here, paths do not need the hdfs://127.0.0.1:9000 prefix
        configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:9000");
        return configuration;
    }

    public static ParquetReader<Group> getParquetReader(Path path) throws IOException {
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader.Builder<Group> builder = ParquetReader.builder(readSupport, path);
        builder.withConf(getConfiguration());
        return builder.build();
    }

    public static ParquetWriter<Group> getParquetWriter(MessageType schema, Path path) throws IOException {
        ExampleParquetWriter.Builder writebuilder = ExampleParquetWriter.builder(path);
        writebuilder.withWriteMode(ParquetFileWriter.Mode.CREATE);
        writebuilder.withCompressionCodec(CompressionCodecName.SNAPPY);
        writebuilder.withConf(getConfiguration());
        writebuilder.withType(schema);
        return writebuilder.build();
    }
}

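The code above first checks whether all Parquet files in the directory share the same schema, and merges them only if they do.

If the schemas differ, a Group cannot be written as-is and has to be written field by field. That is fairly easy when there are no nested groups, and somewhat more work when there are.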
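
For the flat case (no nested groups), writing field by field amounts to copying each field of the target schema from the source record when it is present. The plain-Java sketch below shows that idea with records modelled as maps. It is an illustration under stated assumptions rather than Parquet code: the project helper is hypothetical, and it assumes that any field missing from a source record is optional in the target schema.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Field-by-field copy of a flat record onto a target field list (hypothetical helper). */
class FieldCopySketch {

    /**
     * Copies only the fields named in targetFields, in that order.
     * Fields absent from the source are left out of the result, and
     * source fields the target does not know are dropped.
     */
    static Map<String, Object> project(Map<String, Object> source, List<String> targetFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : targetFields) {
            if (source.containsKey(field)) {
                out.put(field, source.get(field));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Target schema of the merged file, assumed flat for this sketch.
        List<String> target = Arrays.asList("bookName", "market", "price");

        // A record from a file whose schema lacks "market" and has an extra "isbn".
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("bookName", "bookName0");
        source.put("price", 9.5);
        source.put("isbn", "n/a");

        System.out.println(project(source, target));
    }
}
```

Nested groups need the same copy applied recursively to each child group, which is where the extra work mentioned above comes from.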

6. The pom file

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
        http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.curitis</groupId>
    <artifactId>hadoop-learn</artifactId>
    <version>1.0.0</version>

    <properties>
        <spring.version>5.1.3.RELEASE</spring.version>
        <junit.version>4.11</junit.version>
        <hadoop.version>3.0.2</hadoop.version>
        <parquet.version>1.10.1</parquet.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- parquet -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-encoding</artifactId>
            <version>${parquet.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>

        <!--log-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>

        <!--test-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.25.Final</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

7. Documentation

parquet parquet-format parquet-mr
