【Hadoop篇08】Hadoop數據壓縮

簡潔而不簡單

Hadoop數據壓縮

數據壓縮優勢和缺點

​ 壓縮技術可以有效減小底層存儲系統(HDFS)讀寫字節數。壓縮提升了網絡帶寬和磁盤空間的效率。在 Hadoop下,尤爲是數據規模很大和工做負載密集的狀況下,使用數據壓縮顯得很是重要。在這種狀況下,IO操做和網絡數據傳輸要花大量的時間。還有, Shuffle與 Merge過程一樣也面臨着巨大的IO壓力鰵於磁盤IO和網絡帶寬是 Hadoop的寶貴資源,數據壓縮對於節省資源、最小化磁盤IO和網絡傳輸很是有幫助java

​ 不過,儘管壓縮與解壓操做的CPU開銷不髙,其性能的提高和資源的節省並不是沒有代價。若是磁盤IO和網絡帶寬影響了 MapReduce做業性能,在任意 MapReduce階段啓用壓縮均可以改善端到端處理時間並減少IO和網絡流量。git

壓縮策略和原則

​ 壓縮是提升 Hadoop運行效率的一種優化策略經過對 Mapper、 Reducer運行過程的數據進行壓縮,以減小磁盤IO,提升MR程序運行速度
​ 注意:釆用壓縮技術減小了磁盤IO,但同時增長了CPU運算負擔。因此,壓縮特性運用得當能提升性能,但運用不當也可能下降性能壓縮基本原則:github

(1)運算密集型的job,少用壓縮
(2)IO密集型的job,多用壓縮!!算法

MR支持的壓縮編碼

壓縮格式 hadoop自帶? 算法 文件擴展名 是否可切分 換成壓縮格式後,原來的程序是否須要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本處理同樣,不須要修改
Gzip 是,直接使用 DEFLATE .gz 和文本處理同樣,不須要修改
bzip2 是,直接使用 bzip2 .bz2 和文本處理同樣,不須要修改
LZO 否,須要安裝 LZO .lzo 須要建索引,還須要指定輸入格式
Snappy 否,須要安裝 Snappy .snappy 和文本處理同樣,不須要修改

爲了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,以下表所示。apache

壓縮格式 對應的編碼/解碼器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

壓縮性能的比較

壓縮算法 原始文件大小 壓縮文件大小 壓縮速度 解壓速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

壓縮方式選擇

Gzip壓縮

image-20200619174950679

Bzip2壓縮

image-20200619175002143

Lzo壓縮

image-20200619175012846

Snappy壓縮

image-20200619175204704

壓縮位置選擇

image-20200619175257463

壓縮參數配置

參數 默認值 階段 建議
io.compression.codecs (在core-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec 輸入壓縮 Hadoop使用文件擴展名判斷是否支持某種編解碼器
mapreduce.map.output.compress(在mapred-site.xml中配置) false mapper輸出 這個參數設爲true啓用壓縮
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec mapper輸出 使用LZO或Snappy編解碼器在此階段壓縮數據
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) false reducer輸出 這個參數設爲true啓用壓縮
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress. DefaultCodec reducer輸出 使用標準工具或者編解碼器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) RECORD reducer輸出 SequenceFile輸出使用的壓縮類型:NONE和BLOCK

壓縮案例

image-20200619175722079

public class TestCompress {

    public static void main(String[] args) throws Exception {
        compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
//        decompress("e:/hello.txt.bz2");
    }

    // 一、壓縮
    private static void compress(String filename, String method) throws Exception {
        
        // (1)獲取輸入流
        FileInputStream fis = new FileInputStream(new File(filename));
        
        Class codecClass = Class.forName(method);
        
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
        
        // (2)獲取輸出流
        FileOutputStream fos = new FileOutputStream(new File(filename +codec.getDefaultExtension()));
        CompressionOutputStream cos = codec.createOutputStream(fos);
        
        // (3)流的對拷
        IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
        
        // (4)關閉資源
        fis.close();
        cos.close();
        fos.close();
    }

    // 二、解壓縮
    private static void decompress(String filename) throws FileNotFoundException, IOException {
        
        // (0)校驗是否能解壓縮
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

        CompressionCodec codec = factory.getCodec(new Path(filename));
        
        if (codec == null) {
            System.out.println("cannot find codec for file " + filename);
            return;
        }
        
        // (1)獲取輸入流
        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
        
        // (2)獲取輸出流
        FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
        
        // (3)流的對拷
        IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
        
        // (4)關閉資源
        cis.close();
        fos.close();
    }
}

Map輸出端採用壓縮

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();

        // 開啓map端輸出壓縮
    configuration.setBoolean("mapreduce.map.output.compress", true);
        // 設置map端輸出壓縮方式
    configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 1 : 0);
    }
}

Mapper和Reducer代碼不變網絡

Reduce輸出端採用壓縮

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
        Configuration configuration = new Configuration();
        
        Job job = Job.getInstance(configuration);
        
        job.setJarByClass(WordCountDriver.class);
        
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // 設置reduce端輸出壓縮開啓
        FileOutputFormat.setCompressOutput(job, true);
        
        // 設置壓縮的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 
        
        boolean result = job.waitForCompletion(true);
        
        System.exit(result?1:0);
    }
}

相關資料

1

本文配套 GitHubhttps://github.com/zhutiansam...
相關文章
相關標籤/搜索