Managing HDFS Connections in Java with a Connection Pool

A write-up of how to connect to Hadoop through the Java API and operate on HDFS, with the connections managed by a connection pool.

I had done this kind of work before and assumed it would be painless, but it still turned out bumpy. Slightly annoyed with myself, I'm recording the whole tiresome process as a warning against letting ambition outrun ability!

1. Add the required JAR dependencies:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- commons-pool2, used for the connection pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.0</version>
        </dependency>

2. The basic workflow of building the connection pool

  2.1 The project environment is the usual all-in-one Spring Boot setup...

  2.2 The Hadoop-related package structure (screenshot not reproduced here; I admit the layout itself shows off my rather low-grade skills [facepalm])

  2.3 A quick diagram of the development approach (image not reproduced here)

3. The code (six files; each block below starts with its file name)

HdfsConfig.java:

import com.cmcc.datacenter.hdfs.client.HdfsClient;
import com.cmcc.datacenter.hdfs.client.HdfsFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HdfsConfig {

    @Value("${hadoop.hdfs.ip}")
    private  String hdfsServerIp;

    @Value("${hadoop.hdfs.port}")
    private  String hdfsServerPort;

    @Value("${hadoop.hdfs.pool.maxTotal}")
    private int maxTotal;

    @Value("${hadoop.hdfs.pool.maxIdle}")
    private int maxIdle;

    @Value("${hadoop.hdfs.pool.minIdle}")
    private int minIdle;

    @Value("${hadoop.hdfs.pool.maxWaitMillis}")
    private int maxWaitMillis;

    @Value("${hadoop.hdfs.pool.testWhileIdle}")
    private boolean  testWhileIdle;

    @Value("${hadoop.hdfs.pool.minEvictableIdleTimeMillis}")
    private long minEvictableIdleTimeMillis = 60000;

    @Value("${hadoop.hdfs.pool.timeBetweenEvictionRunsMillis}")
    private long timeBetweenEvictionRunsMillis = 30000;

    @Value("${hadoop.hdfs.pool.numTestsPerEvictionRun}")
    private int numTestsPerEvictionRun = -1;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public HdfsClient HdfsClient(){
        return new HdfsClient();
    }

    /**
     * testWhileIdle - validate connections while they sit idle; default false
     * minEvictableIdleTimeMillis - minimum idle time before a connection becomes evictable
     * timeBetweenEvictionRunsMillis - interval (ms) between eviction scans; a negative value disables the evictor thread, default -1
     * numTestsPerEvictionRun - number of objects examined per eviction run
     * */
    @Bean
    public HdfsPoolConfig HdfsPoolConfig(){
        HdfsPoolConfig hdfsPoolConfig = new HdfsPoolConfig();
        hdfsPoolConfig.setTestWhileIdle(testWhileIdle);
        hdfsPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        hdfsPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        hdfsPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
        hdfsPoolConfig.setMaxTotal(maxTotal);
        hdfsPoolConfig.setMaxIdle(maxIdle);
        hdfsPoolConfig.setMinIdle(minIdle);
        hdfsPoolConfig.setMaxWaitMillis(maxWaitMillis);
        return hdfsPoolConfig;
    }

    @Bean
    public HdfsFactory HdfsFactory(){
        return  new  HdfsFactory("hdfs://" + hdfsServerIp + ":" + hdfsServerPort);
    }

}
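For completeness, here is a minimal application.properties sketch matching the @Value keys above; the host, port, and pool numbers are placeholder values, not settings from the original project:

# placeholder values; tune for your cluster and workload
hadoop.hdfs.ip=192.168.0.100
hadoop.hdfs.port=9000
hadoop.hdfs.pool.maxTotal=8
hadoop.hdfs.pool.maxIdle=8
hadoop.hdfs.pool.minIdle=0
hadoop.hdfs.pool.maxWaitMillis=10000
hadoop.hdfs.pool.testWhileIdle=true
hadoop.hdfs.pool.minEvictableIdleTimeMillis=60000
hadoop.hdfs.pool.timeBetweenEvictionRunsMillis=30000
hadoop.hdfs.pool.numTestsPerEvictionRun=-1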
HdfsPoolConfig.java:

import com.cmcc.datacenter.hdfs.client.Hdfs;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HdfsPoolConfig extends GenericObjectPoolConfig<Hdfs> {

    public HdfsPoolConfig(){}

    /**
     * testWhileIdle - validate connections while they sit idle; default false
     * minEvictableIdleTimeMillis - minimum idle time before a connection becomes evictable
     * timeBetweenEvictionRunsMillis - interval (ms) between eviction scans; a negative value disables the evictor thread, default -1
     * numTestsPerEvictionRun - number of objects examined per eviction run
     * */
    public HdfsPoolConfig(boolean testWhileIdle, long minEvictableIdleTimeMillis, long timeBetweenEvictionRunsMillis, int numTestsPerEvictionRun){
        this.setTestWhileIdle(testWhileIdle);
        this.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        this.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        this.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
    }


}
HdfsClient.java:

import com.cmcc.datacenter.hdfs.config.HdfsPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

public class HdfsClient {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private HdfsPool hdfsPool;

    @Autowired
    private HdfsPoolConfig hdfsPoolConfig;

    @Autowired
    private HdfsFactory hdfsFactory;

    public void init(){
        hdfsPool = new HdfsPool(hdfsFactory,hdfsPoolConfig);
    }

    public void stop(){
        hdfsPool.close();
    }

    public long getPathSize(String path) throws Exception {
        Hdfs hdfs = null;
        try {
            hdfs = hdfsPool.borrowObject();
            return hdfs.getContentSummary(path).getLength();
        } catch (Exception e) {
            logger.error("[HDFS]獲取路徑大小失敗", e);
            throw e;
        } finally {
            if (null != hdfs) {
                hdfsPool.returnObject(hdfs);
            }
        }
    }

    public List<String> getBasePath(){
        Hdfs hdfs = null;
        try {
            hdfs = hdfsPool.borrowObject();
            return hdfs.listFileName();
        } catch (Exception e) {
            logger.error("[HDFS] failed to list the root directory", e);
            return null;
        } finally {
            if (null != hdfs) {
                hdfsPool.returnObject(hdfs);
            }
        }
    }



}
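For illustration, a minimal sketch of how a business class might use HdfsClient; the HdfsSizeService name is hypothetical, not part of the original project:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class HdfsSizeService {

    @Autowired
    private HdfsClient hdfsClient;

    // Delegates to the pooled client; borrowing and returning happen inside HdfsClient.
    public long sizeOf(String path) throws Exception {
        return hdfsClient.getPathSize(path);
    }
}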
HdfsFactory.java:

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

import java.io.IOException;

public class HdfsFactory implements PooledObjectFactory<Hdfs> {

    private final String url;
    public HdfsFactory(String url){
        this.url = url;
    }

    @Override
    public PooledObject<Hdfs> makeObject() throws Exception {
        Hdfs hdfs = new Hdfs(url);
        hdfs.open();
        return new DefaultPooledObject<Hdfs>(hdfs);
    }

    @Override
    public void destroyObject(PooledObject<Hdfs> pooledObject) throws Exception {
        Hdfs hdfs = pooledObject.getObject();
        hdfs.close();
    }

    @Override
    public boolean validateObject(PooledObject<Hdfs> pooledObject) {
        Hdfs hdfs = pooledObject.getObject();
        try {
            return  hdfs.isConnected();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public void activateObject(PooledObject<Hdfs> pooledObject) throws Exception {
        // Nothing to do when a connection is borrowed from the pool.
    }

    @Override
    public void passivateObject(PooledObject<Hdfs> pooledObject) throws Exception {
        // Nothing to do when a connection is returned to the pool.
    }
}
HdfsPool.java:

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HdfsPool extends GenericObjectPool<Hdfs> {


    public HdfsPool(PooledObjectFactory<Hdfs> factory) {
        super(factory);
    }

    public HdfsPool(PooledObjectFactory<Hdfs> factory, GenericObjectPoolConfig<Hdfs> config) {
        super(factory, config);
    }

    public HdfsPool(PooledObjectFactory<Hdfs> factory, GenericObjectPoolConfig<Hdfs> config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}
Hdfs.java:

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

public class Hdfs {

    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private FileSystem fs;
    private final String url;

    public Hdfs(String url) {
        this.url = url;
    }

    public void open() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", url);
            logger.info("[Hadoop] connecting to {}", url);
            fs = FileSystem.get(conf);
            logger.info("[Hadoop] FileSystem instance created");
        } catch (Exception e) {
            logger.error("[Hadoop] failed to create FileSystem instance", e);
        }
    }

    public void close() {
        try {
            if (null != fs) {
                fs.close();
                logger.info("[Hadoop]關閉實例成功");
            }
        } catch(Exception e) {
            logger.error("[Hadoop]關閉實例失敗", e);
        }
    }

    public boolean isConnected() throws IOException {
        // Guard against a null fs in case open() failed.
        return fs != null && fs.exists(new Path("/"));
    }

    public boolean exists(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.exists(hdfsPath);
    }


    public FileStatus getFileStatus(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.getFileStatus(hdfsPath);
    }

    // Returns null if the path does not exist; callers should check before using the result.
    public ContentSummary getContentSummary(String path) throws IOException {
        ContentSummary contentSummary = null;
        Path hdfsPath = new Path(path);
        if (fs.exists(hdfsPath)) {
            contentSummary = fs.getContentSummary(hdfsPath);
        }
        return contentSummary;
    }


    public List<String> listFileName() throws IOException {
        List<String> res = Lists.newArrayList();
        FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses){
            res.add(fileStatus.getPath() + " : " + (fileStatus.isDirectory() ? "directory" : "file"));
        }
        return  res;
    }



}

4. Summary:

Six classes in total; once the overall idea is clear, it's quite easy to follow.

Spring's bean management and commons-pool2's management of the connection objects are mixed together here, which makes things look a little messy.

1. The @Configuration annotation on the HdfsConfig class marks it as a configuration class, playing a role similar to the <beans></beans> tag in a Spring XML file; Spring Boot scans it and registers the beans it declares. The @Bean(initMethod = "init", destroyMethod = "stop") annotation tells Spring to call the bean's init method after creating it and its stop method when destroying it.

2. HdfsClient is the class business code calls. When Spring initializes it, its init method creates the HdfsPool (the pool of Hdfs connections). Its other methods are thin wrappers over the methods of Hdfs: borrow an instance from the pool, call the instance method, then return it (a helper that factors out this borrow/return dance is sketched after this list).

3. HdfsPoolConfig extends GenericObjectPoolConfig from commons-pool2. It is Spring-managed and serves as the pool's configuration, passed in when the HdfsPool is created.

4. HdfsFactory implements the PooledObjectFactory interface from commons-pool2. It is Spring-managed and acts as the factory that creates connection instances, passed in when the HdfsPool is created; the pool actually obtains its connection instances through it.

5. HdfsPool extends GenericObjectPool from commons-pool2; it is the connection pool itself.

6. Hdfs is the underlying connection instance; every operation on HDFS is implemented here. The only difference is that obtaining and destroying connections is handed over to the pool.
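As mentioned in point 2, every HdfsClient method repeats the same borrow/return dance. One possible cleanup, a sketch of my own rather than part of the original code, is to factor it into a single helper built on a small functional interface:

// Hypothetical additions inside HdfsClient: centralize borrow/return in one place.
@FunctionalInterface
interface HdfsAction<T> {
    T run(Hdfs hdfs) throws Exception;
}

private <T> T withHdfs(HdfsAction<T> action) throws Exception {
    Hdfs hdfs = null;
    try {
        hdfs = hdfsPool.borrowObject();
        return action.run(hdfs);
    } finally {
        if (null != hdfs) {
            hdfsPool.returnObject(hdfs);
        }
    }
}

// getPathSize would then collapse to:
// return withHdfs(hdfs -> hdfs.getContentSummary(path).getLength());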

Note: Spring is used to manage these classes only because the project itself is Spring Boot and Spring makes the wiring convenient; it is not mandatory. If you prefer, you can simply new everything yourself.
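For example, without Spring the same wiring is roughly the following (a sketch using the constructors shown above; the address is a placeholder):

public static void main(String[] args) throws Exception {
    HdfsPoolConfig config = new HdfsPoolConfig(true, 60000L, 30000L, -1);
    HdfsFactory factory = new HdfsFactory("hdfs://192.168.0.100:9000"); // placeholder address
    HdfsPool pool = new HdfsPool(factory, config);
    Hdfs hdfs = pool.borrowObject();
    try {
        System.out.println(hdfs.listFileName());
    } finally {
        pool.returnObject(hdfs);
        pool.close();
    }
}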

5. Some pitfalls that shouldn't be pitfalls.

  1. I genuinely forgot that using the Java API on Windows to connect to a remote Hadoop requires some extra ritual.

The error: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset

The fix:

1. Unpack the downloaded hadoop-2.9.0.tar archive to any location you like on the local machine;

2. Download the extra files Windows needs (hadoop.dll, winutils.exe, etc. matching hadoop 2.9.0). This step is the crucial one. And honestly, a certain SDN site has gone money-mad, squatting on the first ten pages of Baidu results and demanding "C coins" for every single download.

Without further ado, here is the GitHub address: github地址

3. With the Windows files downloaded, do the following:

3.1: Extract the files into the bin directory of the hadoop-2.9.0 you unpacked (add what's missing, but don't replace what's already there, unless you enjoy creative ways of breaking things or want to experiment).

3.2: Copy hadoop.dll to C:\Windows\System32.

3.3: Add a HADOOP_HOME environment variable pointing at the hadoop directory.

3.4: Add %HADOOP_HOME%\bin to PATH; if that alone doesn't work, add %HADOOP_HOME%\sbin as well.

3.5: Restart your IDE (eclipse, IntelliJ IDEA, or whatever editor you use).
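One extra note of my own, not from the original steps: Hadoop also honors the hadoop.home.dir system property, so if you cannot set a global environment variable you can set it programmatically before the first FileSystem call; the path below is a placeholder:

// Must run before any Hadoop class is touched; point it at your unpacked hadoop-2.9.0.
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.9.0");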
