注:轉載請署名。
1、實體類
package com.ebd.application.common.Base;

import java.util.List;

/**
 * Tree node describing a single HDFS directory, used to build a directory
 * tree for front-end display.
 */
public class HDFSDir {

    private String id;               // generated id of this node
    private String pid;              // id of the parent node
    private String name;             // current directory name
    private String alias;            // display alias (optional)
    private String dir;              // full path below the "/" root
    private boolean spread;          // whether the node is expanded (true/false)
    private List<HDFSDir> children;  // child directories

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAlias() {
        return alias;
    }

    public void setAlias(String alias) {
        this.alias = alias;
    }

    public String getDir() {
        return dir;
    }

    public void setDir(String dir) {
        this.dir = dir;
    }

    public boolean isSpread() {
        return spread;
    }

    public void setSpread(boolean spread) {
        this.spread = spread;
    }

    public List<HDFSDir> getChildren() {
        return children;
    }

    public void setChildren(List<HDFSDir> children) {
        this.children = children;
    }
}
2、工具類
package hdfstest;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

import com.ebd.application.common.Base.HDFSDir;
import com.ebd.application.common.utils.Identities;

import net.sf.json.JSONObject;

/**
 * HDFS directory utility: lists, creates, copies, reads, downloads and
 * deletes paths on HDFS, and builds an {@link HDFSDir} tree for UI display.
 *
 * NOTE(review): instances are not thread-safe (several methods share mutable
 * fields such as {@code treeList} and {@code cDirList}).
 */
public class HdfsListTest {

    // HDFS access address (NameNode endpoint).
    private static final String HDFS = "hdfs://bigdata.hadoop.com:9000";

    // HDFS root URI used when opening FileSystem handles.
    private String hdfsPath;
    // Hadoop system configuration.
    private Configuration conf;

    public HdfsListTest(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsListTest(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    /**
     * Entry point: builds the directory tree rooted at "/" and prints it as a
     * single-quote-style JSON string.
     */
    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        HdfsListTest hdfs = new HdfsListTest(conf);

        HDFSDir root = new HDFSDir();
        root.setId(Identities.uuid());
        root.setDir("/");

        HDFSDir tree = hdfs.getChildNode(root);
        JSONObject object = JSONObject.fromObject(tree);
        System.out.println(dirJsonFunc(object.toString()));
    }

    /** Loads the Hadoop configuration from the bundled XML resources. */
    public static JobConf config() {
        JobConf conf = new JobConf(HdfsListTest.class);
        conf.setJobName("HdfsDAO");
        conf.addResource("hadoop/core-site.xml");
        conf.addResource("hadoop/hdfs-site.xml");
        conf.addResource("hadoop/mapred-site.xml");
        return conf;
    }

    /** Creates the given folder on HDFS if it does not already exist. */
    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            if (!fs.exists(path)) {
                fs.mkdirs(path);
                System.out.println("Create: " + folder);
            }
        } finally {
            fs.close(); // fix: close even when exists()/mkdirs() throws
        }
    }

    /** Lists the direct children of a folder, printing one line per entry. */
    public FileStatus[] ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            FileStatus[] list = fs.listStatus(path);
            System.out.println("ls: " + folder);
            System.out.println("==========================================================");
            if (list != null) {
                for (FileStatus f : list) {
                    // isDirectory() replaces the deprecated isDir(); same boolean value.
                    System.out.printf("name: %s, folder: %s, size: %d\n",
                            f.getPath(), f.isDirectory(), f.getLen());
                }
            }
            System.out.println("==========================================================");
            return list;
        } finally {
            fs.close();
        }
    }

    /** Copies a local file (or folder) to the given HDFS path. */
    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            // remote --- /user/<file-or-folder under the user dir>
            fs.copyFromLocalFile(new Path(local), new Path(remote));
            System.out.println("copy from: " + local + " to " + remote);
        } finally {
            fs.close();
        }
    }

    /** Prints (up to) the first 1024 bytes of an HDFS file as UTF-8 text. */
    public void catFile(String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            Path path = new Path(remote);
            if (fs.isFile(path)) {
                // fix: the original opened the stream twice (leaking one) and
                // printed the whole 1024-byte buffer regardless of bytes read.
                FSDataInputStream instream = fs.open(path);
                try {
                    byte[] b = new byte[1024];
                    int n = instream.read(b);
                    if (n > 0) {
                        System.out.println(new String(b, 0, n, "utf-8"));
                    }
                } finally {
                    instream.close();
                }
            }
        } finally {
            fs.close();
        }
    }

    // Accumulates full paths across recursive getTree() calls.
    List<String> treeList = new ArrayList<String>();

    /**
     * Recursively prints and collects the subtree under {@code remote}.
     * {@code prefix} is the indentation marker ("|-", "|--", ...).
     */
    public List<String> getTree(String top, String remote, String prefix) throws IOException {
        Path path = new Path(remote);
        // NOTE(review): FileSystem.get() returns a cached instance; it is
        // deliberately not closed here because ancestors up the recursion are
        // still using it.
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        if (list != null) {
            for (FileStatus f : list) {
                System.out.println(prefix + f.getPath().getName());
                // fix: per-entry accumulator — the original appended every
                // sibling's name onto the same 'top' string.
                String entry = top + f.getPath().getName();
                treeList.add(entry);
                if (fs.isDirectory(f.getPath())) {
                    getTree(entry, f.getPath().toString(), prefix + "-");
                }
            }
        }
        return treeList;
    }

    int id = 0;
    static int pid = 0;
    List<HDFSDir> dirList = new ArrayList<HDFSDir>();
    HDFSDir hdfsDir = null;

    /** Collects the immediate sub-directories of {@code remote} as HDFSDir beans. */
    private List<HDFSDir> getListTree(String top, String remote, int pid) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        if (list != null) {
            for (FileStatus f : list) {
                if (f.isDirectory()) {
                    hdfsDir = new HDFSDir();
                    hdfsDir.setName(f.getPath().getName());
                    hdfsDir.setAlias(f.getPath().getName());
                    // Strip the "hdfs://host:port" prefix, keep the absolute path.
                    hdfsDir.setDir(f.getPath().toString().substring(HDFS.length()));
                    hdfsDir.setSpread(false);
                    System.out.println(f.getPath().getName() + "="
                            + f.getPath().toString().substring(HDFS.length()));
                    dirList.add(hdfsDir);
                }
            }
        }
        return dirList;
    }

    // Buffer for the children currently being collected (kept as a field to
    // match the original design; see the thread-safety note on the class).
    List<HDFSDir> cDirList = null;

    /**
     * Recursively populates {@code pDir} (and all descendants) with their
     * sub-directories. Returns the same node for chaining.
     */
    public HDFSDir getChildNode(HDFSDir pDir) throws IOException {
        if (pDir.getChildren() != null && pDir.getChildren().size() >= 1) {
            for (HDFSDir p : pDir.getChildren()) {
                expandChildren(p);
            }
        } else {
            expandChildren(pDir);
        }
        return pDir;
    }

    /** Lists the sub-directories of {@code parent}, then recurses into each. */
    private void expandChildren(HDFSDir parent) throws IOException {
        Path path = new Path(parent.getDir());
        // Not closed: cached instance shared with callers up the recursion.
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        if (list == null) {
            return;
        }
        cDirList = new ArrayList<HDFSDir>();
        for (FileStatus f : list) {
            if (f.isDirectory()) {
                HDFSDir child = new HDFSDir();
                child.setId(Identities.uuid());
                child.setPid(parent.getId());
                String name = f.getPath().getName();
                // The root path yields an empty name; display it as "/".
                child.setName(name.isEmpty() ? "/" : name);
                child.setAlias(name.isEmpty() ? "/" : name);
                child.setDir(f.getPath().toString().substring(HDFS.length()));
                child.setSpread(false);
                cDirList.add(child);
            }
        }
        parent.setChildren(cDirList);
        for (HDFSDir c : parent.getChildren()) {
            getChildNode(c);
        }
    }

    /**
     * Rewrites a standard JSON string into the single-quote style expected by
     * the front-end tree widget: double quotes become single quotes, then the
     * quotes around keys and next to braces are stripped.
     */
    public static String dirJsonFunc(String jsonStr) {
        if (StringUtils.isNotBlank(jsonStr)) {
            String[] reg_array = {"([\"])", "(,['])", "([']:)"};
            String[] rpa_array = {"'", ",", ":"};
            for (int i = 0; i < reg_array.length; i++) {
                jsonStr = jsonStr.replaceAll(reg_array[i], rpa_array[i]);
            }
            jsonStr = jsonStr.replace("{'", "{");
            jsonStr = jsonStr.replace("'}", "}");
        }
        return jsonStr;
    }

    /** Marks a file or folder for deletion when the FileSystem is closed. */
    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.deleteOnExit(path);
            System.out.println("Delete: " + folder);
        } finally {
            fs.close();
        }
    }

    /** Downloads an HDFS file to the local file system. */
    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.copyToLocalFile(path, new Path(local));
            // fix: original message was missing the space after "from".
            System.out.println("download: from " + remote + " to " + local);
        } finally {
            fs.close();
        }
    }
}
3、轉換工具類
package test; import org.apache.commons.lang3.StringUtils; import com.ebd.application.common.Base.HDFSDir; import com.ebd.application.common.utils.Identities; import net.sf.json.JSONArray; public class TestObjectToJson { public static void main(String[] args) { HDFSDir ds = new HDFSDir(); ds.setId(Identities.uuid()); ds.setDir("/testput"); JSONArray js = JSONArray.fromObject(ds); // System.out.println(js.toString()); String jsonStr = js.toString(); // String reg_1 = "([\"])"; //雙引號轉單引號 // String reg_2 = "(,['])"; //去掉逗號後面的單引號 // String reg_3 = "([']:)"; //去掉冒號前面的單引號 // String reg_4 = "('{'['])"; //去掉開頭大括號後面的單引號 // Pattern pattern = Pattern.compile(regEx); // jsonStr = jsonStr.replaceAll(reg_1, "'"); // jsonStr = jsonStr.replaceAll(reg_2, ","); // jsonStr = jsonStr.replaceAll(reg_3, ":"); // jsonStr = jsonStr.replaceAll("{'", "{"); // System.out.println(jsonStr); } public static String dirJsonFunc(String jsonStr) { if (StringUtils.isNotBlank(jsonStr)) { String[] reg_array = {"([\"])","(,['])","([']:)"}; String[] rpa_array = {"'",",",":"}; for(int i=0;i<reg_array.length;i++){ jsonStr = jsonStr.replaceAll(reg_array[i], rpa_array[i]); } jsonStr = jsonStr.replace("{'", "{"); jsonStr = jsonStr.replace("'}", "}"); } return jsonStr; } }
4、工具類
package hdfstest;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

import com.ebd.application.common.utils.CreateFileUtil;

/**
 * Console test driver for basic HDFS operations:
 * mkdir / ls / copy / cat / tree / delete / download.
 */
public class HdfsDirConsoleTest {

    // HDFS access address (NameNode endpoint).
    private static final String HDFS = "hdfs://bigdata.hadoop.com:9000";

    // HDFS root URI used when opening FileSystem handles.
    private String hdfsPath;
    // Hadoop system configuration.
    private Configuration conf;

    public HdfsDirConsoleTest(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsDirConsoleTest(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    /** Entry point: downloads a sample file and prints a success marker. */
    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        System.out.println(conf.get("hadoop.http.staticuser.user"));
        System.out.println(System.getenv("HADOOP_HOME"));

        HdfsDirConsoleTest hdfs = new HdfsDirConsoleTest(conf);
        hdfs.download("/testput/testput", "D:/ss/ss", conf);
        System.out.println("success!");
    }

    /** Loads the Hadoop configuration from the bundled XML resources. */
    public static JobConf config() {
        JobConf conf = new JobConf(HdfsDirConsoleTest.class);
        conf.setJobName("HdfsDAO");
        conf.addResource("hadoop/core-site.xml");
        conf.addResource("hadoop/hdfs-site.xml");
        conf.addResource("hadoop/mapred-site.xml");
        return conf;
    }

    /** Creates the given folder on HDFS if it does not already exist. */
    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            if (!fs.exists(path)) {
                fs.mkdirs(path);
                System.out.println("Create: " + folder);
            }
        } finally {
            fs.close(); // fix: close even when exists()/mkdirs() throws
        }
    }

    /** Lists the direct children of a folder, printing details per entry. */
    public FileStatus[] ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            FileStatus[] list = fs.listStatus(path);
            System.out.println("ls: " + folder);
            System.out.println("==========================================================");
            if (list != null) {
                for (FileStatus f : list) {
                    // isDirectory() replaces the deprecated isDir(); same boolean value.
                    System.out.printf("name: %s, folder: %s, size: %d\n",
                            f.getPath(), f.isDirectory(), f.getLen());
                    System.out.println(f.getOwner() + "==" + f.getBlockSize() + "="
                            + f.getModificationTime() + "--" + f.getPermission()
                            + "=" + f.getReplication());
                }
            }
            System.out.println("==========================================================");
            return list;
        } finally {
            fs.close();
        }
    }

    /** Copies a local file (or folder) to the given HDFS path. */
    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            // remote --- /user/<file-or-folder under the user dir>
            fs.copyFromLocalFile(new Path(local), new Path(remote));
            System.out.println("copy from: " + local + " to " + remote);
        } finally {
            fs.close();
        }
    }

    /** Prints (up to) the first 1024 bytes of an HDFS file as UTF-8 text. */
    public void catFile(String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            Path path = new Path(remote);
            if (fs.isFile(path)) {
                // fix: the original opened the stream twice (leaking one) and
                // printed the whole 1024-byte buffer regardless of bytes read.
                FSDataInputStream instream = fs.open(path);
                try {
                    byte[] b = new byte[1024];
                    int n = instream.read(b);
                    if (n > 0) {
                        System.out.println(new String(b, 0, n, "utf-8"));
                    }
                } finally {
                    instream.close();
                }
            }
        } finally {
            fs.close();
        }
    }

    // Accumulates full paths across recursive getTree() calls.
    List<String> treeList = new ArrayList<String>();

    /**
     * Recursively prints and collects the subtree under {@code remote}.
     * {@code prefix} is the indentation marker ("|-", "|--", ...).
     */
    public List<String> getTree(String top, String remote, String prefix) throws IOException {
        Path path = new Path(remote);
        // NOTE(review): FileSystem.get() returns a cached instance; it is
        // deliberately not closed here because ancestors up the recursion are
        // still using it.
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        if (list != null) {
            for (FileStatus f : list) {
                System.out.println(prefix + f.getPath().getName());
                // fix: per-entry accumulator — the original appended every
                // sibling's name onto the same 'top' string.
                String entry = top + f.getPath().getName();
                treeList.add(entry);
                if (fs.isDirectory(f.getPath())) {
                    getTree(entry, f.getPath().toString(), prefix + "-");
                }
            }
        }
        return treeList;
    }

    /** Marks a file or folder for deletion when the FileSystem is closed. */
    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.deleteOnExit(path);
            System.out.println("Delete: " + folder);
        } finally {
            fs.close();
        }
    }

    /**
     * Downloads {@code remote} into the local directory {@code local},
     * creating the directory if needed; the file keeps its remote base name.
     */
    public void download(String remote, String local, JobConf conf) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(remote), conf);
        FSDataInputStream fsdi = fs.open(new Path(remote));
        try {
            if (CreateFileUtil.createDir(local)) {
                OutputStream output =
                        new FileOutputStream(local + remote.substring(remote.lastIndexOf("/")));
                // copyBytes(..., true) closes both streams when done.
                IOUtils.copyBytes(fsdi, output, 4096, true);
                fsdi = null; // already closed by copyBytes
            }
        } finally {
            if (fsdi != null) {
                fsdi.close(); // fix: original leaked the stream when createDir() failed
            }
        }
    }

    /**
     * Ensures the parent directory of {@code path} exists.
     * Fix: the original ignored its parameter and always used the
     * hard-coded path "E:/a/aa/".
     */
    public static void makdir(String path) {
        File fileParent = new File(path).getParentFile();
        if (fileParent != null && !fileParent.exists()) {
            fileParent.mkdirs();
        }
    }
}