Hadoop Data Compression

1 Overview

Compression reduces the disk space data occupies and the amount of disk and network I/O a MapReduce job performs, at the cost of extra CPU time for compressing and decompressing. It is, in effect, a trade of CPU for I/O.

Compression strategy and principles: use compression aggressively for I/O-intensive jobs, and sparingly for compute-intensive jobs, where the extra CPU work can actually slow the job down.

2 Compression Formats Supported by MapReduce

Format | Bundled with Hadoop? | Algorithm | File extension | Splittable? | Code changes needed after switching?
DEFLATE | yes, usable directly | DEFLATE | .deflate | no | none; handled like plain text
Gzip | yes, usable directly | DEFLATE | .gz | no | none; handled like plain text
bzip2 | yes, usable directly | bzip2 | .bz2 | yes | none; handled like plain text
LZO | no, requires installation | LZO | .lzo | yes | must build an index and specify the input format
Snappy | no, requires installation | Snappy | .snappy | no | none; handled like plain text

To support multiple compression/decompression algorithms, Hadoop introduces codecs (encoder/decoder classes), listed in the table below.

Format | Codec class
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec
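All of these codec classes expose the same stream-based compress/decompress interface. Running them requires the Hadoop jars on the classpath, so as a rough standard-library analogue, the sketch below shows the same round-trip pattern with `java.util.zip`'s GZIP streams, which use the same DEFLATE algorithm as Hadoop's GzipCodec; the class name `GzipRoundTrip` is just illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    // Compress bytes with gzip (DEFLATE, the same algorithm behind GzipCodec)
    public static byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
            gos.write(data);
        }
        return bos.toByteArray();
    }

    // Decompress gzip bytes back to the original content
    public static byte[] decompress(byte[] gz) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(gz))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = gis.read(buf)) > 0) {
                bos.write(buf, 0, n);
            }
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String text = "hadoop hadoop hadoop compression";
        byte[] packed = compress(text.getBytes(StandardCharsets.UTF_8));
        String restored = new String(decompress(packed), StandardCharsets.UTF_8);
        System.out.println(restored.equals(text)); // prints true
    }
}
```

Hadoop's `CompressionCodec.createOutputStream` and `createInputStream` (used in section 6.1) play the same roles as the GZIP output and input streams here.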

Compression performance comparison:

Algorithm | Original size | Compressed size | Compression speed | Decompression speed
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
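The trade-off in the table becomes concrete with a little arithmetic: bzip2 produces the smallest output, but at roughly one twentieth of LZO's compression throughput. A quick sketch using only the table's own numbers:

```java
public class CompressionMath {
    // Compression ratio: original size divided by compressed size
    public static double ratio(double originalGb, double compressedGb) {
        return originalGb / compressedGb;
    }

    // Seconds to compress a file of the given size at the given throughput
    public static double compressSeconds(double sizeGb, double mbPerSec) {
        return sizeGb * 1024 / mbPerSec;
    }

    public static void main(String[] args) {
        // gzip: ratio ~4.6, ~8 minutes for the 8.3GB file
        System.out.printf("gzip  ratio %.2f, compress time %.0fs%n",
                ratio(8.3, 1.8), compressSeconds(8.3, 17.5));
        // bzip2: best ratio ~7.5, but nearly an hour
        System.out.printf("bzip2 ratio %.2f, compress time %.0fs%n",
                ratio(8.3, 1.1), compressSeconds(8.3, 2.4));
        // LZO: weakest ratio ~2.9, but under 3 minutes
        System.out.printf("lzo   ratio %.2f, compress time %.0fs%n",
                ratio(8.3, 2.9), compressSeconds(8.3, 49.3));
    }
}
```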

3 Choosing a Compression Format

3.1 Gzip Compression

Pros: high compression ratio with reasonable speed, and built into Hadoop, so gzip files are processed just like plain text. Cons: not splittable. Best suited to cases where each compressed file ends up around one HDFS block size (for example, 128MB) or smaller.

3.2 Bzip2 Compression

Pros: splittable, very high compression ratio, and built into Hadoop. Cons: compression and decompression are slow. Suited to jobs where speed matters little but output size does, or to archiving data that will rarely be processed.

3.3 LZO Compression

Pros: fast compression and decompression with a reasonable ratio, and splittable once an index is built; a popular choice in Hadoop. Cons: not bundled with Hadoop, so it must be installed, and jobs must build an index and specify the LZO input format. Suited to large text files, where the benefit grows with file size.

3.4 Snappy Compression

Pros: very fast compression and decompression. Cons: not splittable, a lower compression ratio, and not bundled with Hadoop. Suited to compressing Map output, or intermediate data passed between chained MapReduce jobs.

4 Where to Compress

Compression can be enabled at three points in a MapReduce job: on the input (Hadoop detects the codec automatically from the file extension), on the Map output (to shrink shuffle traffic between the Map and Reduce phases), and on the Reduce output (to shrink the stored result).

5 Compression Parameter Configuration

Parameter | Default | Stage
io.compression.codecs [core-site.xml] | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | input compression
mapreduce.map.output.compress [mapred-site.xml] | false | mapper output
mapreduce.map.output.compress.codec [mapred-site.xml] | org.apache.hadoop.io.compress.DefaultCodec | mapper output
mapreduce.output.fileoutputformat.compress [mapred-site.xml] | false | reducer output
mapreduce.output.fileoutputformat.compress.codec [mapred-site.xml] | org.apache.hadoop.io.compress.DefaultCodec | reducer output
mapreduce.output.fileoutputformat.compress.type [mapred-site.xml] | RECORD | reducer output
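For example, to compress reducer output cluster-wide with bzip2, the last two parameters could be set in mapred-site.xml roughly as below (a sketch; per-job settings made in the driver, as in section 6, take precedence over the site file):

```xml
<!-- mapred-site.xml -->
<property>
  <name>mapreduce.output.fileoutputformat.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.output.fileoutputformat.compress.codec</name>
  <value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
```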

6 Hands-On Compression Examples

6.1 Compressing and Decompressing a Data Stream

package com.djm.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.*;

public class CompressUtils {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // args[0]: file to compress; args[1]: fully qualified codec class name,
        // e.g. org.apache.hadoop.io.compress.BZip2Codec
        String compressed = compress(args[0], args[1]);
        // Decompress the file we just produced, to verify the round trip
        decompress(compressed);
    }

    // Decompress a file whose codec is inferred from its extension (.gz, .bz2, ...)
    private static void decompress(String path) throws IOException {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodec(new Path(path));
        if (codec == null) {
            System.out.println("cannot find codec for file " + path);
            return;
        }
        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(path)));
        FileOutputStream fos = new FileOutputStream(new File(path + ".decoded"));
        IOUtils.copyBytes(cis, fos, 1024);
        cis.close();
        fos.close();
    }

    // Compress a file with the given codec and return the output path
    private static String compress(String path, String method) throws IOException, ClassNotFoundException {
        FileInputStream fis = new FileInputStream(new File(path));
        // Instantiate the codec by reflection from its class name
        Class<?> codecClass = Class.forName(method);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
        // The codec supplies its own default extension, e.g. ".bz2"
        String outPath = path + codec.getDefaultExtension();
        FileOutputStream fos = new FileOutputStream(new File(outPath));
        CompressionOutputStream cos = codec.createOutputStream(fos);
        IOUtils.copyBytes(fis, cos, 1024);
        cos.close();
        fos.close();
        fis.close();
        return outPath;
    }
}
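Packaged into a jar, the utility above could be invoked roughly like this (the jar name and file path are hypothetical; the codec argument is any fully qualified codec class from the table in section 2):

```shell
# Compress /opt/data/web.log with bzip2
hadoop jar compress-demo.jar com.djm.mapreduce.zip.CompressUtils \
    /opt/data/web.log org.apache.hadoop.io.compress.BZip2Codec
```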

6.2 Compressing the Map Output

package com.djm.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // Enable map-side output compression
        configuration.setBoolean("mapreduce.map.output.compress", true);
        // Set the map output compression codec
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        Job job = Job.getInstance(configuration);
        job.setJarByClass(WcDriver.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

6.3 Compressing the Reduce Output

package com.djm.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(WcDriver.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Enable compression of the reduce output
        FileOutputFormat.setCompressOutput(job, true);
        // Set the output compression codec
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}