1)org.apache.hadoop.fs.FileSystem
A generic file system API that provides a uniform way of accessing different file systems.
2)org.apache.hadoop.fs.Path
The uniform representation of a file or directory in a Hadoop file system, similar to how java.io.File represents a file or directory in the local file system.
3)org.apache.hadoop.conf.Configuration
A utility class for reading and parsing configuration files (such as core-site.xml, hdfs-default.xml and hdfs-site.xml), and for adding configuration programmatically.
4)org.apache.hadoop.fs.FSDataOutputStream
The unified wrapper around data output streams in Hadoop.
5)org.apache.hadoop.fs.FSDataInputStream
The unified wrapper around data input streams in Hadoop.
1) Build a Configuration object, which reads and parses the relevant configuration files
Configuration conf = new Configuration();
2) Set the relevant properties
conf.set("fs.defaultFS", "hdfs://IP:9000");
3) Obtain a concrete file system instance fs (here an HDFS instance)
FileSystem fs = FileSystem.get(new URI("hdfs://IP:9000"), conf, "hdfs");
4) Perform file operations through the file system instance fs (here, deleting a file)
fs.delete(new Path("/user/liuhl/someWords.txt"), true);
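Taken together, the four steps above form the minimal sketch below. The class name DeleteFileDemo is made up for the example, and the NameNode address, user name "hdfs" and file path are placeholders taken from the snippets above; replace them with the values of your own cluster.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteFileDemo {
    public static void main(String[] args) throws Exception {
        // 1) build the Configuration (reads core-default.xml/core-site.xml from the classpath)
        Configuration conf = new Configuration();
        // 2) set the relevant properties (placeholder NameNode address)
        conf.set("fs.defaultFS", "hdfs://192.168.137.100:9000");
        // 3) obtain the HDFS file system instance as user "hdfs"
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.137.100:9000"), conf, "hdfs");
        // 4) perform the file operation: recursively delete the target path
        fs.delete(new Path("/user/liuhl/someWords.txt"), true);
        fs.close();
    }
}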
1. Create a new Maven project: hadoop-hdfs-demo.
The pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hadoop.demo</groupId>
    <artifactId>hadoop-hdfs-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>
    </dependencies>
</project>
2. Create the class that connects to Hadoop: ConnectHadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class ConnectHadoop {

    public static FileSystem getHadoopFileSystem() {
        FileSystem fs = null;
        // nothing needs to be set here; the remote configuration is read on connect
        Configuration conf = new Configuration();
        // Hadoop user name -- the login user on the master machine
        String hdfsUserName = "root";
        URI hdfsUri = null;
        try {
            // HDFS access URI
            hdfsUri = new URI("hdfs://192.168.137.100:9000");
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        try {
            // fetch the configuration from the remote NameNode and create the HDFS object
            fs = FileSystem.get(hdfsUri, conf, hdfsUserName);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return fs;
    }
}
hdfs://192.168.137.100:9000 is taken from the core-site.xml configuration on the master node.
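For reference, that value would normally come from an fs.defaultFS entry such as the following in the master node's core-site.xml (only this one property is shown; the rest of the file is omitted):

<!-- excerpt from core-site.xml on the master node; address as assumed in this article -->
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://192.168.137.100:9000</value>
    </property>
</configuration>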
3. Test class for the basic operations: HadoopHdfsBaseOperation
Note: an exception is reported during the run, but the program still completes successfully. The cause is reportedly that a Hadoop binary package must be available on the local machine and HADOOP_HOME must point to it.
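If you prefer not to set the HADOOP_HOME environment variable globally, one commonly used workaround is to set the hadoop.home.dir system property before the first FileSystem call. The sketch below assumes a Hadoop 2.8.x binary package (containing bin/winutils.exe on Windows) has been unpacked to D:/hadoop-2.8.1, which is only an example path; the static block could be added to ConnectHadoop.

// hypothetical static initializer for ConnectHadoop
static {
    // assumed location of the unpacked Hadoop binary package; adjust to your machine
    System.setProperty("hadoop.home.dir", "D:/hadoop-2.8.1");
}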
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;

@Slf4j
public class HadoopHdfsBaseOperation {

    public static void main(String[] args) {
        FileSystem fs = ConnectHadoop.getHadoopFileSystem();
        try {
            //createDir(fs);
            //deleteDir(fs);
            //renamePath(fs);
            //iteratorPath(fs, new Path("/aa"));
            //showAllConf();
            //printHdfsFileContent(fs);
            //uploadLocalFileToHdfs(fs);
            //downloadFileFromHdfs(fs);
            copyInHdfs(fs);
        } catch (Exception e) {
            log.error("hdfs error,{}", e);
        } finally {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Create a directory
     * @param fs
     */
    public static void createDir(FileSystem fs) {
        Path path = new Path("/hzb");
        try {
            // even if the path already exists, mkdirs still succeeds
            fs.mkdirs(path);
            log.info("mkdir success");
        } catch (IOException e) {
            log.error("mkdir error,{}", e);
        }
    }

    /**
     * Delete a path; the second argument true makes the delete recursive, like rm -r
     * @param fs
     */
    public static void deleteDir(FileSystem fs) {
        Path path = new Path("/xxxx/yyyy");
        try {
            fs.delete(path, true);
            log.info("delete dir success");
        } catch (IOException e) {
            log.error("delete error,{}", e);
        }
    }

    /**
     * Rename a path
     * @param fs
     */
    public static void renamePath(FileSystem fs) {
        Path oldPath = new Path("/xxxx");
        Path newPath = new Path("/zzzz");
        try {
            fs.rename(oldPath, newPath);
            log.info("rename path success");
        } catch (IOException e) {
            log.error("rename error,{}", e);
        }
    }

    /**
     * Recursively walk a directory and its children
     * @param hdfs
     * @param listPath
     */
    public static void iteratorPath(FileSystem hdfs, Path listPath) {
        FileStatus[] files;
        try {
            files = hdfs.listStatus(listPath);
            // not every directory actually contains files
            if (files.length == 0) {
                // without toUri(), the printed path would carry the full URL
                log.info("==>root dir:{}", listPath.toUri().getPath());
            } else {
                for (FileStatus f : files) {
                    if (f.isFile()) {
                        log.info("==>file:{}", f.getPath().toUri().getPath());
                    } else {
                        // a non-empty directory: keep walking
                        iteratorPath(hdfs, f.getPath());
                    }
                }
            }
        } catch (IOException e) {
            log.error("iteratorPath error,{}", e);
        }
    }

    /**
     * Read all configuration of the remote Hadoop cluster and print it as key=value pairs
     */
    public static void showAllConf() {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.137.100:9000");
        Iterator<Map.Entry<String, String>> it = conf.iterator();
        log.info("================== remote hadoop configuration (begin) ==================");
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            log.info(entry.getKey() + "=" + entry.getValue());
        }
        log.info("================== remote hadoop configuration (end) ====================");
    }

    /**
     * Read the remote /test/readme.txt, print it to the console and also write it to E:/hadooptest/readme.txt
     */
    public static void printHdfsFileContent(FileSystem hdfs) {
        try {
            FSDataInputStream is = hdfs.open(new Path("/test/readme.txt"));
            OutputStream os = new FileOutputStream(new File("E:/hadooptest/readme.txt"));
            byte[] buff = new byte[1024];
            int length = 0;
            log.info("Content of the remote /test/readme.txt:=======================>");
            while ((length = is.read(buff)) != -1) {
                System.out.println(new String(buff, 0, length));
                os.write(buff, 0, length);
                os.flush();
            }
        } catch (Exception e) {
            log.error("printHdfsFileContent error,{}", e);
        }
    }

    /**
     * File upload: copy the local E:/hadooptest/navicat.zip to /aa on HDFS
     * @param hdfs
     */
    public static void uploadLocalFileToHdfs(FileSystem hdfs) {
        Path HDFSPath = new Path("/aa");
        Path localPath = new Path("E:/hadooptest/navicat.zip");
        // if the target path does not exist it is created;
        // if the target file already exists it is overwritten
        try {
            hdfs.copyFromLocalFile(localPath, HDFSPath);
        } catch (IOException e) {
            e.printStackTrace();
            log.error("uploadLocalFileToHdfs error,{}", e);
        }
    }

    /**
     * File download: copy /aa/navicat.zip from HDFS to E:/hadooptest/.
     * In testing, hdfs.copyToLocalFile did not work directly, so a plain stream copy is used instead.
     * @param hdfs
     */
    public static void downloadFileFromHdfs(FileSystem hdfs) {
//        Path HDFSPath = new Path("/aa/navicat.zip");
//        Path localPath = new Path("E:/hadooptest/");
//        try {
//            log.info("==================== download start ====================");
//            hdfs.copyToLocalFile(HDFSPath, localPath);
//            log.info("==================== download finished =================");
//        } catch (IOException e) {
//            e.printStackTrace();
//            log.error("downloadFileFromHdfs error,{}", e);
//        }
        try {
            FSDataInputStream ifs = hdfs.open(new Path("/aa/navicat.zip"));
            OutputStream os = new FileOutputStream(new File("E:/hadooptest/navicat.zip"));
            byte[] buff = new byte[1024];
            int length = 0;
            log.info("============ download start =======================>");
            while ((length = ifs.read(buff)) != -1) {
                os.write(buff, 0, length);
                os.flush();
            }
        } catch (Exception e) {
            log.error("downloadFileFromHdfs error,{}", e);
        }
    }

    /**
     * Copy a file between two HDFS paths.
     * Use FSDataInputStream from open(Path p) for the source and FSDataOutputStream from create(Path p) for the target,
     * then IOUtils.copyBytes(in, out, int bufferSize, boolean close) to do the actual read/write.
     * Notes:
     * 1. As with buffered reads in plain Java, a buffer is used here, but only its size has to be given;
     *    there is no need to allocate a byte array yourself.
     * 2. The last boolean says whether to close the streams when the copy finishes. It is usually false,
     *    in which case the streams must be closed manually, otherwise an error is reported.
     * @param hdfs
     */
    public static void copyInHdfs(FileSystem hdfs) {
        Path inPath = new Path("/aa/navicat.zip");
        Path outPath = new Path("/test/navicat.zip");
        FSDataInputStream hdfsIn = null;
        FSDataOutputStream hdfsOut = null;
        try {
            hdfsIn = hdfs.open(inPath);
            hdfsOut = hdfs.create(outPath);
            IOUtils.copyBytes(hdfsIn, hdfsOut, 1024 * 1024 * 64, false);
        } catch (IOException e) {
            log.error("copyInHdfs error,{}", e);
        } finally {
            // copyBytes was called with close=false, so close the streams manually
            IOUtils.closeStream(hdfsIn);
            IOUtils.closeStream(hdfsOut);
        }
    }
}
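As a footnote to downloadFileFromHdfs: the failure of the plain hdfs.copyToLocalFile(HDFSPath, localPath) on a Windows client without the native binaries is often attributed to the default local checksum file system, which needs winutils support. A variant that is commonly reported to avoid this is the four-argument overload with useRawLocalFileSystem set to true. The sketch below reuses the paths from the example and could be added to the same class; it is an assumed alternative, not something tested in this article.

    /**
     * Alternative download using the raw local file system, so no winutils/native support
     * is needed on the client. Paths match the example above.
     */
    public static void downloadWithRawLocalFileSystem(FileSystem hdfs) throws IOException {
        Path hdfsPath = new Path("/aa/navicat.zip");
        Path localPath = new Path("E:/hadooptest/navicat.zip");
        // copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem)
        hdfs.copyToLocalFile(false, hdfsPath, localPath, true);
    }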