使用FileSystem API讀取數據

如前一小節所解釋的,有時不能在應用中設置URLStreamHandlerFactory。這時,咱們須要用FileSystem API來打開一個文件的輸入流。java

文件在Hadoop文件系統中顯示爲一個Hadoop Path對象(不是一個java.io.File對象,由於它的語義與本地文件系統關聯太緊密)。咱們能夠把一個路徑視爲一個Hadoop文件系統URI,如hdfs://localhost/user/tom/quangle.txtapache

FileSystem是一個普通的文件系統API,因此首要任務是檢索咱們要用的文件系統實例,這裏是HDFS。取得FileSystem實例有兩種靜態工廠方法:數組

1.  public static FileSystem get(Configuration conf) 
throws IOException  
安全

2.  ublic static FileSystem get(URI uri, 
Configuration conf) throws IOException 
服務器

Configuration對象封裝了一個客戶端或服務器的配置,這是用從類路徑讀取而來的配置文件(conf/core-site.xml)來設置的。第一個方法返回的是默認文件系統(conf/core-site.xml中設置的,若是沒有設置過,則是默認的本地文件系統)。第二個方法使用指定的URI方案及決定所用文件系統的權限,若是指定URI中沒有指定方案,則退回默認的文件系統。ide

有了FileSystem實例後,咱們調用open()來獲得一個文件的輸入流:函數

1.  public FSDataInputStream open(Path f) throws IOException  oop

2.  ublic abstract FSDataInputStream open(Path f, 
int bufferSize) throws IOException 
spa

第一個方法使用默認4 KB的緩衝大小。線程

將它們合在一塊兒,咱們能夠在例3-2中重寫例3-1

3-2:直接使用FileSystem以標準輸出格式顯示Hadoop文件系統的文件

1.  public class FileSystemCat {  

2.    public static void main(String[] args) throws Exception {  

3.      String uri = args[0];  

4.      Configuration conf = new Configuration();  

5.      FileSystem fs = FileSystem.get(URI.create(uri), conf);  

6.      InputStream in = null;  

7.      try {  

8.        in = fs.open(new Path(uri));  

9.        IOUtils.copyBytes(in, System.out, 4096, false);  

10.     } finally {  

11.       IOUtils.closeStream(in);  

12.     }  

13.   }  

14.

程序運行結果以下:

1.      % hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt  

2.  On the top of the Crumpetty Tree  

3.  The Quangle Wangle sat,  

4.  But his face you could not see,  

5.  On account of his Beaver Hat.  

6.  FSDataInputStream 

FileSystem中的open()方法實際上返回的是一個FSDataInputStream,而不是標準的java.io類。這個類是java.io.DataInputStream的一個子類,支持隨機訪問,這樣就能夠從流的任意位置讀取數據了。

1.      package org.apache.hadoop.fs;  

2.   

3.  public class FSDataInputStream extends DataInputStream  

4.       implements Seekable, PositionedReadable {  

5.       // implementation elided  

6. 

Seekable接口容許在文件中定位,並提供一個查詢方法,用於查詢當前位置相對於文件開始處的偏移量(getPos())

1.  public interface Seekable {  

2.    void seek(long pos) throws IOException;  

3.    long getPos() throws IOException;  

4.    boolean seekToNewSource(long targetPos) throws IOException;  

5. 

調用seek()來定位大於文件長度的位置會致使IOException異常。與java.io.InputStream中的skip()不一樣,seek()並無指出數據流當前位置以後的一點,它能夠移到文件中任意一個絕對位置。

應用程序開發人員並不經常使用seekToNewSource()方法。此方法通常傾向於切換到數據的另外一個副本並在新的副本中尋找targetPos指定的位置。HDFS內部就採用這樣的方法在數據節點故障時爲客戶端提供可靠的數據輸入流。

3-3是例3-2的簡單擴展,它將一個文件兩次寫入標準輸出:在寫一次後,定位到文件的開頭再次讀入數據流。

3-3:經過使用seek兩次以標準輸出格式顯示Hadoop文件系統的文件

1.  public class FileSystemDoubleCat {  

2.   

3.    public static void main(String[] args) throws Exception {  

4.      String uri = args[0];  

5.      Configuration conf = new Configuration();  

6.      FileSystem fs = FileSystem.get(URI.create(uri), conf);  

7.      FSDataInputStream in = null;  

8.      try {  

9.        in = fs.open(new Path(uri));  

10.       IOUtils.copyBytes(in, System.out, 4096, false);  

11.       in.seek(0); // go back to the start of the file  

12.       IOUtils.copyBytes(in, System.out, 4096, false);  

13.     } finally {  

14.       IOUtils.closeStream(in);  

15.     }  

16.   }  

17.

在一個小文件上運行獲得如下結果:

1.      % hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt  

2.  On the top of the Crumpetty Tree  

3.  The Quangle Wangle sat,  

4.  But his face you could not see,  

5.  On account of his Beaver Hat.  

6.  On the top of the Crumpetty Tree  

7.  The Quangle Wangle sat,  

8.  But his face you could not see,  

9.  On account of his Beaver Hat. 

FSDataInputStream也實現了PositionedReadable接口,從一個指定位置讀取一部分數據:

1.  public interface PositionedReadable {  

2.   

3.    public int read(long position, byte[] buffer, 
int offset, int length)  

4.          throws IOException;  

5.   

6.    public void readFully(long position, byte[] 
buffer, int offset, int length)  

7.          throws IOException;  

8.   

9.    public void readFully(long position, byte[] 
buffer) throws IOException;  

10.

read()方法從指定position讀取指定長度的字節放入緩衝buffer的指定偏離量offset。返回值是實際讀到的字節數:調用者須要檢查這個值,它有可能小於指定的長度。readFully()方法會讀出指定字節由length指定的數據到buffer中或在只接受buffer字節數組的版本中,再讀取buffer.length字節(這兒指的是第三個函數),若已經到文件末,將會拋出EOFException

全部這些方法會保留文件當前位置而且是線程安全的,所以它們提供了在讀取文件(多是元數據)的主要部分時訪問其餘部分的便利方法。其實,這只是使用Seekable接口的實現,格式以下:

1.      long oldPos = getPos();  

2.  try {  

3.    seek(position);  

4.    // read data  

5.  } finally {  

6.    seek(oldPos);  

7. 

最後務必牢記,seek()是一個相對高開銷的操做,須要慎重使用。咱們須要依靠流數據構建應用訪問模式(如使用MapReduce),而不要大量執行seek操做。

更多分享請關注:bbs.superwu.cn

相關文章
相關標籤/搜索