package com.da.hbase.tool.utils; import com.da.hbase.tool.common.Const; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * hdfs操做經常使用方法類 */ public class HdfsUtils { public static final Logger LOG= LoggerFactory.getLogger(HdfsUtils.class); /** * 經過ip直接鏈接hdfs * @param ip * @return */ public static FileSystem getFsFromIp(String ip){ FileSystem fs = null; try { fs=FileSystem.get(URI.create("hdfs://"+ip),new Configuration()); } catch (IOException e) { LOG.error("此ip:{} 鏈接出現異常", ip); } return fs; } /** * 檢查該fs是否可用 * @param fs * @return */ public static Boolean checkFs(FileSystem fs){ Boolean success=true; if(null==fs){ return false; } Path path=new Path("/"); try { RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path); success=true; } catch (IOException e) { success=false; } return success; } /** * 從ips中獲取一個可用的fs * @param ips * @return */ public static FileSystem getAndCheckFs(String ips){ return getAndCheckFs(ips,","); } /** * 從ips中獲取一個可用的fs * @param ips * @param separator * @return */ public static FileSystem getAndCheckFs(String ips,String separator){ String [] ipArr=ips.split(separator); FileSystem fs=null; for (String ip : ipArr) { fs=getFsFromIp(ip); if(checkFs(fs)){ LOG.info("此Ip:{}可鏈接hdfs",ip); break; }else{ fs=null; } } if(null==fs){ LOG.error("沒法鏈接hdfs環境,請檢查網絡是否可用或者ip配置是否正確,配置ips:{}",ips); } return fs; } /** * 測試getAndCheckFs方法 */ private static void testConnectFs(){ String ips="10.17.139.126,10.17.139.127,10.17.139.125"; FileSystem fs=getAndCheckFs(ips); String path1="/hbase/data/default/"; Path path=new Path(path1); try { RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path); while(remoteIterator.hasNext()){ System.out.println(remoteIterator.next().getPath()); } } catch (IOException e) { } } /** * 查看當前路徑是否存在 * @param fs * @param path * @return */ public static Boolean checkPathExist(FileSystem fs,String path){ Boolean isExist=true; try { isExist=fs.exists(new Path(path)); } catch (IOException e) { isExist=false; e.printStackTrace(); } return isExist; } /** * 遞歸遍歷找到全部目錄和文件存儲在map中,文件,key:路徑,value:FILE ;目錄,key:路徑,value:DIR * @param fs * @param src */ public static void recureScanDir(FileSystem fs,Path src, Map<Path,String> map){ try{ if(fs.isFile(src)) { map.put(src, Const.FILE_STATUS); }else{ map.remove(src); RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(src); if(!remoteIterator.hasNext()){ map.put(src, Const.DIR_STATUS); }else { while (remoteIterator.hasNext()){ recureScanDir(fs,remoteIterator.next().getPath(),map); } } } } catch (IOException e) { e.printStackTrace(); } } /** * 目錄從本地拷貝到hdfs上 * @param fs * @param src * @param dst * @return */ public static Boolean copyFromLocal(FileSystem fs,Path src,Path dst){ Boolean success=true; try { if(fs.exists(dst)){ fs.delete(dst,true); } fs.copyFromLocalFile(false,true,src,dst); success=true; } catch (IOException e) { success=false; LOG.error("文件從本地拷貝到hdfs上,出現Io異常,致使拷貝文件失敗,src:{},dst:{}",src,dst); e.printStackTrace(); } return success; } /** *目錄從hdfs上拷貝到本地 * @param fs * @param src * @param dst * @return */ public static Boolean copyToLocal(FileSystem fs,Path src,Path dst){ Boolean success=true; try { if(new File(dst.toString()).exists()){ Utils.deletNotEmptyDir(new File(dst.toString())); } fs.copyToLocalFile(false, src, dst, true); success=true; } catch (IOException e) { success=false; LOG.error("文件從hdfs拷貝到本地,出現Io異常,致使拷貝文件失敗"); e.printStackTrace(); } return success; } private static void testCopyFileToLocal(){ String ips="10.17.139.126,10.17.139.127,10.17.139.125"; FileSystem fs=getAndCheckFs(ips); String path1="/hbase/data/default/"; Path path=new Path(path1); try { RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path); while(remoteIterator.hasNext()){ System.out.println(remoteIterator.next().getPath()); } } catch (IOException e) { LOG.error(e.getMessage()); } } /** * 獲取目錄path下全部的文件名 * @param fs * @param path * @return */ public static List<String> scanDir(FileSystem fs,Path path){ List<String> list=new ArrayList<>(); try { RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path); while(remoteIterator.hasNext()){ list.add(remoteIterator.next().getPath().getName()); } } catch (IOException e) { LOG.error(e.getMessage()); } return list; } public static void main(String[] args) { //testConnectFs(); testCopyFileToLocal(); } }