package com.java.api.hdfs;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Static helpers demonstrating basic HDFS operations: download, upload,
 * create, read, delete, mkdir, list-and-print, and rename.
 *
 * <p>Every helper resolves a {@link FileSystem} from the HDFS path it is
 * given, performs one operation, and closes the file system again, even when
 * the operation throws.
 *
 * <p>NOTE(review): Hadoop may hand out a shared, cached {@code FileSystem}
 * instance; closing it after each call matches the original behavior of this
 * class, but confirm nothing else in the same process relies on that instance.
 */
public class OperaHDFS {

    /**
     * Demo entry point: runs one of the helpers against a hard-coded cluster.
     *
     * @param args unused
     * @throws Exception if the selected HDFS operation fails
     */
    public static void main(String[] args) throws Exception {
        String hdfspath = "hdfs://192.168.4.150:9000/testdata2";
        String hdfspath2 = "hdfs://192.168.4.150:9000/testdata3";
        String localpath = "D:\\hdfstongbu\\readme.txt";
        String content = "wo shi zhong guo ren ";
        // Other operations that can be tried instead of the rename below:
        //downloadFileorDirectoryOnHDFS(hdfspath,localpath);
        //uploadLocalFile2HDFS(localpath, hdfspath);
        //createNewHDFSFile(hdfspath,content);
        //deleteHDFSFile(hdfspath);
        //readHDFSFile(hdfspath);
        //mkdir(hdfspath);
        //deleteDir(hdfspath);
        //readHDFSListAll(hdfspath);
        renameFileOrDirectoryOnHDFS(hdfspath, hdfspath2);
    }

    /**
     * Resolves the file system that owns {@code path}.
     *
     * <p>Using the path's own scheme/authority (instead of the configured
     * default file system) lets fully qualified {@code hdfs://host:port/...}
     * paths work even when no cluster configuration is on the classpath.
     * Paths without a scheme still resolve to the configured default.
     */
    private static FileSystem openFileSystem(Path path) throws IOException {
        return path.getFileSystem(new Configuration());
    }

    /**
     * Downloads a file or directory from HDFS to the local file system.
     *
     * @param d1 source path on HDFS
     * @param d2 destination path on the local file system
     * @throws Exception if the copy fails
     */
    public static void downloadFileorDirectoryOnHDFS(String d1, String d2) throws Exception {
        Path source = new Path(d1);
        Path target = new Path(d2);
        try (FileSystem fs = openFileSystem(source)) {
            fs.copyToLocalFile(source, target);
        }
        System.out.println("文件下載完畢!!");
    }

    /**
     * Uploads a local file to HDFS.
     *
     * @param s full local source path, e.g. {@code /tmp/test.c}
     * @param d destination path on HDFS
     * @throws IOException if the copy fails
     */
    public static void uploadLocalFile2HDFS(String s, String d) throws IOException {
        Path src = new Path(s);
        Path dst = new Path(d);
        // The destination decides which (remote) file system is used.
        try (FileSystem hdfs = openFileSystem(dst)) {
            hdfs.copyFromLocalFile(src, dst);
        }
        System.out.println("文件上傳成功!!");
    }

    /**
     * Creates a new file on HDFS and writes {@code content} to it as UTF-8.
     *
     * @param toCreateFilePath full path of the file to create
     * @param content          text to write into the new file
     * @throws IOException if the file cannot be created or written
     */
    public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOException {
        Path target = new Path(toCreateFilePath);
        // Resources close in reverse order: the stream first, then the file system.
        try (FileSystem hdfs = openFileSystem(target);
             FSDataOutputStream os = hdfs.create(target)) {
            os.write(content.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("文件建立成功!!");
    }

    /**
     * Deletes a path on HDFS.
     *
     * <p>The delete is recursive, which is what the deprecated single-argument
     * {@code FileSystem.delete(Path)} used previously did as well.
     *
     * @param dst full path to delete
     * @return {@code true} if the file system reported the path as deleted
     * @throws IOException if the delete operation fails
     */
    public static boolean deleteHDFSFile(String dst) throws IOException {
        Path path = new Path(dst);
        boolean isDeleted;
        try (FileSystem hdfs = openFileSystem(path)) {
            isDeleted = hdfs.delete(path, true);
        }
        // Only claim success when the file system actually reported it.
        if (isDeleted) {
            System.out.println("文件刪除成功!!");
        } else {
            System.err.println("delete failed: " + dst);
        }
        return isDeleted;
    }

    /**
     * Reads a whole HDFS file into memory and prints it decoded as UTF-8.
     *
     * @param dst full path of the file to read
     * @return the raw bytes of the file
     * @throws FileNotFoundException if the path does not exist
     * @throws IOException           if the file is larger than a Java array can
     *                               hold, or reading fails
     * @throws Exception             declared for backward compatibility
     */
    public static byte[] readHDFSFile(String dst) throws Exception {
        Path path = new Path(dst);
        byte[] buffer;
        try (FileSystem fs = openFileSystem(path)) {
            if (!fs.exists(path)) {
                throw new FileNotFoundException("the file is not found .");
            }
            // The file length determines the buffer size.
            long length = fs.getFileStatus(path).getLen();
            if (length > Integer.MAX_VALUE) {
                throw new IOException(
                        "file too large to read into a single array (" + length + " bytes): " + dst);
            }
            buffer = new byte[(int) length];
            try (FSDataInputStream is = fs.open(path)) {
                // Positioned read starting at offset 0 that fills the whole buffer.
                is.readFully(0, buffer);
            }
        }
        System.out.println(new String(buffer, StandardCharsets.UTF_8));
        return buffer;
    }

    /**
     * Creates a directory (and any missing parents) on HDFS.
     *
     * @param dir directory path, e.g. {@code /tmp/testdir}
     * @throws IOException if the operation fails
     */
    public static void mkdir(String dir) throws IOException {
        Path path = new Path(dir);
        boolean created;
        try (FileSystem fs = openFileSystem(path)) {
            created = fs.mkdirs(path);
        }
        if (created) {
            System.out.println("目錄建立成功!!!");
        } else {
            System.err.println("mkdir failed: " + dir);
        }
    }

    /**
     * Recursively deletes a directory on HDFS.
     *
     * @param dir directory path, e.g. {@code /tmp/testdir}
     * @throws IOException if the operation fails
     */
    public static void deleteDir(String dir) throws IOException {
        Path path = new Path(dir);
        boolean deleted;
        try (FileSystem fs = openFileSystem(path)) {
            deleted = fs.delete(path, true);
        }
        if (deleted) {
            System.out.println("目錄刪除成功!!!");
        } else {
            System.err.println("delete failed: " + dir);
        }
    }

    /**
     * Prints the content of the files found under {@code dir}.
     *
     * <p>Each entry of {@code dir} is listed once more: a file entry yields
     * itself, a directory entry yields its direct children. Every resulting
     * file is printed line by line, decoded as UTF-8. Directories found at
     * that second level are skipped because they cannot be opened for reading.
     *
     * @param dir directory path on HDFS
     * @throws Exception if listing or reading fails
     */
    public static void readHDFSListAll(String dir) throws Exception {
        Path root = new Path(dir);
        try (FileSystem hdfs = openFileSystem(root)) {
            for (FileStatus entry : hdfs.listStatus(root)) {
                for (FileStatus candidate : hdfs.listStatus(entry.getPath())) {
                    if (candidate.isDirectory()) {
                        continue;
                    }
                    Path file = candidate.getPath();
                    System.out.println("路徑是:" + file.toString());
                    try (InputStream in = hdfs.open(file);
                         BufferedReader reader = new BufferedReader(
                                 new InputStreamReader(in, StandardCharsets.UTF_8))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            System.out.println(line);
                        }
                    }
                }
            }
        }
    }

    /**
     * Renames (moves) a file or directory on HDFS.
     *
     * @param path1 current path
     * @param path2 new path
     * @throws Exception if the operation fails
     */
    public static void renameFileOrDirectoryOnHDFS(String path1, String path2) throws Exception {
        Path source = new Path(path1);
        Path target = new Path(path2);
        boolean renamed;
        try (FileSystem fs = openFileSystem(source)) {
            renamed = fs.rename(source, target);
        }
        // rename() signals failure through its return value rather than an exception.
        if (renamed) {
            System.out.println("文件重命名成功!!!");
        } else {
            System.err.println("rename failed: " + path1 + " -> " + path2);
        }
    }
}