Hadoop中一些Java Api操做(23)

Hadoop基本文件系統操做:
    1.首先從本地文件系統將一個文件複製到HDFS:
        hadoop fs-copyFromLocal input/docs/quangle.txthdfs://localhost/user/tom/quangle.txt
        把文件複製回本地文件系統,並檢查是否一致:
            hadoop fs -copyToLocal quangle.txt quangle.copy.txt
            md5 input/docs/quangle.txt quangle.copy.txt
            輸出:
                MD5(input/docs/quangle.txt)=a16f231da6b05e2ba7a339320e7dacd9
                MD5(quangle.copy.txt)=a16f231da6b05e2ba7a339320e7dacd9
            因爲MD5鍵值相同,代表這個文件在HDFS之旅中得以倖存並保存完整.
    2.從HadoopURL中讀取數據:
        InputStreamin=null;
        try{
            in=new URL("hdfs://host/path").openStream();
        }finally{
            IOUtils.closeStream(in);
        }
        經過URLStreamHandler實現以標準輸出方式顯示Hadoop文件系統的文件
        publicclassURLCat(){
            static{
                URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
            }
        }
        注:從0.21.0版本開始,加入了一個名爲FileContext的文件系統接口,該接口可以更好地處理多文件系統問題,而且該接口更簡明、一致.
        public static void main(String[] args)throws Exception{
        InputStreamin = null;
            try{
                in = new URL(args[0].openStream());
                IOUtils.copyBytes(in,System.out,4096,false);
            }finally{
                IOUtils.closeStream(in);
            }
        }
        調用Hadoop中簡潔的IOUtils類,並在finally字句夅關閉數據流,同時也能夠在輸入流和輸出流之間複製數據(System.out),copyBytes方法的最後兩個參數,第一個用於設置複製的緩衝區大小,第二個用戶設置複製結果後是否關閉數據流.這裏選擇自行關閉輸入流,於是System.out不關閉輸入流.
        運行示例:
            hadoop URLCat hdfs://localhost/user/tom/quangle.txt
    3.經過FileSystem API讀取數據
        FileSystem是一個通用的文件系統API,因此第一步是檢索咱們須要使用的文件系統實例,這裏是HDFS.
        獲取FileSystem實例有兩種靜態工廠方法:
            public static FileSystem get(Configurationconf)throws IOException
            public static FileSystem get(URIuri,Configurationconf)throws IOException
        Configuration對象封裝了客戶端或服務器的配置,經過設置配置文件讀取類路徑來實現(如conf/core-site.xml).
        第一個方法返回的是默認文件系統(在conf/core-site.xml中指定的,若是沒有指定,則使用默認的本地文件系統).
        第二個方法經過給定的URI方案和權限來肯定要使用的文件系統,若是給定URI中沒有指定方案,則返回默認文件系統.
        有了FileSystem實例以後,咱們調用open()函數來獲取文件的輸入流:
            public FSDataInputStream open(Pathf)throws IOException
            public abstract FSDataInputStream open(Pathf,intbufferSize)throws IOException
        第一個方法使用默認的的緩衝區大小4kb.
        直接使用FileSystem以標準輸出格式顯示Hadoop文件系統中的文件
        public class FileSystemCat(){
            public static void main(String[] args)throws Exception{
                String uri = args[0];
                Configuration conf= new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri),conf);
                InputStream in = null;
                try{
                    in = fs.open(new Path(uri));
                    IOUtils.copyBytes(in,System.out.4096,false);
                }finally{
                    IOUtils.closeStream(in);
                }
            }
        }
        程序運行結果以下:
            hadoop FileSystemCat hdfs://localhost/user/tom/quagnle.txt
    4.寫入數據
        FileSystem類有一系列建立文件的方法.最簡單的方法是給準備建立的文件制定一個Path對象.而後返回一個用於寫入數據的輸出流:
        public FSDataOutputStream create(Path f)throws IOException
        注意:create()方法可以爲須要寫入且當前不存在的文件建立父目錄.儘管這樣很方便,但有時並不但願這樣.若是你但願不存在父目錄就發生文件寫入失敗,則應該先調用exists()方法檢查父目錄是否存在.
        還有一個重載方法Propressable,用於傳遞迴調接口,如此一來,能夠把數據寫入數據節點的進度通知到你的應用:
            public interface Progressable{
                public void progress();
            }
        另一種新建文件的方法,是使用append()方法在一個一有文件末尾追加數據(還存在一些其餘重載版本):
            public FSDataOutputStream append(Path f)throws IOException
        該追加操做容許一個writer打開文件後訪問該文件的最後偏移量處追加數據.有了這個API,某些應用能夠建立無邊界文件.
        例如:
            日誌文件能夠在機器重啓後在已有文件後面繼續追加數據.該追加操做是可選的,並不是全部Hadoop文件系統都實現了該操做.
            例如,HDFS支持追加,但S3文件系統就不支持.
    5.將本地文件複製到Hadoop文件系統:
        public class FileCopyWithProgress{
            public static void main(String[] args){
                String localSrc = args[0];
                String dst = args[1];
                InputStream in = new BufferInputStream(newFileInputStream(localSrc));
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(dst),conf);
                OutputStream out = fs.create(new Path(dst),newProgressable(){
                    public void progress(){
                        System.out.print(".");
                    }
                });
                IOUtils.copyBytes(in,out,4096,true);
            }
        }
    6.FSDataInputStream
        public class FSdataInputStream extends DataInputStream implements Seekable,PositionedReadable{}
        Seekable接口支持在文件中找到指定位置,並提供一個查詢當前位置相對於文件起始位置偏移量(getPos())的查詢方法:
            public interface Seekable{
                void seek(long pos)throws IOException;
                long getPos()throws IOException;
                boolean seekToNewSource(long targetPos)throws IOException;
            }
        使用seek方法,將Hadoop文件系統中的一個文件標準輸出上顯示兩次
            public class FileSystem DoubleCat{
                public static void main(String[] args)throws Exception{
                    String uri =args[0];
                    Configuration conf = new Configuration();
                    FileSystem fs = FileSystem.get(URI.create(uri),conf);
                    FSDataInputStream in=null;
                    try{
                        in = fs.open(new Path(uri));
                        IOUtils.copyByste(in,System.out,4096,false);
                        in.seek(0);
                        IOUtils.copyBytes(in,System.out,4096,false);
                    }finally{
                        IOUtils.closeStream(in);
                    }
                }
            }
        在一個小文件上運行的結果以下:
            hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
            FSDataInputStream類也實現了PositionedReadable接口,從一個指定偏移量處理讀取文件的一部分:
            public interface PositionedReadable{
                public int read(long position,byte[] buffer,int offset,int length)throws IOException;
                public void readFully(long position,byte[] buffer,int offset,int length)throws IOException;
                public void readFully(long position,byte[] buffer)throws IOException;
            }
            read()方法從文件的指定position處讀取至多爲length字節的數據並存入緩衝區buffer的指定偏離量offset處.返回值是實際讀到的字節數:調用者須要檢查這個值,它有可能小於指定的length長度.
            readFully()方法將指定length長度的字節數數據讀取到buffer中(或在只接受buffer字節數組的版本中,讀取buffer.length長度字節數據),除非已經讀到文件末尾,這種狀況下將拋出EOFException異常.
            全部這些方法會保留文件當前偏移量,而且是線程安全的,所以它們提供了再讀取文件-----多是元數據----的主題時訪問文件的其餘部分的便利方法.事實上,這只是按照如下模式實現的Seekable接口:
                lon goldPos = getPos();
                try{
                    seek(position);
                }finally{
                    seek(oldPos)
                }
            seek()方法是一個相對高開銷的操做,須要慎重使用.建議用流數據來構建應用的訪問模式(如使用MapReduce),而非執行大量的seek()方法.
    7.FSDataOutputStream對象
        FileSystem實例的create()方法返回FSDataOutputStream對象,與FSDataInputSream類類似,它也有一個檢查文件當前位置的方法:
        public class FSDataOutputStream extends DataOutputStream implements Syncable{
            public long getPos()throwsIOException{
                //
            }
        }
        但與FSDataInputStream類不一樣的是,FSDataOutputStream類不容許在文件中定位.這是由於HDFS只容許對一個已打開的文件順序寫入,或在現有文件的末尾追加數據.它不支持在除文件末尾以外的其餘位置進行寫入,所以,寫入時定位就沒有什麼意義.
    8.FileSystem實例提供了建立目錄的方法:
        public boolean mkdirs(Path f)throws IOException
        這個方法能夠一次性建立全部必要但尚未的父目錄,就像java.io.File類的mkdirs()方法.若是目錄都已經建立成功,則返回true.
        一般,你不須要顯式建立一個目錄,由於調用create()方法寫入文件時會自動建立父目錄.

Hadoop查詢文件系統:
    1.文件元數據:FileStatus
        任何文件系統的一個重要特徵都是提供其目錄結構瀏覽和檢索它所存文件盒目錄相關信息的功能.
        FileStatus類封裝了文件系統中文件和目錄的元數據,包括文件長度,塊大小,備份,修改時間,全部者以及權限信息.
        FileSystem的getFileStatus()方法用於獲取文件或目錄的FileStatus對象.
        展現文件狀態信息:
        public class ShowFileStatusTest{
            private MiniDFSCluster cluster;
            private FileSystem fs;
            @Before
            public void setUp()throws IOException{
                Configuration conf = new Configuration();
                if(System.getProperty("text.build.data")==null){
                    System.setProperty("test.build,data","/tmp");
                }
                cluster=newmMiniDFSCluster(conf,1,true,null);
                fs=cluster.getFileSystem();
                OutputStream out=fs.create(newPath("/dir/file"));
                out.write("content".getBytes("UTF-8"));
                out.close();
            }
            @After
            publicvoidtearDown(()throwsIOException{
                if(fs!=null){fs.close();}
                if(cluster!=null){cluster.shutdown();}
            }    
            @Test(expected=FileNotFoundException.class)
            public void throwsFileNotFoundNonExistentFile()throwsIOException{
                fs.getFileStatus(newPath("no-such-file"));
            }
            @Test
            public void fileStatusForFile()throws IOException{
                Path file = new Path("/dir/file");
                FileStatus stat = fs.getFileStatus(file);
                assertThat(stat.getPath().toUri().getPath(),is("/dir/file"));
                assertThat(stat.isDir(),is(false));
                assertThat(stat.getLen(),is(7L));
                assertThat(stat.getModificationTime(),is(lessThanOrEqualTO(System.currentTimeMillis())));
                assertThat(stat.getReplication(),is(short)1);
                assertThat(stat.getBlockSize(),is(64*1024*1024L));
                assertThat(stat.getOwner(),is("tom"));
                assertThat(stat.getGroup(),is("supergroup"));
                assertThat(stat.getPermission().toString(),is("rw-r--r--"));
            }
            @Test
            public void fileStatusForDirectory()throws IOException{
                Path dir = new Path("/dir")
                FileStatus stat = fs.getFileStatus(dir);
                assertThat(stat.getPath().toUri().getPath(),is("/dir"));
                assertThat(stat.isDir(),is(true));//是不是目錄
                assertThat(stat.getLen(),is(0L));//文件長度
                assertThat(stat.getModificationTime(),is(lessThanOrEqualTO(System.currentTimeMillis())));//最後修改時間
                assertThat(stat.getReplication(),is(short)0);//備份數
                assertThat(stat.getBlockSize(),is(0L));//塊大小
                assertThat(stat.getOwner(),is("tom"));//用戶
                assertThat(stat.getGroup(),is("supergroup"));//組
                assertThat(stat.getPermission().toString(),is("rwxr-xrr-x"));//權限
            }
        }
        若是文件或目錄均不存在,則會拋出FileNotFoundException異常.可是,若是隻需檢查文件或目錄是否存在,那麼調用exists()方法會更方便:
        public boolean exists(Path f)throws IOException
        
    2.列出文件:
        查找一個文件或目錄的信息很實用,但一般你還須要可以列出目錄的內容.這就是FileSystem的listStatus()方法的功能:
            public FileStatus[] listStatus(Path f)throws IOException
            public FileStatus[] listStatus(Path f,PathFilter fileter)throws IOException
            public FileStatus[] listStatus(Path[] files)throws IOException
            public FileStatus[] listStatus(Path[] files,PathFilter fileter)throws IOException
        當傳入的參數是一個文件時,它會簡單轉變成以數組方法返回長度爲1的FileStatus對象.
        當傳入參數是一個目錄時,則返回0或多個FileStatus對象,表示此目錄中包含的文件和目錄.
        注意FileUtil中stat2Paths()方法的使用,它將一個FileStatus對象數組轉換爲Path對象數組.
        例:顯示Hadoop文件系統中一組路徑的文件信息
        public class ListStatus{
            public static void main(String[]args){
                String uri = args[0];
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri),conf);
                Path[] paths = new Path[args.length];
                for(int i=0;i<paths.length;i++){
                    paths[i] = new Path(args[i]);
                }
                FileStatus[] status = fs.listStatus(paths);
                Path[] listedPaths = FileUtil.stat2Paths(status);
                for(Path p:listedPaths){
                    System.out.println(p);
                }
            }
        }
    3.文件模式
        在單個操做中處理一批文件,這是一個常見要求.舉例來講,處理日誌的MapReduce做業可能須要分析一個月內包含在大量目錄中的日誌文件.在一個表達式中使用通配符來匹配多個文件時比較方便的,無需列舉每一個文件盒目錄來指定輸入,該操做稱爲"通配".Hadoop爲執行通配提供了兩個FileSystem方法:
            public FileStatus[] globStatus(Path pathPattern)throws IOException
            public FileStatus[] globStatus(Path pathPattern,Path Filterfilter)throws IOException
            globStatus()方法返回與路徑相匹配的全部文件的FileStatus對象數組,並按路徑排序.PathFilter命令做爲可選項能夠進一步對匹配限制.
            Hadoop支持的通配符與Unixbash相同.
            通配符            名稱            匹配
            *                星號            匹配0或多個字符
            ?                問號            匹配單一個字符
            [ab]            字符類            匹配{a,b}集合中的一個字符
            [^ab]            非字符類        匹配非{a,b}集合中的一個字符
            [a-b]            字符範圍    匹配一個在{a,b}範圍內的字符(包括ab),a在字典順序上要小於或等於b
            [^a-b]            非字符範圍        匹配一個不在{a,b}範圍內的字符(包括ab),a在字典順序上要小於等於b
            {a,b}            或選擇            匹配包含a或b中的一個的表達式
            \c                轉義字符        匹配元字符c
    4.PathFilter對象            通配符模式並不總可以精確地描述咱們想要訪問的文件集.好比,使用通配格式排除一個特定的文件就不太可能,FileSystem中的listStatus()和globStatus()方法提供了可選的PathFilter對象,使咱們可以經過編程方式控制通配符:
        public interface PathFilter{
            boolean accept(Path path);
        }
    PathFilter與java.io.FileFilter同樣,是Path對象而不是File對象.
    例子:用於排序匹配正則表達式路徑的PathFilter
        public class RegexExcludePathFilter implements PathFilter{
            private final String regex;
            public RegexExcludePathFilter(String regex){
                this.regex = regex;
            }
            public boolean accept(Path path){
                return !path.toString().matchers(regex);
            }
        }
    這個過濾器只傳遞不匹配正則表達式的文件.咱們將該過濾器與預先去除文件的通配符想結合:過濾器可優化結果.
    fs.globStatus(newPath("/2007/*/*"),newRegexExcludeFilter("^.*/2007/12/31$"));
    過濾器由Path表示,只能做用於文件名.不能針對文件的屬性來構建過濾器.可是,通配符模式和正則表達式一樣沒法對文件屬性進行匹配.例如,若是你將文件存儲在按照日期排列的目錄結構中,則能夠根據Pathfilter在給定的時間範圍內選出文件.
    4.刪除數據
        使用FileSystem的delete()方法能夠永久性刪除文件或目錄.
        public boolean delete(Path f,boolean recursive)throwsIOException
        若是f是一個文件或空目錄,那麼recursive的值就會被忽略.只有在recrusive值爲true時,一個非空目錄及其內容纔回被刪除(不然會拋出IOException異常)java


                                                                                                                                Name:Xr正則表達式

                                                                                                                                Date:2014-03-12 21:54
編程

相關文章
相關標籤/搜索