// Operating Hadoop (HDFS) through the Java API

package com.java.api.hdfs;


import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


/**
 * Static helpers demonstrating basic HDFS operations through the Hadoop
 * {@link FileSystem} API: download, upload, create, delete, read, mkdir,
 * list-and-print, and rename.
 *
 * <p>Every helper resolves its {@link FileSystem} from the path it is given
 * (via {@link Path#getFileSystem(Configuration)}) instead of from the default
 * file system, so fully qualified {@code hdfs://host:port/...} paths are routed
 * to the cluster they name. Paths without a scheme still fall back to the
 * configured default file system.
 *
 * <p>NOTE(review): each helper closes the {@code FileSystem} it obtains, as the
 * original code did. Hadoop may hand out cached instances that are shared
 * within the JVM, so confirm that closing them is acceptable before calling
 * these helpers from a larger application.
 */
public class OperaHDFS {

    /**
     * Demo entry point. Exactly one operation is enabled at a time; the others
     * are kept as commented-out examples of how to call each helper.
     *
     * @param args unused
     * @throws Exception if the selected HDFS operation fails
     */
    public static void main(String[] args) throws Exception {
        String hdfspath = "hdfs://192.168.4.150:9000/testdata2";
        String hdfspath2 = "hdfs://192.168.4.150:9000/testdata3";
        // The two values below are only referenced by the commented-out examples.
        String localpath = "D:\\hdfstongbu\\readme.txt";
        String content = "wo shi zhong guo ren ";

        //downloadFileorDirectoryOnHDFS(hdfspath,localpath);
        //uploadLocalFile2HDFS(localpath, hdfspath);
        //createNewHDFSFile(hdfspath,content);
        //deleteHDFSFile(hdfspath);
        //readHDFSFile(hdfspath);
        //mkdir(hdfspath);
        //deleteDir(hdfspath);
        //readHDFSListAll(hdfspath);
        renameFileOrDirectoryOnHDFS(hdfspath, hdfspath2);
    }

    /**
     * Downloads a file or directory to the local file system.
     *
     * @param d1 source path (the file system is resolved from this path)
     * @param d2 local destination path
     * @throws Exception if the copy fails
     */
    public static void downloadFileorDirectoryOnHDFS(String d1, String d2) throws Exception {
        Configuration config = new Configuration();
        Path p1 = new Path(d1);
        Path p2 = new Path(d2);
        try (FileSystem fs = p1.getFileSystem(config)) {
            fs.copyToLocalFile(p1, p2);
        }
        System.out.println("文件下載完畢!!");
    }

    /**
     * Uploads a local file to the file system named by the destination path.
     * Both arguments are expected to be full paths, e.g. {@code /tmp/test.c}.
     *
     * @param s local source path
     * @param d destination path (the file system is resolved from this path)
     * @throws IOException if the copy fails
     */
    public static void uploadLocalFile2HDFS(String s, String d) throws IOException {
        Configuration config = new Configuration();
        Path src = new Path(s);
        Path dst = new Path(d);
        try (FileSystem hdfs = dst.getFileSystem(config)) {
            hdfs.copyFromLocalFile(src, dst);
        }
        System.out.println("文件上傳成功!!");
    }

    /**
     * Creates a new file and writes {@code content} to it encoded as UTF-8.
     *
     * @param toCreateFilePath full path of the file to create
     * @param content          text to write into the new file
     * @throws IOException if the file cannot be created or written
     */
    public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOException {
        Configuration config = new Configuration();
        Path target = new Path(toCreateFilePath);
        // Resources are closed in reverse order: the stream first, then the file system.
        try (FileSystem hdfs = target.getFileSystem(config);
             FSDataOutputStream os = hdfs.create(target)) {
            os.write(content.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("文件建立成功!!");
    }

    /**
     * Deletes the file at the given full path.
     *
     * @param dst full path of the file to delete
     * @return the value reported by {@link FileSystem#delete(Path, boolean)}
     * @throws IOException if the delete call fails
     */
    public static boolean deleteHDFSFile(String dst) throws IOException {
        Configuration config = new Configuration();
        Path path = new Path(dst);
        boolean isDeleted;
        try (FileSystem hdfs = path.getFileSystem(config)) {
            // recursive=true matches what the deprecated one-argument delete(Path) did.
            isDeleted = hdfs.delete(path, true);
        }
        // Only claim success when the file system actually reported it.
        System.out.println(isDeleted ? "文件刪除成功!!" : "文件刪除失敗!!");
        return isDeleted;
    }

    /**
     * Reads the whole file at the given full path into memory, prints it
     * decoded as UTF-8, and returns the raw bytes.
     *
     * @param dst full path of the file to read
     * @return the complete file content
     * @throws FileNotFoundException if the path does not exist
     * @throws IOException           if the file is larger than a single byte
     *                               array can hold, or reading fails
     * @throws Exception             declared for backward compatibility
     */
    public static byte[] readHDFSFile(String dst) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(dst);
        byte[] buffer;
        try (FileSystem fs = path.getFileSystem(conf)) {
            if (!fs.exists(path)) {
                throw new FileNotFoundException("the file is not found .");
            }

            // Size the buffer from the file length reported by the file status.
            long length = fs.getFileStatus(path).getLen();
            if (length > Integer.MAX_VALUE) {
                throw new IOException(
                        "file is too large to read into one byte array: " + length + " bytes");
            }
            buffer = new byte[(int) length];

            try (FSDataInputStream is = fs.open(path)) {
                // Positional read starting at offset 0 that fills the whole buffer.
                is.readFully(0, buffer);
            }
        }
        System.out.println(new String(buffer, StandardCharsets.UTF_8));
        return buffer;
    }

    /**
     * Creates a directory (for example {@code /tmp/testdir}).
     *
     * @param dir path of the directory to create
     * @throws IOException if the mkdirs call fails
     */
    public static void mkdir(String dir) throws IOException {
        Configuration conf = new Configuration();
        Path target = new Path(dir);
        boolean created;
        try (FileSystem fs = target.getFileSystem(conf)) {
            created = fs.mkdirs(target);
        }
        System.out.println(created ? "目錄建立成功!!!" : "目錄建立失敗!!!");
    }

    /**
     * Deletes a directory (for example {@code /tmp/testdir}) and its contents.
     *
     * @param dir path of the directory to delete
     * @throws IOException if the delete call fails
     */
    public static void deleteDir(String dir) throws IOException {
        Configuration conf = new Configuration();
        Path target = new Path(dir);
        boolean deleted;
        try (FileSystem fs = target.getFileSystem(conf)) {
            // recursive=true matches what the deprecated one-argument delete(Path) did.
            deleted = fs.delete(target, true);
        }
        System.out.println(deleted ? "目錄刪除成功!!!" : "目錄刪除失敗!!!");
    }

    /**
     * Lists {@code dir}, then lists each of its entries, and prints the path
     * and the text content (decoded as UTF-8, line by line) of everything found
     * at that second level.
     *
     * <p>NOTE(review): every second-level entry is opened as a file; a directory
     * at that level is presumably rejected by {@code open} - confirm whether
     * deeper trees need to be supported.
     *
     * @param dir directory whose entries are listed and printed
     * @throws Exception if listing or reading fails
     */
    public static void readHDFSListAll(String dir) throws Exception {
        Configuration conf = new Configuration();
        Path listf = new Path(dir);
        try (FileSystem hdfs = listf.getFileSystem(conf)) {
            for (FileStatus entry : hdfs.listStatus(listf)) {
                for (FileStatus child : hdfs.listStatus(entry.getPath())) {
                    Path p = child.getPath();
                    System.out.println("路徑是:" + p.toString());

                    // Declaring the raw stream as its own resource guarantees it is
                    // closed even if constructing the reader chain fails.
                    try (InputStream in = hdfs.open(p);
                         BufferedReader buff = new BufferedReader(
                                 new InputStreamReader(in, StandardCharsets.UTF_8))) {
                        String str;
                        while ((str = buff.readLine()) != null) {
                            System.out.println(str);
                        }
                    }
                }
            }
        }
    }

    /**
     * Renames (moves) a file or directory.
     *
     * @param path1 existing path
     * @param path2 new path
     * @throws Exception if the rename call fails
     */
    public static void renameFileOrDirectoryOnHDFS(String path1, String path2) throws Exception {
        Configuration conf = new Configuration();
        Path p1 = new Path(path1);
        Path p2 = new Path(path2);
        boolean renamed;
        try (FileSystem fs = p1.getFileSystem(conf)) {
            renamed = fs.rename(p1, p2);
        }
        // rename reports failure through its return value, so check it before claiming success.
        System.out.println(renamed ? "文件重命名成功!!!" : "文件重命名失敗!!!");
    }

}

// Related articles
// Related tags / search