thrift實現HDFS文件操做

thrift 文件以下java


namespace java com.pera.file.transformide


struct  File{
    1:string path ,
    2:string content,
}

service FileTransform {
    bool exists(1:string path),
    void mkdir(1:string path ),
    void store(1:File file),
    set<string> ls(1:string path),
    bool isDirectory(1:string path),
    string getParentFile(1:string path),
    void createNewFile(1:string path),
    void deleteFile(1:string path),
    void deleteOnExit(1:string path),
    bool isFile(1:string path),
    void rename(1:string srcPath ,2:string destPath),
    string getName(1:string path),
    i64 totalSpace(1:string path),
    i64 usedSpace(1:string path),
    string read(1:string path)

}this


客戶端:url

public class FileTransFormPair {
    
    private TTransport transport ;
    private Client  client  ;
    
    public FileTransFormPair(){}
    
    public FileTransFormPair( TTransport transport ,Client  client ){
        this.transport=transport ;
        this.client =client;
    }
    
    public TTransport getTransport() {
        return transport;
    }
    public void setTransport(TTransport transport) {
        this.transport = transport;
    }
    public Client getClient() {
        return client;
    }
    public void setClient(Client client) {
        this.client = client;
    }
}

public class FileTransFormClient implements Iface {

    public static  String SERVER_IP = "localhost";
    public static  int SERVER_PORT = 8090;
    public static  int TIMEOUT = 30000;
    
    public FileTransFormClient(String host,int port ,int timeout){
        SERVER_IP=host;
        SERVER_PORT=port;
        TIMEOUT=timeout;
    }
    
    private static   FileTransFormPair getClient() throws TTransportException{
        TTransport transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
        TProtocol protocol = new TBinaryProtocol(transport);
        Client  client =new Client(protocol);
        transport.open();
        return new FileTransFormPair(transport, client);
    }
    
    @Override
    public boolean exists(String path) throws TException {
        boolean result =false;
        FileTransFormPair parameter =getClient();
        result =parameter.getClient().exists(path);
        parameter.getTransport().close();
        return result;
    }


    @Override
    public void mkdir(String path) throws TException {
        FileTransFormPair parameter =getClient();
        parameter.getClient().mkdir(path);
        parameter.getTransport().close();
        
    }


    @Override
    public void store(File file) throws TException {
        FileTransFormPair parameter =getClient();
        parameter.getClient().store(file);
        parameter.getTransport().close();
        
    }


    @Override
    public Set<String> ls(String path) throws TException {
        FileTransFormPair parameter =getClient();
        Set<String> result =parameter.getClient().ls(path);
        parameter.getTransport().close();
        return result;
    }


    @Override
    public boolean isDirectory(String path) throws TException {
        boolean result =false ;
        FileTransFormPair parameter =getClient();
        result =parameter.getClient().isDirectory(path);
        parameter.getTransport().close();
        return result;
    }


    @Override
    public String getParentFile(String path) throws TException {
        FileTransFormPair parameter =getClient();
        String result =parameter.getClient().getParentFile(path);
        parameter.getTransport().close();
        return result;
    }


    @Override
    public void createNewFile(String path) throws TException {
        FileTransFormPair parameter =getClient();
        parameter.getClient().createNewFile(path);
        parameter.getTransport().close();
    }


    @Override
    public void deleteFile(String path) throws TException {
        FileTransFormPair parameter =getClient();
        parameter.getClient().deleteFile(path);
        parameter.getTransport().close();
        
    }


    @Override
    public void deleteOnExit(String path) throws TException {
        FileTransFormPair parameter =getClient();
        parameter.getClient().deleteOnExit(path);
        parameter.getTransport().close();
        
    }


    @Override
    public boolean isFile(String path) throws TException {
        FileTransFormPair parameter =getClient();
        boolean result =parameter.getClient().isFile(path);
        parameter.getTransport().close();
        return result ;
    }


    @Override
    public void rename(String srcPath, String destPath) throws TException {
        FileTransFormPair parameter =getClient();
        parameter.getClient().rename(srcPath, destPath);
        parameter.getTransport().close();
        
    }


    @Override
    public String getName(String path) throws TException {
        FileTransFormPair parameter =getClient();
        String result =parameter.getClient().getName(path);
        parameter.getTransport().close();
        return result;
    }


    @Override
    public long totalSpace(String path) throws TException {
        FileTransFormPair parameter =getClient();
        long result =parameter.getClient().totalSpace(path);
        parameter.getTransport().close();
        return result;
    }


    @Override
    public long usedSpace(String path) throws TException {
        FileTransFormPair parameter =getClient();
        long result =parameter.getClient().usedSpace(path);
        parameter.getTransport().close();
        return result;
    }

    @Override
    public String read(String path) throws TException {
        FileTransFormPair parameter =getClient();
        String result =parameter.getClient().read(path);
        parameter.getTransport().close();
        return result;
    }
}
spa

服務端:orm

public class ThriftServer {

    public static  int SERVER_PORT ;

    private static TServer server =null;
    
    private  static Log  LOG =LogFactory.getLog(ThriftServer.class);
    
    public void init(){
        SERVER_PORT = 9090;
        LOG.info("init the thriftServer");
    }
    
    public void start() throws TTransportException {
        TProcessor tprocessor = new FileTransform.Processor<FileTransform.Iface>(
                new FileStoreService());
        TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
        TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(
                serverTransport);
        ttpsArgs.processor(tprocessor);
        ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());
        server = new TThreadPoolServer(ttpsArgs);
        server.serve();
        LOG.info("start the thriftServer");
    }

    public void stop() {
        if(null!=server){
            server.stop();
            LOG.info("stop the thriftServer");
        }
    }

    public static void main(String[] args) {
        ThriftServer server =new ThriftServer();
        server.init();
        try {
            server.start();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }

}
public class FileStoreService implements Iface{

    private Configuration getConfiguration(){
        Configuration  conf =new Configuration();
        Properties  pro = new Properties();
        try {
            pro.load(new FileInputStream(new File("conf/filetransform-site.properties")));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
//        conf.set("dfs.default.name", pro.getProperty("dfs.default.name"));
        
        return conf;
    }
    @Override
    public boolean exists(String path) throws TException {
        Configuration  conf =getConfiguration();
        FileSystem fs=null;
        try {
            fs = FileSystem.get(conf);
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }
        boolean exists =false ;
        try {
            exists  = fs.exists(new Path(path));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        return exists;
    }

    @Override
    public void mkdir(String path) throws TException {
        Configuration  conf =getConfiguration();
        FileSystem fs=null;
        try {
            fs = FileSystem.get(conf);
            fs.mkdirs(new Path(path));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
    }

    @Override
    public void store(com.pera.file.transform.gen.File file) throws TException {
        String path =file.getPath();
        String content =file.getContent();
        Configuration conf  =getConfiguration();
        FileSystem  fs =null;
        FSDataOutputStream out=null;
        try {
            Path p =new Path(path);
            fs =FileSystem.get(conf);
            if(!fs.exists(p.getParent())){
                throw new TException( path+" is not exists");
            }
            out =fs.create(p);
            if(null!=content){
                byte   [] tmp =content.getBytes();
                out.write(tmp);
                out.flush();
                out.close();
            }
        } catch (Exception e) {
            throw new TException(e.getMessage());
        }finally{
            try {
                if(null!=out){
                    out.close();
                }
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }
            try {
                if(null!=fs){
                    fs.close();
                }
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }
            
        }
    }

    @Override
    public Set<String> ls(String path) throws TException {
        Configuration  conf =getConfiguration();
        FileSystem fs =null;
        Set<String>  result =new HashSet<String>();
        try {
            fs=FileSystem.get(conf);
            FileStatus   [] fstatus=fs.listStatus(new Path(path));
            int length =fstatus==null?0:fstatus.length;
            for (int i = 0; i < length; i++) {
                String url =fstatus[i].getPath().toString();
                result.add(url);
            }
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        return result;
    }

    
    @Override
    @SuppressWarnings("deprecation")
    public boolean isDirectory(String path) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        boolean result =false ;
        try {
            fs =FileSystem.get(conf);
            result =fs.isDirectory(new Path(path));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        return result;
    }

    @Override
    public String getParentFile(String path) throws TException {
        Path parentPath =new Path(path).getParent();
        return parentPath.toString();
    }

    @Override
    public void createNewFile(String path) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        try {
            fs =FileSystem.get(conf);
            fs.createNewFile(new Path(path));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
    }

    @Override
    public void deleteFile(String path) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        try {
            fs =FileSystem.get(conf);
            fs.deleteOnExit(new Path(path));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        
    }

    @Override
    public void deleteOnExit(String path) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        try {
            fs =FileSystem.get(conf);
            fs.deleteOnExit(new Path(path));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        
    }

    @Override
    public boolean isFile(String path) throws TException {
        
        boolean isFile =false ;
        
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        try {
            fs =FileSystem.get(conf);
            isFile =fs.isFile(new Path(path));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        return isFile;
    }

    @Override
    public void rename(String srcPath, String destPath) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        try {
            fs =FileSystem.get(conf);
            fs.rename(new Path(srcPath), new Path(destPath));
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        
    }

    @Override
    public String getName(String path) throws TException {
        String name  =new Path(path).getName();
        
        return name;
    }

    @Override
    public long totalSpace(String path) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        long length =0 ;
        try {
            fs =FileSystem.get(conf);
            length =fs.getFileStatus(new Path(path)).getLen();
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        return length;
    }

    @Override
    public long usedSpace(String path) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        long used =0 ;
        try {
            fs =FileSystem.get(conf);
            used =fs.getUsed();
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        return used;
    }
    
    @Override
    public String read(String path) throws TException {
        Configuration conf =getConfiguration();
        FileSystem fs =null;
        String result   ="" ;
        try {
            fs =FileSystem.get(conf);
            FSDataInputStream  in =fs.open(new Path(path));
            int length =in.available();
            byte  [] tmp = new byte  [length] ;
            in.readFully(tmp);
            result  =new String(tmp);
        } catch (IOException e) {
            throw new TException(e.getMessage());
        }finally{
            if(null!=fs){
                try {
                    fs.close();
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
            }
        }
        return result;
    }
}

server

相關文章
相關標籤/搜索