Common HDFS API Operations and HDFS I/O Stream Operations

Prerequisites

Create a Maven project and edit the pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mcq</groupId>
  <artifactId>HDFS-001</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.7.2</version>
		</dependency>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
</dependencies>

</project>
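Note: the jdk.tools entry is a system-scoped dependency, so the build has to run against a JDK 8 installation whose lib/tools.jar actually exists at the configured systemPath; on a plain JRE the dependency cannot be resolved.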

Add a log4j.properties file under resources:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

 

API Operations

The HDFS shell commands are very similar to their Linux counterparts, so they are easy to remember by analogy. Listed here are some of the corresponding Java API operations (rough shell equivalents are shown after the class below for comparison):

package com.mcq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

public class HDFSClient {
	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		// conf.set("fs.defaultFS", "hdfs://hadoop103:9000");
		// FileSystem fs = FileSystem.get(conf);
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.mkdirs(new Path("/ppqq"));
		fs.close();
		System.out.println("over");
	}

	@Test // upload a file to HDFS
	public void testCopyFromLocalFile()
			throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
		fs.close();
		System.out.println("over");
	}

	@Test // download a file from HDFS
	public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
		// arguments: delSrc=false (keep the source), src, dst, useRawLocalFileSystem=true (write straight to the local file system, no .crc checksum file)

		fs.close();
		System.out.println("over");
	}

	@Test // delete a file or directory
	public void testDelete() throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.delete(new Path("/0811"), true); // the second argument enables recursive deletion
		fs.close();
		System.out.println("over");
	}

	@Test // rename a file
	public void testRename() throws IOException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
		fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
		fs.close();
		System.out.println("over");
	}

	@Test
	public void testListFiles() throws IOException, InterruptedException, URISyntaxException {

		// 1. get the file system
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");

		// 2. get file details (recursively)
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

		while (listFiles.hasNext()) {
			LocatedFileStatus status = listFiles.next();

			// print details
			// file name
			System.out.println(status.getPath().getName());
			// length
			System.out.println(status.getLen());
			// permissions
			System.out.println(status.getPermission());
			// group
			System.out.println(status.getGroup());

			// get the block location information
			BlockLocation[] blockLocations = status.getBlockLocations();

			for (BlockLocation blockLocation : blockLocations) {

				// hosts storing this block
				String[] hosts = blockLocation.getHosts();

				for (String host : hosts) {
					System.out.println(host);
				}
			}

			System.out.println("-----------分割線----------");
		}

		// 3. close resources
		fs.close();
	}
	
	@Test
	public void testListStatus() throws IOException, InterruptedException, URISyntaxException{
			
		// 1. get the configuration and the file system
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2. determine whether each entry is a file or a directory
		FileStatus[] listStatus = fs.listStatus(new Path("/"));

		for (FileStatus fileStatus : listStatus) {

			// if it is a file
			if (fileStatus.isFile()) {
				System.out.println("f:" + fileStatus.getPath().getName());
			} else {
				System.out.println("d:" + fileStatus.getPath().getName());
			}
		}
			
		// 3. close resources
		fs.close();
	}
}
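For comparison, rough shell equivalents of the Java calls above are listed here (same paths as in the tests; this assumes the hdfs client is on the PATH and is only a sketch, not part of the original example):

hdfs dfs -mkdir /ppqq                          (mkdirs)
hdfs dfs -put d:/banzhang.txt /banzhang.txt    (copyFromLocalFile)
hdfs dfs -get /banzhang.txt d:/banhua.txt      (copyToLocalFile)
hdfs dfs -rm -r /0811                          (delete)
hdfs dfs -mv /banzhang.txt /lala.txt           (rename)
hdfs dfs -ls -R /                              (listFiles / listStatus)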

I/O Stream Operations

The API operations above are all conveniently wrapped by the framework. If we want to implement the upload and download ourselves, we can work with the underlying I/O streams instead.

 

package com.mcq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSIO {
	// upload a file
	@Test
	public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {

		// 1. get the file system
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");

		// 2. create an input stream for the local file
		FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));

		// 3. get an output stream to HDFS
		FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"));

		// 4. copy the stream
		IOUtils.copyBytes(fis, fos, configuration);

		// 5. close resources
		IOUtils.closeStream(fos);
		IOUtils.closeStream(fis);
		fs.close();
	}
	// download a file
	@Test
	public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{

		// 1. get the file system
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2. open an input stream from HDFS
		FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
			
		// 3. create an output stream to the local file
		FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"));
			
		// 4. copy the stream
		IOUtils.copyBytes(fis, fos, configuration);
			
		// 5. close resources
		IOUtils.closeStream(fos);
		IOUtils.closeStream(fis);
		fs.close();
	}
	// positioned (seek-based) file reads
	// (1) download the first block
	@Test
	public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{

		// 1. get the file system
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2. open an input stream from HDFS
		FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
			
		// 3. create an output stream to the local file
		FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));
			
		// 4. copy exactly the first 128 MB (one HDFS block): 1024 bytes * (1024 * 128) iterations
		byte[] buf = new byte[1024];

		for (int i = 0; i < 1024 * 128; i++) {
			fis.readFully(buf); // readFully guarantees the whole buffer is filled on each pass
			fos.write(buf);
		}
			
		// 5. close resources
		IOUtils.closeStream(fis);
		IOUtils.closeStream(fos);
		fs.close();
	}
	// (2) download the second block
	@Test
	public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{

		// 1. get the file system
		Configuration configuration = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
			
		// 2. open an input stream from HDFS
		FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
			
		// 3. seek to the start of the second block (skip the first 128 MB)
		fis.seek(1024 * 1024 * 128);
			
		// 4. create an output stream to the local file
		FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));
			
		// 5. copy the remaining bytes
		IOUtils.copyBytes(fis, fos, configuration);
			
		// 6. close resources
		IOUtils.closeStream(fis);
		IOUtils.closeStream(fos);
		fs.close();
	}
}
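Once both parts are on the local disk, the split download can be verified by concatenating them again. On Windows, for example, something along the lines of

type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1

appends the second part to the first, after which part1 can be renamed back to hadoop-2.7.2.tar.gz and unpacked normally.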