12 HDFS 常用文件、目錄拷貝操作、刪除操作

package com.da.hbase.tool.utils;

import com.da.hbase.tool.common.Const;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Static helpers for common HDFS operations: connecting to a cluster by address,
 * checking that a connection is usable, listing/scanning directories and copying
 * directories between the local file system and HDFS.
 *
 * <p>Failures are reported through {@link #LOG} and signalled to the caller by a
 * {@code null} / {@code false} / empty return value rather than by an exception.
 */
public class HdfsUtils {
    public static final Logger LOG= LoggerFactory.getLogger(HdfsUtils.class);

    /**
     * Opens a {@link FileSystem} for {@code hdfs://<ip>} using a default {@link Configuration}.
     *
     * @param ip host (optionally {@code host:port}) of the HDFS endpoint; surrounding
     *           whitespace is ignored
     * @return the file system, or {@code null} when the address is blank or malformed or
     *         the connection attempt throws an {@link IOException}
     */
    public static FileSystem getFsFromIp(String ip){
        if (null == ip || ip.trim().isEmpty()) {
            LOG.error("此ip:{} 鏈接出現異常", ip);
            return null;
        }
        try {
            return FileSystem.get(URI.create("hdfs://" + ip.trim()), new Configuration());
        } catch (IOException | IllegalArgumentException e) {
            // URI.create throws the unchecked IllegalArgumentException for a malformed
            // address; treat it like a failed connection so callers can try the next one.
            LOG.error("此ip:{} 鏈接出現異常", ip, e);
            return null;
        }
    }

    /**
     * Checks whether the given file system is usable by requesting a listing iterator
     * for the root path {@code /}.
     *
     * @param fs file system to probe; may be {@code null}
     * @return {@code false} when {@code fs} is {@code null} or the request throws an
     *         {@link IOException}; {@code true} otherwise
     */
    public static Boolean checkFs(FileSystem fs){
        if (null == fs) {
            return false;
        }
        try {
            fs.listStatusIterator(new Path("/"));
            return true;
        } catch (IOException e) {
            LOG.debug("File system probe on / failed", e);
            return false;
        }
    }

    /**
     * Returns the first usable file system among a comma-separated list of addresses.
     *
     * @param ips comma-separated addresses
     * @return a usable file system, or {@code null} if none of the addresses works
     */
    public static FileSystem getAndCheckFs(String ips){
        return getAndCheckFs(ips,",");
    }

    /**
     * Returns the first usable file system among a list of addresses.
     *
     * @param ips       addresses joined by {@code separator}
     * @param separator delimiter between addresses; passed to {@link String#split(String)},
     *                  so it is interpreted as a regular expression
     * @return the first file system for which {@link #checkFs(FileSystem)} succeeds, or
     *         {@code null} (after logging an error) if {@code ips} is {@code null} or no
     *         address is usable
     */
    public static FileSystem getAndCheckFs(String ips,String separator){
        FileSystem usable = null;
        if (null != ips) {
            for (String rawIp : ips.split(separator)) {
                String ip = rawIp.trim();
                if (ip.isEmpty()) {
                    // e.g. "a,,b" or a trailing separator followed by whitespace
                    continue;
                }
                FileSystem candidate = getFsFromIp(ip);
                if (checkFs(candidate)) {
                    LOG.info("此Ip:{}可鏈接hdfs",ip);
                    usable = candidate;
                    break;
                }
            }
        }
        if (null == usable) {
            LOG.error("沒法鏈接hdfs環境,請檢查網絡是否可用或者ip配置是否正確,配置ips:{}",ips);
        }
        return usable;
    }

    /**
     * Manual check for {@link #getAndCheckFs(String)}: connects using hard-coded
     * addresses and prints the entries under {@code /hbase/data/default/}.
     */
    private static void testConnectFs(){
        printDirListing("10.17.139.126,10.17.139.127,10.17.139.125", "/hbase/data/default/");
    }

    /**
     * Reports whether {@code path} exists on {@code fs}.
     *
     * @param fs   file system to query
     * @param path path to look up
     * @return the result of {@link FileSystem#exists(Path)}, or {@code false} if the
     *         lookup throws an {@link IOException}
     */
    public static Boolean checkPathExist(FileSystem fs,String path){
        try {
            return fs.exists(new Path(path));
        } catch (IOException e) {
            LOG.error("Failed to check existence of path {}", path, e);
            return false;
        }
    }

    /**
     * Recursively walks {@code src} and records its leaves in {@code map}: every file is
     * stored with value {@code Const.FILE_STATUS} and every <em>empty</em> directory with
     * value {@code Const.DIR_STATUS}. Non-empty directories are not recorded themselves;
     * only their descendants are.
     *
     * <p>An {@link IOException} while scanning a path is logged and ends the scan of that
     * path only; entries collected so far are kept.
     *
     * @param fs  file system to scan
     * @param src file or directory to start from
     * @param map receives the result; any existing entry for a directory being scanned is
     *            removed before its children are examined
     */
    public static void recureScanDir(FileSystem fs,Path src, Map<Path,String> map){
        try{
            if(fs.isFile(src)) {
                map.put(src, Const.FILE_STATUS);
                return;
            }
            // Drop any entry already present for this path; it is re-added below only
            // if the directory turns out to be empty.
            map.remove(src);
            RemoteIterator<FileStatus> children = fs.listStatusIterator(src);
            if(!children.hasNext()){
                map.put(src, Const.DIR_STATUS);
                return;
            }
            while (children.hasNext()){
                recureScanDir(fs,children.next().getPath(),map);
            }
        } catch (IOException e) {
            LOG.error("Failed to scan path {}", src, e);
        }
    }

    /**
     * Copies a local file or directory to HDFS, replacing whatever is at {@code dst}.
     *
     * <p>The destination is deleted recursively before the copy, so the source is first
     * checked on the local file system; if it does not exist nothing is deleted.
     *
     * @param fs  target file system
     * @param src local source path
     * @param dst destination path on {@code fs}
     * @return {@code true} on success; {@code false} if the source is missing or an
     *         {@link IOException} occurs
     */
    public static Boolean copyFromLocal(FileSystem fs,Path src,Path dst){
        try {
            // copyFromLocalFile resolves src against the local file system, so do the same here.
            FileSystem local = FileSystem.getLocal(fs.getConf());
            if (!local.exists(src)) {
                LOG.error("Local source does not exist, destination left untouched, src:{},dst:{}", src, dst);
                return false;
            }
            if(fs.exists(dst)){
                fs.delete(dst,true);
            }
            // delSrc=false keeps the local source, overwrite=true
            fs.copyFromLocalFile(false,true,src,dst);
            return true;
        } catch (IOException e) {
            LOG.error("文件從本地拷貝到hdfs上,出現Io異常,致使拷貝文件失敗,src:{},dst:{}",src,dst,e);
            return false;
        }
    }

    /**
     * Copies a file or directory from HDFS to the local file system, replacing whatever
     * is at {@code dst}.
     *
     * <p>An existing local destination is removed via {@code Utils.deletNotEmptyDir}
     * before the copy, so the source is first checked on {@code fs}; if it does not exist
     * nothing is deleted.
     *
     * @param fs  source file system
     * @param src source path on {@code fs}
     * @param dst local destination path
     * @return {@code true} on success; {@code false} if the source is missing or an
     *         {@link IOException} occurs
     */
    public static Boolean copyToLocal(FileSystem fs,Path src,Path dst){
        try {
            if (!fs.exists(src)) {
                LOG.error("Source does not exist on hdfs, local destination left untouched, src:{},dst:{}", src, dst);
                return false;
            }
            File localDst = new File(dst.toString());
            if(localDst.exists()){
                Utils.deletNotEmptyDir(localDst);
            }
            // delSrc=false keeps the HDFS source, useRawLocalFileSystem=true
            fs.copyToLocalFile(false, src, dst, true);
            return true;
        } catch (IOException e) {
            LOG.error("文件從hdfs拷貝到本地,出現Io異常,致使拷貝文件失敗", e);
            return false;
        }
    }

    /**
     * Manual check that connects using hard-coded addresses and prints the entries under
     * {@code /hbase/data/default/}. Despite its name it performs no copy.
     */
    private static void testCopyFileToLocal(){
        printDirListing("10.17.139.126,10.17.139.127,10.17.139.125", "/hbase/data/default/");
    }

    /**
     * Connects via {@link #getAndCheckFs(String)} and prints the path of every direct
     * child of {@code dir} to standard output.
     *
     * @param ips comma-separated addresses to try
     * @param dir directory to list
     */
    private static void printDirListing(String ips, String dir){
        FileSystem fs = getAndCheckFs(ips);
        if (null == fs) {
            // getAndCheckFs has already logged why no connection is available.
            return;
        }
        try {
            RemoteIterator<FileStatus> entries = fs.listStatusIterator(new Path(dir));
            while(entries.hasNext()){
                System.out.println(entries.next().getPath());
            }
        } catch (IOException e) {
            LOG.error("Failed to list directory {}", dir, e);
        }
    }

    /**
     * Lists the names of the direct children (files and directories) of {@code path}.
     *
     * @param fs   file system to query
     * @param path directory to list
     * @return the final path component of each child; if an {@link IOException} occurs the
     *         error is logged and the names collected so far (possibly none) are returned
     */
    public static List<String> scanDir(FileSystem fs,Path path){
        List<String> names = new ArrayList<>();
        try {
            RemoteIterator<FileStatus> entries = fs.listStatusIterator(path);
            while(entries.hasNext()){
                names.add(entries.next().getPath().getName());
            }
        } catch (IOException e) {
            LOG.error("Failed to list directory {}", path, e);
        }
        return names;
    }

    /**
     * Entry point for the manual checks above.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        //testConnectFs();
        testCopyFileToLocal();

    }
}
相關文章
相關標籤/搜索