Development tools: Eclipse + Maven + JDK 1.8
Code:
package com.zhiwei.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;

/**
 * Note: if "Permission Denied" occurs, open up the permissions on the Hadoop directory:
 *   hdfs dfs -chmod -R 777 <hadoop directory path>
 * Hadoop version: hadoop-2.7.3
 */
public class HdfsClient {

    private static String prefix = "hdfs://";
    private static String targetHost = "localhost";
    private static String targetPort = "9090";
    private static Configuration conf = new Configuration();
    private static FileSystem fileSystem = null;

    private HdfsClient() {}

    /**
     * Initialize the HDFS client (defaults to the "root" user).
     * @param host
     * @param port
     */
    public static void initClient(String host, String port) {
        initClient(host, port, "root");
    }

    public static void initClient(String host, String port, String user) {
        try {
            targetHost = host;
            targetPort = port;
            // Connect to HDFS as the given user
            fileSystem = FileSystem.get(URI.create(prefix + targetHost + ":" + targetPort), conf, user);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get the DataNode information of the HDFS cluster.
     * Xceivers: the number of threads a DataNode is currently using to transfer data.
     * @return
     */
    public static DatanodeInfo[] getDatanodeInfos() {
        DatanodeInfo[] datanodeInfos = null;
        try {
            DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
            datanodeInfos = dfs.getDataNodeStats();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
        return datanodeInfos;
    }

    /**
     * Check whether an HDFS file exists.
     * @param hdfsFile
     * @return
     */
    public static boolean isFileExist(String hdfsFile) {
        try {
            return fileSystem.exists(new Path(hdfsFile));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * List all files under an HDFS directory.
     * @param hdfsFileDir
     * @return
     */
    public static FileStatus[] getFilesByDir(String hdfsFileDir) {
        FileStatus[] fileStatus = null;
        try {
            // Use the FileSystem created by initClient instead of reconnecting with a hard-coded address
            fileStatus = fileSystem.listStatus(new Path(hdfsFileDir));
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
        return fileStatus;
    }

    /**
     * Create an HDFS directory (recursively).
     * @param hdfsFileDir
     */
    public static boolean makeHdfsDir(String hdfsFileDir) {
        try {
            return fileSystem.mkdirs(new Path(hdfsFileDir));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    public static boolean deleteHdfsFile(String hdfsFilePath) {
        return deleteHdfsFile(hdfsFilePath, true);
    }

    /**
     * Delete an HDFS file or directory.
     * @param hdfsFilePath HDFS path
     * @param isRecursive whether to delete recursively
     */
    public static boolean deleteHdfsFile(String hdfsFilePath, boolean isRecursive) {
        try {
            return fileSystem.delete(new Path(hdfsFilePath), isRecursive);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Read the content of an HDFS file.
     * @param hdfsFilePath
     * @throws IOException
     */
    public static byte[] readHdfsFile(String hdfsFilePath) throws IOException {
        FSDataInputStream fis = null;
        byte[] data = null;
        try {
            Path path = new Path(hdfsFilePath);
            // Size the buffer from the file length rather than available(),
            // which is not guaranteed to report the whole file
            data = new byte[(int) fileSystem.getFileStatus(path).getLen()];
            fis = fileSystem.open(path);
            IOUtils.readFully(fis, data, 0, data.length);
        } finally {
            IOUtils.closeStream(fis);
        }
        return data;
    }

    /**
     * Rename an HDFS file.
     * @param oldName source name (full path)
     * @param newName target name (full path)
     * @return
     */
    public static boolean renameHdfsFile(String oldName, String newName) {
        try {
            return fileSystem.rename(new Path(oldName), new Path(newName));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Write a byte array into a new HDFS file.
     * @param dest new HDFS file path
     * @param content content bytes
     * @return
     */
    public static boolean writeInfoToHdfsFile(String dest, byte[] content) {
        FSDataOutputStream fsDataOutputStream = null;
        try {
            fsDataOutputStream = fileSystem.create(new Path(dest));
            fsDataOutputStream.write(content);
            fsDataOutputStream.flush();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        } finally {
            IOUtils.closeStream(fsDataOutputStream);
        }
        return true;
    }

    /**
     * Upload a local file to HDFS with default options (keep source, do not overwrite).
     * @param src local file path
     * @param dest HDFS path
     * @return status
     */
    public static boolean uploadLocalFileToHDFS(String src, String dest) {
        return uploadLocalFileToHDFS(false, false, src, dest);
    }

    /**
     * Upload a local file to the Hadoop HDFS file system.
     * @param delSrc whether to delete the source file (default: no)
     * @param override whether to overwrite a file with the same name (default: no)
     * @param src full local path
     * @param dest full HDFS path
     * @return
     */
    public static boolean uploadLocalFileToHDFS(boolean delSrc, boolean override, String src, String dest) {
        try {
            // Note: the destination can be a full path; a relative path is resolved
            // against the home directory of the connecting user
            fileSystem.copyFromLocalFile(delSrc, override, new Path(src), new Path(dest));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Close the HDFS client.
     */
    public static void close() {
        if (fileSystem != null) {
            try {
                fileSystem.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
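The class Javadoc above works around "Permission Denied" with hdfs dfs -chmod -R 777. As a rough sketch (not part of the original class), the same permission change can also be made from the Java API with FileSystem.setPermission; the helper class, method name, and path here are illustrative only, and unlike chmod -R the call applies to a single path, so a recursive version would have to walk the directory tree itself.

// Hypothetical helper, assuming the FileSystem has already been obtained (e.g. via FileSystem.get):
// loosen permissions on one HDFS path instead of running "hdfs dfs -chmod -R 777".
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class HdfsPermissionHelper {

    public static void openUp(FileSystem fs, String dir) throws Exception {
        // 0777: read/write/execute for owner, group and others (illustrative only);
        // setPermission is not recursive, it changes only the given path
        fs.setPermission(new Path(dir), new FsPermission((short) 0777));
    }
}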
Test code:
package com.zhiwei.hdfs;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HdfsClientTest {

    @Before
    public void init() {
        // Point the client at a local Hadoop installation (needed on Windows)
        System.setProperty("hadoop.home.dir", "D:\\Tools\\hadoop-2.7.3");
    }

    /**
     * Get HDFS DataNode information.
     */
    @Test
    public void getDatanodeInfosTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        DatanodeInfo[] datanodeInfos = HdfsClient.getDatanodeInfos();
        for (DatanodeInfo datanodeInfo : datanodeInfos) {
            System.out.println("Node hostname: " + datanodeInfo.getHostName());
            System.out.println("Node HTTP port: " + datanodeInfo.getInfoPort());
            System.out.println("Node IPC port: " + datanodeInfo.getIpcPort());
            System.out.println("Node cache used: " + datanodeInfo.getCacheUsedPercent());
        }
    }

    /**
     * Check whether a file exists.
     */
    @Test
    public void isFileExistTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        System.out.println(HdfsClient.isFileExist("/data"));
    }

    /**
     * List the files under a directory.
     */
    @Test
    public void getFilesByDirTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        FileStatus[] fStatus = HdfsClient.getFilesByDir("/data");
        for (FileStatus fs : fStatus) {
            System.out.println("Child path: " + fs.getPath() + ", group: " + fs.getGroup() + ", owner: " + fs.getOwner());
        }
    }

    /**
     * Create an HDFS directory.
     */
    @Test
    public void makeHdfsDirTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        System.out.println("Directory created: " + HdfsClient.makeHdfsDir("/data/test"));
    }

    /**
     * Delete an HDFS directory.
     */
    @Test
    public void deleteHdfsFileTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        System.out.println("File deleted: " + HdfsClient.deleteHdfsFile("/data/test", true));
    }

    /**
     * Read an HDFS file.
     */
    @Test
    public void readHdfsFileTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        System.out.println("HDFS file content: " + Bytes.toString(HdfsClient.readHdfsFile("/data/mapreduce/output/part-r-00000")));
    }

    /**
     * Rename an HDFS file.
     */
    @Test
    public void renameHdfsFileTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        System.out.println("File renamed: " + HdfsClient.renameHdfsFile("/data/mapreduce/output/test", "/data/mapreduce/output/test1"));
    }

    /**
     * Write data to HDFS.
     */
    @Test
    public void writeInfoToHdfsFileTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        System.out.println("Data written to HDFS: " + HdfsClient.writeInfoToHdfsFile("/data/Test", "/data/mapreduce/output/test1".getBytes()));
    }

    /**
     * Upload a file to HDFS.
     */
    @Test
    public void uploadLocalFileToHDFSTest() throws Exception {
        HdfsClient.initClient("192.168.204.129", "9090", "squirrel");
        System.out.println("File uploaded to HDFS: " + HdfsClient.uploadLocalFileToHDFS(true, true, "d://temp/test.txt", "/data/Test"));
    }

    @After
    public void close() {
        HdfsClient.close();
    }
}
Maven configuration:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zhiwei</groupId>
  <artifactId>hadoop</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Hadoop</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hbase.version>1.2.6</hbase.version>
    <hive.version>2.3.1</hive.version>
    <zookeeper.version>3.4.8</zookeeper.version>
    <curator.version>4.0.0</curator.version>
    <fastjson.version>1.2.41</fastjson.version>
    <mahout.version>0.13.0</mahout.version>
    <kafka.version>0.11.0.2</kafka.version>
    <zkclient.version>0.10</zkclient.version>
    <junit.version>4.12</junit.version>
  </properties>

  <dependencies>
    <!-- ZooKeeper -->
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>${zookeeper.version}</version>
    </dependency>
    <!-- Apache Curator ZooKeeper client -->
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>${curator.version}</version>
    </dependency>
    <!-- zkclient ZooKeeper client -->
    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>${zkclient.version}</version>
    </dependency>

    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- HBase -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <!-- Hive -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
    </dependency>

    <!-- Kafka -->
    <!--
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    -->

    <!-- Mahout -->
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-math</artifactId>
      <version>${mahout.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-hdfs</artifactId>
      <version>${mahout.version}</version>
    </dependency>

    <!-- Alibaba FastJson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
    </dependency>

    <!-- JUnit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>

    <!-- Override the default Guava version pulled in by Hive to avoid Guava version conflicts -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>11.0.2</version>
    </dependency>
  </dependencies>

  <!-- Pin the JDK version used to compile the project -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
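With this pom.xml in place, the JUnit tests above can be run from the command line, for example with mvn clean test, or mvn test -Dtest=HdfsClientTest for just this class. If Guava version conflicts with Hive still appear, mvn dependency:tree -Dincludes=com.google.guava shows which dependency pulls in which Guava version. These commands assume a standard Maven installation and the default Surefire setup.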
Note: running against Hadoop does not actually depend on the Hadoop Eclipse plugin. The plugin merely wraps the Hadoop configuration parameters; under the hood it also goes through the Hadoop API and simply presents the HDFS file system as a tree structure, as the sketch below illustrates.
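As a rough illustration of that point (this helper is not part of the original code, and the class and method names are made up for the example), the same tree view the plugin shows can be produced directly through the public FileSystem API by walking directories with listStatus:

package com.zhiwei.hdfs;

// Illustrative only: print an HDFS directory as an indented tree, the way the
// Eclipse plugin renders it, using nothing but the FileSystem API.
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsTreePrinter {

    public static void printTree(FileSystem fs, Path dir, String indent) throws Exception {
        for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(indent + status.getPath().getName());
            if (status.isDirectory()) {
                // Recurse into subdirectories to build the tree view
                printTree(fs, status.getPath(), indent + "  ");
            }
        }
    }
}

Given a FileSystem obtained via FileSystem.get(...), calling printTree(fs, new Path("/"), "") would print the whole namespace as an indented tree.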
Project structure: