Hadoop(九)Hadoop IO之Compression和Codecs

前言java

  前面一篇介紹了Java怎麼去查看數據塊的相關信息和怎麼去查看文件系統。咱們只要知道怎麼去查看就好了!接下來我分享的是Hadoop的I/O操做。算法

  在Hadoop中爲何要去使用壓縮(Compression)呢?接下來咱們就知道了。apache

1、壓縮(Compression)概述

1.一、壓縮的好處

  減小儲存文件所須要的磁盤空間,並加速數據在網絡和磁盤上的傳輸。這兩個在大數據處理大齡數據時至關重要!編程

1.二、壓縮格式總結

  

  Hadoop對前面三種有默認集成,有就是說Hadoop支持DEFLATE、Gzip、bzip2三種壓縮格式。然後面三種Hadoop沒有支持,要用的話要本身去官網服務器

  下載相應的源碼去編譯加入到Hadoop才能用。網絡

  注意:ide

  1)這裏我要說的是「是否分割」,當咱們一個文件去壓縮即便有很是好的壓縮算法,可是它的大小仍是超過了一個數據塊的大小,這時就涉及到分割了。oop

      因此說在之後的壓縮咱們大多數狀況下會使用bzip2測試

  2)Gzip和bzip2比較時,bzip2的壓縮率(壓縮以後的大小除以源文件的大小)要小,因此說bzip2的壓縮效果好。而這裏就會壓縮和解壓縮的時候浪費更多的時間。大數據

    就是咱們常說的「用時間換取空間」。

2、編解碼器(Codec)概述

  codec實現了一種壓縮-加壓縮算法(意思就是codec使用相關的算法對數據進行編解碼)。在Hadoop中,一個對CompressionCodec接口的實現表明一個codec

  

  對於不一樣的壓縮算法有不一樣的編解碼器

  咱們要對一個文件進行壓縮須要編碼器,對一個壓縮文件進行解壓須要解碼器。那咱們怎麼樣去獲取編解碼器呢?

    有兩種方式:

      一是:根據擴展名讓程序本身去選擇相應的編解碼器。好比說:我在本地有一個文件是 user.txt咱們經過-Dinput=user.txt去上傳這個文件到集羣,

        在集羣中咱們把它指定到-Doutput=/user.txt.gz.。這是咱們程序的相關的類會根據你的擴展名(這裏是.gz)獲取相應的壓縮編解碼器。

        在Hadoop中有一個CompressionCodecFactory會根據擴展名獲取相應的編解碼器對象 。

      二是:咱們本身去指定編解碼器。爲何要去指定呢?好比說,我在本地有一個文件是user.txt.gz,其實這個壓縮文件是使用的是bzip2的壓縮算法壓縮的。

          (由於我本身去更改了它的擴展名),因此這時候就要本身去指定編解碼器。

3、Java編程實現文件的壓縮與解壓縮

3.一、原理分析

  在咱們把本地的文件上傳的集羣的時候,究竟是哪裏須要壓縮,哪裏須要解壓縮,在哪裏壓縮?這都是須要明白,下面畫一張圖給你們理解:

  

3.二、相關類和方法

  在Hadoop中關於壓縮和解壓縮的包、接口和類:

    

  1)CompressionCodec接口中

    

  2)CompressionCodecFactory類

       

    第一個是:根據文件的文件名後綴找到相應的壓縮編解碼器
    第二個是:爲編解碼器的標準類名找到相關的壓縮編解碼器。
    第三個是:爲編解碼器的標準類名或經過編解碼器別名找到相關的壓縮編解碼器。

3.三、Java將本地文件壓縮上傳到集羣當中

  1)核心代碼

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WriteDemo_0010
    extends Configured implements Tool{
    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        String input=conf.get("input");
        String output=conf.get("output");
        LocalFileSystem lfs=
            FileSystem.getLocal(conf);
        FileSystem rfs=
            FileSystem.get(
                URI.create(output),conf);
        FSDataInputStream is=
            lfs.open(new Path(input));
        FSDataOutputStream os=
            rfs.create(new Path(output));
 CompressionCodecFactory ccf=
            new CompressionCodecFactory(conf);
    //把路徑傳進去,根據指定的後綴名獲取編解碼器 CompressionCodec codec= ccf.getCodec(new Path(output)); CompressionOutputStream cos=
codec.createOutputStream(os); System.out.println( codec.getClass().getName()); IOUtils.copyBytes(is,cos,1024,true); //close return 0; } public static void main(String[] args) throws Exception{ System.exit( ToolRunner.run( new WriteDemo_0010(),args)); } }

  2)測試

    將IEDA中打好的jar包上傳到Linux中(安裝了HDFS集羣的客戶端的服務器中)執行:

      

    結果:

      

      咱們能夠從前面的那種表中能夠看的出來,獲取到了相應的編解碼器。

    再次測試:

      

    結果:

      

3.四、Java將集羣文件解壓縮到本地

  1)核心代碼

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadDemo_0010
    extends Configured
    implements Tool{
    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        String input=conf.get("input");
        String output=conf.get("output");
        FileSystem rfs=
            FileSystem.get(
                URI.create(input),conf);
        LocalFileSystem lfs=
            FileSystem.getLocal(conf);

        FSDataInputStream is=
            rfs.open(new Path(input));
        FSDataOutputStream os=
            lfs.create(new Path(output));

        CompressionCodecFactory factory=
            new CompressionCodecFactory(conf);
        CompressionCodec codec=
            factory.getCodec(new Path(input));
        CompressionInputStream cis=
            codec.createInputStream(is);

        IOUtils.copyBytes(cis,os,1024,true);
        return 0;
    }

    public static void main(String[] args) throws Exception{
        System.exit(
            ToolRunner.run(
                new ReadDemo_0010(),args));
    }
}

  2)測試

    

    結果:

    

  查看結果:

    

 

喜歡就點個「推薦」哦!

相關文章
相關標籤/搜索