1 概述

壓縮策略和原則java

2 MR 支持的壓縮編碼
壓縮格式 |
hadoop自帶 |
算法 |
文件擴展名 |
是否可切分 |
換成壓縮格式後,原程序是否須要修改 |
DEFLATE |
是,直接使用 |
DEFLATE |
.deflate |
否 |
和文本處理同樣,不須要修改 |
Gzip |
是,直接使用 |
DEFLATE |
.gz |
否 |
和文本處理同樣,不須要修改 |
bzip2 |
是,直接使用 |
bzip2 |
.bz2 |
是 |
和文本處理同樣,不須要修改 |
LZO |
否,須要安裝 |
LZO |
.lzo |
是 |
須要建索引,還須要指定輸入格式 |
Snappy |
否,須要安裝 |
Snappy |
.snappy |
否 |
和文本處理同樣,不須要修改 |
爲了支持多種壓縮/解壓縮算法,Hadoop 引入了編碼/解碼器,以下表所示。算法
壓縮格式 |
對應的編碼/解碼器 |
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip2 |
org.apache.hadoop.io.compress.BZip2Codec |
LZO |
com.hadoop.compression.lzo.LzopCodec |
Snappy |
org.apache.hadoop.io.compress.SnappyCodec |
壓縮性能的比較apache
壓縮算法 |
原始文件大小 |
壓縮文件大小 |
壓縮速度 |
解壓速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/s |
74.6MB/s |
3 壓縮方式選擇
3.1 Gzip 壓縮

3.2 Bzip2 壓縮

3.3 Lzo 壓縮

3.4 Snappy 壓縮

4 壓縮位置選擇

5 壓縮參數配置
參數 |
默認值 |
階段 |
io.compression.codecs [在core-site.xml] |
org.apache.hadoop.io.compress.DefaultCodecorg apache.hadoop.io.compress.GzipCodec org.apache.hadoop.io.compress.BZip2Codec |
輸入壓縮 |
mapreduce.map.output.compress [mapred-site.xml] |
false |
mapper輸出 |
mapreduce.map.output.compress.codec [mapred-site.xml] |
org.apache.hadoop.io.compress.DefaultCodec |
mapper輸出 |
mapreduce.output.fileoutputformat.compress [mapred-site.xml] |
false |
reducer輸出 |
mapreduce.output.fileoutputformat.compress.codec [mapred-site.xml] |
org.apache.hadoop.io.compress DefaultCodec |
reducer輸出 |
mapreduce.output.fileoutputformat.compress.type [mapred-site.xml] |
RECORD |
reducer輸出 |
6 壓縮實操案例
6.1 數據流的壓縮和解壓縮
package com.djm.mapreduce.zip;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.*;
public class CompressUtils {
public static void main(String[] args) throws IOException, ClassNotFoundException {
compress(args[0], args[1]);
decompress(args[0]);
}
private static void decompress(String path) throws IOException {
CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
CompressionCodec codec = (CompressionCodec) factory.getCodec(new Path(path));
if (codec == null) {
System.out.println("cannot find codec for file " + path);
return;
}
CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(path)));
FileOutputStream fos = new FileOutputStream(new File(path + ".decoded"));
IOUtils.copyBytes(cis, fos, 1024);
cis.close();
fos.close();
}
private static void compress(String path, String method) throws IOException, ClassNotFoundException {
FileInputStream fis = new FileInputStream(new File(path));
Class codecClass = Class.forName(method);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
FileOutputStream fos = new FileOutputStream(new File(path + codec.getDefaultExtension()));
CompressionOutputStream cos = codec.createOutputStream(fos);
IOUtils.copyBytes(fis, cos, 1024);
cos.close();
fos.close();
fis.close();
}
}
6.2 Map 輸出端採用壓縮
package com.djm.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WcDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
configuration.setBoolean("mapreduce.map.output.compress", true);
// 設置map端輸出壓縮方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
Job job = Job.getInstance(configuration);
job.setJarByClass(WcDriver.class);
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
6.3 Reduce 輸出端採用壓縮
package com.djm.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WcDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WcDriver.class);
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 設置reduce端輸出壓縮開啓
FileOutputFormat.setCompressOutput(job, true);
// 設置壓縮的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}