使用FileSystem類進行文件讀寫及查看文件信息

使用FileSystem類進行文件讀寫及查看文件信息

 

  在這一節咱們要深刻了解Hadoop的FileSystem類——這是與與hadoop的文件系統交互的重要接口。雖然咱們只是着重於HDFS的實現,但咱們在編碼時通常也要注意代碼在FileSystem不一樣子類文件系統之間的可移植性。這是很是有用的,好比說你能夠很是方便的直接用一樣的代碼在你的本地文件系統上進行測試。html

使用hadoop URL讀數據

  從hadoop文件系統中讀取文件的最簡單的方法之一即是使用java.net.URL對象來打開一個欲從中讀取數據的流(stream)。一般狀況下的編程風格以下:java

複製代碼
1 InputStream in = null;
2 try {
3     in = new URL("hdfs://host/path").openStream();
4     //     process in
5 } finally {
6     IOUtils.closeStream(in);
7 }
複製代碼

  想要使java識別出hdfs開頭的URL標示還須要一點其餘的工做要作:經過URL的setURLStreamHandlerFactory()方法爲java設置一個FSUrlStreamHandlerFactory。這個方法在每一個JVM中只能調用一次,因此它一般會被放在一個static block中執行(以下所示),但若是你的某部分程序——例如一個你沒法修改源代碼的第三方組件——已經調用了這個方法,那你就不能經過URL來這樣讀取數據了(下一節咱們會介紹另外一種方法)。正則表達式

複製代碼
 1 public class URLCat {
 2     static {
 3       URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
 4     }
 5     
 6     public static void main(String[] args) throws Exception{
 7       InputStream in = null;
 8       try {
 9           in = new URL(args[0]).openStream();
10           IOUtils.copyBytes(in, System.out, 4096, false);
11       } finally {
12           // TODO: handle exception
13           IOUtils.closeStream(in);
14       }
15     }
16 }
複製代碼

  上例中咱們使用了Hadoop中的IOUtils類的兩個靜態方法:
  1)IOUtils.copyBytes(),其中in表示拷貝源,System.out表示拷貝目的地(也就是要拷貝到標準輸出中去),4096表示用來拷貝的buffer大小,false代表拷貝完成後咱們並不關閉拷貝源可拷貝目的地(由於System.out並不須要關閉,in能夠在finally語句中被關閉)。
  2)IOUtils.closeStream(),用來關閉一個流。
  下面是咱們的測試例子:apache

% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

使用FileSystem讀取數據

  就像上節所說的,有時候咱們沒法經過設置URLStreamHandlerFactory方法的方式來經過URL讀取數據,這時FIleSystem API就派上用場了。
  Hadoop文件系統中的文件是用Hadoop的Path對象來表示的(而不是java中的java.io.File對象,由於它的語義太接近於本地文件系統了)。你能夠把一個Path對象看作Hadoop文件系統中的某一個URL,如上例中的「hdfs://localhost/user/tom/quangle.txt」。
  Filesystem是一個通用的文件系統API,因此使用它的第一步就是先抽取出它的一個實例出來——在這個例子中是HDFS。下面列出了幾個Filesystem的用於抽取Filesystem實例的幾個靜態方法:編程

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException

  一個Configuration對象封裝了客戶端或服務器端的配置信息,這些配置信息是經過從conf/core-size.xml之類的配置文件中讀取出來的名值對來設置的。下面咱們一一說明上面的三個方法:
  1)第一個方法返回一個默認的文件系統(在conf/core-site.xml中經過fs.default.name來指定的,若是在conf/core-site.xml中沒有設置則返回本地文件系統)。
  2)第二個方法經過uri來指定要返回的文件系統(例如,若是uri是上個測試例子中的hdfs://localhost/user/tom/quangle.txt,也即以hdfs標識開頭,那麼就返回一個hdfs文件系統,若是uri中沒有相應的標識則返回本地文件系統)。
  3)第三個方法返回文件系統的機理同(2)是相同的,但它同時又限定了該文件系統的用戶,這在安全方面是很重要的。數組

  有時候你可能想要使用一個本地文件系統,你可使用另外一個很方便的方法:
    public static LocalFileSystem getLocal(Configuration conf) throws IOException安全

  獲得一個文件系統的實例後,咱們能夠調用該實例的open()方法來打開某個給定文件的輸入流(第一個方法使用一個默認的4KB的輸入緩衝):bash

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

  把上面介紹的組合起來咱們就獲得了下面的代碼:  服務器

複製代碼
 1 public class FileSystemCat {
 2     public static void main(String[] args) throws Exception {
 3         String uri = args[0];
 4         Configuration configuration = new Configuration();
 5         FileSystem fs = FileSystem.get(URI.create(uri), configuration);
 6         InputStream in = null;
 7         try{
 8             in = fs.open(new Path(uri));
 9             IOUtils.copyBytes(in, System.out, 4096, false);
10         } finally {
11             IOUtils.closeStream(in);
12         }
13     }
14 }
複製代碼

 

FSDataInputStream

  與URL的openStream()方法返回InputStream不一樣,FileSystem的open()方法返回的是一個FSDataInputStream對象(繼承關係:java.io.InputStream --> java.io.FilterInputStream --> java.io.DataInputStream --> org.apache.hadoop.fs.FSDataInputStream)。因爲FSDataInputStream實現了Closeable, DataInput, PositionedReadable, Seekable等接口,你能夠從流中的任意一個位置讀取數據。
  Seekable接口的seek()和getPos()方法容許咱們跳轉到流中的某個位置並獲得其位置:app

public interface Seekable {
  void seek(long pos) throws IOException;
  long getPos() throws IOException;
}

  若是調用seek()時指定了一個超過文件長度的位移值,會拋出IOException異常。
  與java.io.Inputstream的skip()方法指明一個相對位移值不一樣,seek()方法使用的是絕對位移值。以下所示的代碼經過seek()方法讀取了兩次輸入文件:  

複製代碼
 1 public class FileSystemDoubleCat {
 2   public static void main(String[] args) throws Exception {
 3     String uri = args[0];
 4     Configuration conf = new Configuration();
 5     FileSystem fs = FileSystem.get(URI.create(uri), conf);
 6     FSDataInputStream in = null;
 7     try {
 8       in = fs.open(new Path(uri));
 9       IOUtils.copyBytes(in, System.out, 4096, false);
10       in.seek(0); // go back to the start of the file
11       IOUtils.copyBytes(in, System.out, 4096, false);
12     } finally {
13       IOUtils.closeStream(in);
14     }
15   }
16 }
複製代碼

  運行結果以下:

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

  FSDataInputStream也實現了 PositionedReadable接口,這容許你從流中的某個給定位置讀取給定長度的內容:

public interface PositionedReadable {
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;
  public void readFully(long position, byte[] buffer) throws IOException;
}

  說明:read()方法從文件的給定position出讀取length個字節到buffer的offset處。返回值是讀取到的實際字節數,調用者應該檢查這個返回值,由於它可能比length小(可能讀到了文件末尾,或發生了中斷等等)。

  調用全部的這些方法並不會改變文件的偏移值,因此這些方法是線程安全的。也由此提供了一種當訪問某文件的內容時訪問該文件的另外一部分數據——例如元數據——的很方便的方法。
  最後須要注意的是調用seek()方法的代價比較高,應儘可能避免使用。你的程序應該基於流式訪問來構建,而不是執行一大堆seek。

寫數據

  FileSystem類有不少方法用來建立一個文件,最簡單的就是以欲建立文件的Path對象爲參數的create(Path f)方法,該方法返回一個用來寫入數據的輸出流:
    public FSDataOutputStream create(Path f) throws IOException
  該方法還有幾個重載的方法,經過這些重載的方法你能夠指定是否覆蓋該文件名已存在的文件,這個文件的備份數,用來寫數據的buffer size,該文件的block大小和文件權限等。

create()方法會建立指定的文件名中包含的任何不存在的父目錄,這樣雖然很方便,但不推薦使用(由於若是某個父目錄中存在其餘數據,會被覆蓋掉從而致使文件丟失)。若是你想要當父目錄不存在時該建立操做失敗,你能夠在調用create()方法以前調用exists()方法檢查指明的父目錄是否存在,若是存在則報錯以讓create()失敗

  create()方法還有一個重載方法可讓你傳遞一個回調的藉口——progressable,這樣你的程序就會知道你的數據被寫入了多少,也即寫入的進度(progress):

package org.apache.hadoop.util;
public interface Progressable {
  public void progress();
}

  除了建立一個新文件以寫入數據之外,咱們還可使用append()方法向一個已存在文件添加數據:
    public FSDataOutputStream append(Path f) throws IOException
  有了這個函數,應用程序就能夠向那些不能限制大小的文件(如logfile,你事先並不知道待記錄日誌會有多少)寫數據了。append操做在Hadoop的fileSystem中是可選的,例如HDFS實現了它,但S3就沒有。

  下面這個例子展現瞭如何從本地文件系統拷貝一個文件到HDFS,咱們在每64KB大小的數據寫入以後調用一次progress()函數,這個函數每被調用一次打印一個句點:  

複製代碼
 1 public class FileCopyWithProgress {
 2     public static void main(String[] args) throws Exception {
 3         String localSrc = args[0];
 4         String dst = args[1];
 5         InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
 6         Configuration conf = new Configuration();
 7         FileSystem fs = FileSystem.get(URI.create(dst), conf);
 8         OutputStream out = fs.create(new Path(dst), new Progressable() {
 9             public void progress() {
10                 System.out.print(".");
11             }
12         });
13         IOUtils.copyBytes(in, out, 4096, true);
14     }
15 }
複製代碼

   下面是該例子的示範用法:

% hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/
1400-8.txt
...............

  注:如今除了HDFS之外的其餘Hadoop支持的文件系統都不支持progress()方法,但咱們應該知道進度信息(pregress)在MapReduce程序中是很是重要的。

FSDataOutputStream

  FileSystem中的create()方法返回一個FSDataOutputStream,像FSDataInputStream同樣,它也有一個用於查詢位移的方法(但並無相似於FSDataInputStream中seek()的方法,由於Hadoop不容許向流中的任意位置寫數據,咱們只能在一個文件的末尾處添加數據):

package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable {
  public long getPos() throws IOException {
    // implementation elided
  }
  // implementation elided
}

查詢某個文件系統

文件元數據:FileStatus

  任何文件系統的典型功能就是可以遍歷它的目錄結構從而獲取有關目錄和文件的信息。Hadoop中的FileStatus類爲文件和目錄包裝了其元數據(包括文件長度,block大小,冗餘度,修改時間,文件全部者和權限等信息),其getFileStatus()方法提供了獲取某個給定文件或目錄的FileStatus對象的途徑,以下所示:

複製代碼
 1 public class ShowFileStatusTest {
 2     private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing (這個類在最新的Hadoop1.0.4中已經被廢棄了)
 3                                     
 4     private FileSystem fs;
 5 
 6     @Before
 7     public void setUp() throws IOException {
 8         Configuration conf = new Configuration();
 9         if (System.getProperty("test.build.data") == null) {
10             System.setProperty("test.build.data", "/tmp");
11         }
12         cluster = new MiniDFSCluster(conf, 1, true, null);
13         fs = cluster.getFileSystem();
14         OutputStream out = fs.create(new Path("/dir/file"));
15         out.write("content".getBytes("UTF-8"));
16         out.close();
17     }
18 
19     @After
20     public void tearDown() throws IOException {
21         if (fs != null) {
22             fs.close();
23         }
24         if (cluster != null) {
25             cluster.shutdown();
26         }
27     }
28 
29     @Test(expected = FileNotFoundException.class)
30     public void throwsFileNotFoundForNonExistentFile() throws IOException {
31         fs.getFileStatus(new Path("no-such-file"));
32     }
33 
34     @Test
35     public void fileStatusForFile() throws IOException {
36         Path file = new Path("/dir/file");
37         FileStatus stat = fs.getFileStatus(file);
38         assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
39         assertThat(stat.isDir(), is(false));
40         assertThat(stat.getLen(), is(7L));
41         assertThat(stat.getModificationTime(),
42                 is(lessThanOrEqualTo(System.currentTimeMillis())));
43         assertThat(stat.getReplication(), is((short) 1));
44         assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
45         assertThat(stat.getOwner(), is("tom"));
46         assertThat(stat.getGroup(), is("supergroup"));
47         assertThat(stat.getPermission().toString(), is("rw-r--r--"));
48     }
49 
50     @Test
51     public void fileStatusForDirectory() throws IOException {
52         Path dir = new Path("/dir");
53         FileStatus stat = fs.getFileStatus(dir);
54         assertThat(stat.getPath().toUri().getPath(), is("/dir"));
55         assertThat(stat.isDir(), is(true));
56         assertThat(stat.getLen(), is(0L));
57         assertThat(stat.getModificationTime(),
58                 is(lessThanOrEqualTo(System.currentTimeMillis())));
59         assertThat(stat.getReplication(), is((short) 0));
60         assertThat(stat.getBlockSize(), is(0L));
61         assertThat(stat.getOwner(), is("tom"));
62         assertThat(stat.getGroup(), is("supergroup"));
63         assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
64     }
65 }
複製代碼

Listing files

  除了從某個單一文件或目錄獲取文件信息之外,你可能還須要列出某個目錄中的全部文件,這就要使用FileSystem的listStatus()方法了:

public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

  當傳入參數是一個文件時,它獲取此文件的FileStatus對象,當傳入文件是目錄時,它返回零個或多個FileStatus對象,分別表明該目錄下全部文件的對應信息。
  重載後的函數容許你指定一個PathFilter來進一步限定要匹配的文件或目錄。
  下面咱們使用listStatus()方法得到參數中指定的文件(能夠有多個)的元數據信息,存放在一個FIleStatus數組中,再使用stat2Paths()方法吧FileStatus數組轉化爲Path數組,最後打印出文件名來:

複製代碼
 1 public class ListStatus {
 2     public static void main(String[] args) throws Exception {
 3         String uri = args[0];
 4         Configuration conf = new Configuration();
 5         FileSystem fs = FileSystem.get(URI.create(uri), conf);
 6         Path[] paths = new Path[args.length];
 7         for (int i = 0; i < paths.length; i++) {
 8             paths[i] = new Path(args[i]);
 9         }
10         FileStatus[] status = fs.listStatus(paths);
11         Path[] listedPaths = FileUtil.stat2Paths(status);
12         for (Path p : listedPaths) {
13             System.out.println(p);
14         }
15     }
16 }
複製代碼

   運行結果以下:

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt

文件模式

  在某個單一操做中處理一些列文件是很常見的。例如一個日誌處理的MapReduce做業可能要分析一個月的日誌量。若是一個文件一個文件或者一個目錄一個目錄的聲明那就太麻煩了,咱們可使用通配符(wild card)來匹配多個文件(這個操做也叫作globbing)。Hadoop提供了兩種方法來處理文件組:

public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

  globStatus()方法返回匹配文件模式的多個文件的FileStatus數組(以Path排序)。一個可選的PathFilter能夠用來進一步限制匹配模式。Hadoop中的匹配符與Unix中bash相同,以下所示:  

  假設某個日誌文件的組織結構以下:

  則對應於該組織結構有以下表示:

PathFilter

  使用文件模式有時候並不能有效的描述你想要的一系列文件,例如若是你想排除某個特定文件就很難。因此FileSystem的listStatus()和globStatus()方法就提供了一個可選參數:PathFilter——它容許你一些更細化的控制匹配:

package org.apache.hadoop.fs;
public interface PathFilter {
  boolean accept(Path path);
}

  PathFilter的做用就像java.io.FileFilter,只不過前者針對Path對象,然後者針對File對象。下面咱們用PathFIlter來排除一個符合給定正則表達式的文件:  

複製代碼
 1 public class RegexExcludePathFilter implements PathFilter {
 2     private final String regex;
 3 
 4     public RegexExcludePathFilter(String regex) {
 5         this.regex = regex;
 6     }
 7 
 8     public boolean accept(Path path) {
 9         return !path.toString().matches(regex);
10     }
11 }
複製代碼

  RegexExcludePathFilter只讓不匹配(具體參見accept方法的實現)給定正則表達式的文件經過,咱們經過文件模式(file pattern)獲得所需的文件集後,再用RegexExcludePathFilter來過濾掉咱們不須要的文件:
    fs.globStatus(new Path("/2007/*/*"), new RegexExcludeFilter("^.*/2007/12/31$"))
  這樣咱們就獲得:/2007/12/30

  注意:Filter只能根據文件名來過濾文件,是不能經過文件的屬性(如修改時間,文件全部者等)來過濾文件的。但它仍然提供了文件模式和正則表達式所不能提供的功能。

刪除數據

  使用FIleSystem的delete()方法能夠永久的刪除一個文件或目錄:
    public boolean delete(Path f, boolean recursive) throws IOException
  若是傳入的Path f是一個文件或者空目錄,recursive的值會被忽略掉。當recursive值爲true時,給定的非空目錄連同其內容會被一併刪除掉。

出處:http://www.cnblogs.com/beanmoon/archive/2012/12/11/2813235.html

相關文章
相關標籤/搜索