[Hadoop] - HDFS Java Client Operations

Development tools: Eclipse + Maven + JDK 1.8


Code:

package com.zhiwei.hdfs;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;

/**
 *  Issue: "Permission Denied" errors
 *  Fix: grant access to the HDFS directory, e.g. hdfs dfs -chmod -R 777 <hdfs directory path>
 *  Hadoop version: hadoop-2.7.3
 */
public class HdfsClient {

	private static String prefix = "hdfs://";
	private static String targetHost = "localhost";
	private static String targetPort = "9090";
	private static Configuration conf = new Configuration();
	private static FileSystem fileSystem = null;
	private HdfsClient(){}
	
	/**
	 * Initialize the HDFS client (connects as user "root" by default)
	 * @param host
	 * @param port
	 */
	public static void initClient(String host,String port) {
			
		initClient(host,port,"root");
	}
	
	public static void initClient(String host, String port, String user) {

		targetHost = host;
		targetPort = port;

		try {
			// Connect to HDFS as the given user
			fileSystem = FileSystem.get(URI.create(prefix + targetHost + ":" + targetPort), conf, user);
		} catch (IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Get DataNode information for the HDFS cluster
	 * Xceivers: the number of threads a DataNode is currently using to transfer data
	 * @return
	 */
	public static DatanodeInfo[] getDatanodeInfos(){
		
		DatanodeInfo[] datanodeInfos= null;
		try {
			
			DistributedFileSystem dbfs = (DistributedFileSystem) fileSystem;
			datanodeInfos = dbfs.getDataNodeStats();
		
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
		return datanodeInfos;
	}
	
	/**
	 * Check whether a file exists in HDFS
	 * @param hdfsFile
	 * @return
	 */
	public static boolean isFileExist(String hdfsFile){
       
		boolean isSuccess = false;
		try {
			isSuccess = fileSystem.exists(new Path(hdfsFile));
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
		return isSuccess;
	}
	
	/**
	 * List all files under an HDFS directory
	 * @param hdfsFileDir
	 * @return
	 */
	public static FileStatus[] getFilesByDir(String hdfsFileDir){
		
        FileStatus[] fileStatus = null;
        try {
			// Reuse the FileSystem created by initClient instead of reconnecting with defaults
			fileStatus = fileSystem.listStatus(new Path(hdfsFileDir));
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
        return fileStatus;
	}
	
	 /**
	  * Create an HDFS directory (created recursively)
	  * @param hdfsFileDir
	  */
    public static boolean makeHdfsDir(String hdfsFileDir){
    	
        boolean isSuccess = false;
		try {
			isSuccess = fileSystem.mkdirs(new Path(hdfsFileDir));
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
		return isSuccess;
    }
    
    
    public static boolean deleteHdfsFile(String hdfsFilePath) {
    	return deleteHdfsFile(hdfsFilePath,true);
    }
    
    /**
	  * Delete an HDFS file
	  * @param hdfsFilePath HDFS file path
	  * @param isRecursive whether to delete recursively
	  */
   public static boolean deleteHdfsFile(String hdfsFilePath, boolean isRecursive){
   	
       boolean isSuccess = false;
		try {
			isSuccess = fileSystem.delete(new Path(hdfsFilePath),isRecursive);
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
		return isSuccess;
   }
	
	 /**
	  * Read the contents of an HDFS file
	  * @param hdfsFilePath
	  * @throws IOException
	  */
    public static byte[] readHdfsFile(String hdfsFilePath) throws IOException{

    	FSDataInputStream fis = null;
    	byte[] data = null;
    	try {
    		Path path = new Path(hdfsFilePath);
    		// available() is unreliable for large files, so size the buffer from the file length
    		data = new byte[(int) fileSystem.getFileStatus(path).getLen()];
    		fis = fileSystem.open(path);
    		fis.readFully(0, data);
        } finally {
        	IOUtils.closeStream(fis);
        }
        return data;
    }
	
	/**
	 * Rename an HDFS file
	 * @param oldName source file name (full path)
	 * @param newName target file name (full path)
	 * @return
	 */
	public static boolean renameHdfsFile(String oldName,String newName){

		try {
			// Return the actual result of the rename instead of always returning true
			return fileSystem.rename(new Path(oldName), new Path(newName));
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
	}
	
	/**
	 * Write a byte array to a new HDFS file
	 * @param dest path of the new HDFS file
	 * @param content content bytes
	 * @return
	 */
	public static boolean writeInfoToHdfsFile(String dest,byte[] content){
		
		    FSDataOutputStream fsDataOutputStream = null;
	        try {
	        	
	        	fsDataOutputStream = fileSystem.create(new Path(dest));
	        	fsDataOutputStream.write(content);
	        	fsDataOutputStream.flush();
	        	
			} catch (IOException e) {
				e.printStackTrace();
				return false;
			}finally {
				IOUtils.closeStream(fsDataOutputStream);
			}
	        return true;
	}

	/**
	 * Upload a file to HDFS with default options
	 * @param src source file path
	 * @param dest HDFS destination path
	 * @return status
	 */
	public static boolean uploadLocalFileToHDFS(String src,String dest){
		return uploadLocalFileToHDFS(false, false, src, dest);
	}
	
	/**
	 * Upload a local file to HDFS
	 * @param delSrc whether to delete the source file (default: no)
	 * @param override whether to overwrite an existing file with the same name (default: no)
	 * @param src full path of the local file
	 * @param dest full path on HDFS
	 * @return 
	 */
	public static boolean uploadLocalFileToHDFS(boolean delSrc,boolean override,String src,String dest){

		try {
			
			// Note: the destination may be an absolute path; a relative path is
			// resolved against the home directory of the connecting user
			fileSystem.copyFromLocalFile(delSrc, override, new Path(src), new Path(dest));

		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
		return true;
	}
	
	/**
	 * Close the HDFS client
	 */
	public static void close() {
		
		if(fileSystem != null ) {
			try {
				fileSystem.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

Test code:

package com.zhiwei.hdfs;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HdfsClientTest {

	@Before
	public void init() {
		
		System.setProperty("hadoop.home.dir", "D:\\Tools\\hadoop-2.7.3");
	}
	
	/**
	 * Get HDFS DataNode information
	 * @throws Exception
	 */
	@Test
	public void getDatanodeInfosTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		DatanodeInfo[] datanodeInfos = HdfsClient.getDatanodeInfos();
		
		for(DatanodeInfo datanodeInfo : datanodeInfos) {
			
			System.out.println("節點主機名:" + datanodeInfo.getHostName());
			System.out.println("節點Http訪問端口:" + datanodeInfo.getInfoPort());
			System.out.println("節點IPC訪問端口:" + datanodeInfo.getIpcPort());
			System.out.println("節點已用緩存:" + datanodeInfo.getCacheUsedPercent());
		}

	}
	
	/**
	 * Check whether a file exists
	 * @throws Exception
	 */
	@Test
	public void isFileExistTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		System.out.println(HdfsClient.isFileExist("/data"));
	}
	
	/**
	 * List the files under a directory
	 * @throws Exception
	 */
	@Test
	public void getFilesByDirTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		FileStatus[] fStatus = HdfsClient.getFilesByDir("/data");
		
		for(FileStatus fs : fStatus) {
			System.out.println("子文件路徑:" + fs.getPath() 
			                 + ", " + "子文件屬組:" + fs.getGroup() 
			                 + ", 文件屬主: " + fs.getOwner());
		}
	}
	
	/**
	 * Create an HDFS directory
	 * @throws Exception
	 */
	@Test
	public void makeHdfsDirTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		System.out.println("文件建立成功: " + HdfsClient.makeHdfsDir("/data/test"));
	}
	
	
	/**
	 * Delete an HDFS directory
	 * @throws Exception
	 */
	@Test
	public void deleteHdfsFileTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		System.out.println("文件刪除成功: " + HdfsClient.deleteHdfsFile("/data/test",true));
	}
	
	
	/**
	 * Read an HDFS file
	 * @throws Exception
	 */
	@Test
	public void readHdfsFileTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		System.out.println("HDFS文件內容: " + Bytes.toString(HdfsClient.readHdfsFile("/data/mapreduce/output/part-r-00000")));
	}
	
	/**
	 * Rename an HDFS file
	 * @throws Exception
	 */
	@Test
	public void renameHdfsFileTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		System.out.println("文件重命名成功: " + HdfsClient.renameHdfsFile("/data/mapreduce/output/test","/data/mapreduce/output/test1"));
	}
	
	/**
	 * Write data to HDFS
	 * @throws Exception
	 */
	@Test
	public void writeInfoToHdfsFileTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		System.out.println("數據寫入HDFS: " + HdfsClient.writeInfoToHdfsFile("/data/Test","/data/mapreduce/output/test1".getBytes()));
	}
	
	/**
	 * Upload a file to HDFS
	 * @throws Exception
	 */
	@Test
	public void uploadLocalFileToHDFSTest() throws Exception {

		HdfsClient.initClient("192.168.204.129", "9090", "squirrel");

		System.out.println("文件上傳HDFS: " + HdfsClient.uploadLocalFileToHDFS(true,true,"d://temp/test.txt","/data/Test"));
	}
	
	@After
	public void close() {
		
		HdfsClient.close();
	}
}

Maven configuration (pom.xml):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zhiwei</groupId>
  <artifactId>hadoop</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Hadoop</name>
  <url>http://maven.apache.org</url>
  
   <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <hadoop.version>2.7.3</hadoop.version>
        <hbase.version>1.2.6</hbase.version>
        <hive.version>2.3.1</hive.version>
        <zookeeper.version>3.4.8</zookeeper.version>
        <curator.version>4.0.0</curator.version>
        <fastjson.version>1.2.41</fastjson.version>
        <mahout.version>0.13.0</mahout.version>
        <kafka.version>0.11.0.2</kafka.version>
        <zkclient.version>0.10</zkclient.version>
        <junit.version>4.12</junit.version>
    </properties>

  <dependencies>
    <!-- ZooKeeper -->
    <dependency>
		<groupId>org.apache.zookeeper</groupId>
		<artifactId>zookeeper</artifactId>
		<version>${zookeeper.version}</version>
	</dependency>
	
	<!-- Curator (ZooKeeper client, originally from Netflix) -->
	<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>${curator.version}</version>
	</dependency>
	
	<!-- zkclient (ZooKeeper client) -->
	<dependency>
	    <groupId>com.101tec</groupId>
	    <artifactId>zkclient</artifactId>
	    <version>${zkclient.version}</version>
    </dependency>
	
	<!-- Hadoop -->
	 <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        
        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
	     <groupId>org.apache.hbase</groupId>
	     <artifactId>hbase-server</artifactId>
	     <version>${hbase.version}</version>
	   </dependency>
	   
	   <!-- Hive -->
       <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-jdbc</artifactId>
          <version>${hive.version}</version>
       </dependency>
	   <dependency>
	     <groupId>org.apache.hive</groupId>
	     <artifactId>hive-exec</artifactId>
	     <version>${hive.version}</version>
	   </dependency>
		<dependency>
		    <groupId>org.apache.hive</groupId>
		    <artifactId>hive-metastore</artifactId>
		    <version>${hive.version}</version>
		</dependency>
	   
	   
	   <!-- Kafka -->
	  <!--  <dependency>
		    <groupId>org.apache.kafka</groupId>
		    <artifactId>kafka-clients</artifactId>
		    <version>${kafka.version}</version>
		</dependency> -->
		
		<!-- Mahout -->
		<dependency>
		    <groupId>org.apache.mahout</groupId>
		    <artifactId>mahout-math</artifactId>
		    <version>${mahout.version}</version>
		</dependency>
		<dependency>
		    <groupId>org.apache.mahout</groupId>
		    <artifactId>mahout-hdfs</artifactId>
		    <version>${mahout.version}</version>
		</dependency>
	   
	   <!-- Alibaba FastJson -->
	   <dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>fastjson</artifactId>
		    <version>${fastjson.version}</version>
		</dependency>
		
		<!-- JUnit -->
        <dependency>
		    <groupId>junit</groupId>
		    <artifactId>junit</artifactId>
		    <version>${junit.version}</version>
		</dependency>  	
		
		 <!-- Pin the Guava version (pulled in by Hive) to avoid version conflicts -->
	   <dependency>
	      <groupId>com.google.guava</groupId>
	      <artifactId>guava</artifactId>
	      <version>11.0.2</version>
	    </dependency>
	</dependencies>

	<!-- Set the JDK version for the Maven project -->
	<build>  
	    <plugins>  
	      <plugin>  
	        <groupId>org.apache.maven.plugins</groupId>  
	        <artifactId>maven-compiler-plugin</artifactId>  
	        <configuration>  
	          <source>${java.version}</source>  
	          <target>${java.version}</target>  
	        </configuration>  
	      </plugin> 
	    </plugins>  
	</build>  
</project>

Note: running this code does not actually depend on the Hadoop Eclipse plugin. The plugin merely wraps Hadoop's configuration parameters; under the hood it goes through the same Hadoop API and simply presents the HDFS file system as a tree.
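As an illustration, here is a minimal sketch (not part of the original code) that produces a similar tree view with nothing but the FileSystem API; the NameNode address and user name are the same assumptions used in the tests above and should be adjusted to your cluster:

package com.zhiwei.hdfs;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsTreeDemo {

	public static void main(String[] args) throws Exception {

		// Same cluster address and user as in HdfsClientTest (assumptions)
		FileSystem fs = FileSystem.get(
				URI.create("hdfs://192.168.204.129:9090"), new Configuration(), "squirrel");

		printTree(fs, new Path("/"), 0);
		fs.close();
	}

	// Recursively list each entry, indenting by depth to mimic the plugin's tree view
	private static void printTree(FileSystem fs, Path dir, int depth) throws Exception {
		for (FileStatus status : fs.listStatus(dir)) {
			for (int i = 0; i < depth; i++) {
				System.out.print("  ");
			}
			System.out.println(status.getPath().getName());
			if (status.isDirectory()) {
				printTree(fs, status.getPath(), depth + 1);
			}
		}
	}
}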

Project structure:

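The original screenshot is not available here; judging from the package names and the pom.xml above, the layout is presumably the standard Maven structure:

hadoop/
  pom.xml
  src/main/java/com/zhiwei/hdfs/HdfsClient.java
  src/test/java/com/zhiwei/hdfs/HdfsClientTest.java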
