簡潔而不簡單
壓縮技術可以有效減小底層存儲系統(HDFS)讀寫字節數。壓縮提升了網絡帶寬和磁盤空間的效率。在 Hadoop下,尤爲是數據規模很大和工做負載密集的狀況下,使用數據壓縮顯得很是重要。在這種狀況下,IO操做和網絡數據傳輸要花大量的時間。還有, Shuffle與 Merge過程一樣也面臨着巨大的IO壓力鰵於磁盤IO和網絡帶寬是 Hadoop的寶貴資源,數據壓縮對於節省資源、最小化磁盤IO和網絡傳輸很是有幫助。java
不過,儘管壓縮與解壓操做的CPU開銷不髙,其性能的提高和資源的節省並不是沒有代價。若是磁盤IO和網絡帶寬影響了 MapReduce做業性能,在任意 MapReduce階段啓用壓縮均可以改善端到端處理時間並減少IO和網絡流量。git
壓縮是提升 Hadoop運行效率的一種優化策略經過對 Mapper、 Reducer運行過程的數據進行壓縮,以減小磁盤IO,提升MR程序運行速度。
注意:釆用壓縮技術減小了磁盤IO,但同時增長了CPU運算負擔。因此,壓縮特性運用得當能提升性能,但運用不當也可能下降性能壓縮基本原則:github
(1)運算密集型的job,少用壓縮
(2)IO密集型的job,多用壓縮!!算法
壓縮格式 | hadoop自帶? | 算法 | 文件擴展名 | 是否可切分 | 換成壓縮格式後,原來的程序是否須要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本處理同樣,不須要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本處理同樣,不須要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本處理同樣,不須要修改 |
LZO | 否,須要安裝 | LZO | .lzo | 是 | 須要建索引,還須要指定輸入格式 |
Snappy | 否,須要安裝 | Snappy | .snappy | 否 | 和文本處理同樣,不須要修改 |
爲了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,以下表所示。apache
壓縮格式 | 對應的編碼/解碼器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
壓縮算法 | 原始文件大小 | 壓縮文件大小 | 壓縮速度 | 解壓速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
參數 | 默認值 | 階段 | 建議 |
---|---|---|---|
io.compression.codecs (在core-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | 輸入壓縮 | Hadoop使用文件擴展名判斷是否支持某種編解碼器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper輸出 | 這個參數設爲true啓用壓縮 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper輸出 | 使用LZO或Snappy編解碼器在此階段壓縮數據 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer輸出 | 這個參數設爲true啓用壓縮 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress. DefaultCodec | reducer輸出 | 使用標準工具或者編解碼器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) | RECORD | reducer輸出 | SequenceFile輸出使用的壓縮類型:NONE和BLOCK |
public class TestCompress { public static void main(String[] args) throws Exception { compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec"); // decompress("e:/hello.txt.bz2"); } // 一、壓縮 private static void compress(String filename, String method) throws Exception { // (1)獲取輸入流 FileInputStream fis = new FileInputStream(new File(filename)); Class codecClass = Class.forName(method); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration()); // (2)獲取輸出流 FileOutputStream fos = new FileOutputStream(new File(filename +codec.getDefaultExtension())); CompressionOutputStream cos = codec.createOutputStream(fos); // (3)流的對拷 IOUtils.copyBytes(fis, cos, 1024*1024*5, false); // (4)關閉資源 fis.close(); cos.close(); fos.close(); } // 二、解壓縮 private static void decompress(String filename) throws FileNotFoundException, IOException { // (0)校驗是否能解壓縮 CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration()); CompressionCodec codec = factory.getCodec(new Path(filename)); if (codec == null) { System.out.println("cannot find codec for file " + filename); return; } // (1)獲取輸入流 CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename))); // (2)獲取輸出流 FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded")); // (3)流的對拷 IOUtils.copyBytes(cis, fos, 1024*1024*5, false); // (4)關閉資源 cis.close(); fos.close(); } }
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); // 開啓map端輸出壓縮 configuration.setBoolean("mapreduce.map.output.compress", true); // 設置map端輸出壓縮方式 configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 1 : 0); } }
Mapper和Reducer代碼不變網絡
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 設置reduce端輸出壓縮開啓 FileOutputFormat.setCompressOutput(job, true); // 設置壓縮的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); boolean result = job.waitForCompletion(true); System.exit(result?1:0); } }
本文配套 GitHub: https://github.com/zhutiansam...