A note on how to connect to Hadoop and operate on HDFS from the Java API, with the connections managed by a connection pool.
I had done this kind of development before and assumed there would be no problems, yet it still turned out bumpy and left me somewhat annoyed. I'm recording the tiresome process here as a warning to myself not to let ambition outrun hands-on skill.
一: Add the required jar dependencies
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.8.2</version>
</dependency>
<!-- commons-pool2, used for the connection pool -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>
二: The basic flow of building the connection pool
2.1 The project runs in a fairly standard Spring Boot environment.
2.2 The Hadoop-related package structure is shown below (I admit the way it is carved up rather exposes my modest skills [laughing through tears]).
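Judging from the import statements in the code later in this post, the layout is roughly the following; com.cmcc.datacenter is simply the group this project happens to use:

com.cmcc.datacenter.hdfs
├── config
│   ├── HdfsConfig
│   └── HdfsPoolConfig
└── client
    ├── Hdfs
    ├── HdfsClient
    ├── HdfsFactory
    └── HdfsPool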
2.3 A diagram to illustrate the development approach
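In text form, the idea is roughly this (a plain-text sketch of the call flow, based on the classes shown below):

business code ──> HdfsClient ──> HdfsPool (GenericObjectPool<Hdfs>, configured by HdfsPoolConfig)
                                      │
                                      └──> HdfsFactory creates / validates / destroys ──> Hdfs ──> org.apache.hadoop.fs.FileSystem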
3、The code
import com.cmcc.datacenter.hdfs.client.HdfsClient;
import com.cmcc.datacenter.hdfs.client.HdfsFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HdfsConfig {

    @Value("${hadoop.hdfs.ip}")
    private String hdfsServerIp;
    @Value("${hadoop.hdfs.port}")
    private String hdfsServerPort;
    @Value("${hadoop.hdfs.pool.maxTotal}")
    private int maxTotal;
    @Value("${hadoop.hdfs.pool.maxIdle}")
    private int maxIdle;
    @Value("${hadoop.hdfs.pool.minIdle}")
    private int minIdle;
    @Value("${hadoop.hdfs.pool.maxWaitMillis}")
    private int maxWaitMillis;
    @Value("${hadoop.hdfs.pool.testWhileIdle}")
    private boolean testWhileIdle;
    @Value("${hadoop.hdfs.pool.minEvictableIdleTimeMillis}")
    private long minEvictableIdleTimeMillis = 60000;
    @Value("${hadoop.hdfs.pool.timeBetweenEvictionRunsMillis}")
    private long timeBetweenEvictionRunsMillis = 30000;
    @Value("${hadoop.hdfs.pool.numTestsPerEvictionRun}")
    private int numTestsPerEvictionRun = -1;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public HdfsClient HdfsClient() {
        HdfsClient client = new HdfsClient();
        return client;
    }

    /**
     * testWhileIdle - validate objects while they sit idle, default false
     * minEvictableIdleTimeMillis - minimum idle time before a connection may be evicted
     * timeBetweenEvictionRunsMillis - interval (ms) between eviction scans; a negative value disables the evictor thread, default -1
     * numTestsPerEvictionRun - maximum number of objects examined per eviction run
     */
    @Bean
    public HdfsPoolConfig HdfsPoolConfig() {
        HdfsPoolConfig hdfsPoolConfig = new HdfsPoolConfig();
        hdfsPoolConfig.setTestWhileIdle(testWhileIdle);
        hdfsPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        hdfsPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        hdfsPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
        hdfsPoolConfig.setMaxTotal(maxTotal);
        hdfsPoolConfig.setMaxIdle(maxIdle);
        hdfsPoolConfig.setMinIdle(minIdle);
        hdfsPoolConfig.setMaxWaitMillis(maxWaitMillis);
        return hdfsPoolConfig;
    }

    @Bean
    public HdfsFactory HdfsFactory() {
        return new HdfsFactory("hdfs://" + hdfsServerIp + ":" + hdfsServerPort);
    }
}
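The @Value placeholders above expect matching entries in application.properties. A minimal sketch of such a configuration follows; the host, port, and pool numbers are only illustrative values, not recommendations:

hadoop.hdfs.ip=192.168.0.100
hadoop.hdfs.port=9000
hadoop.hdfs.pool.maxTotal=8
hadoop.hdfs.pool.maxIdle=8
hadoop.hdfs.pool.minIdle=0
hadoop.hdfs.pool.maxWaitMillis=10000
hadoop.hdfs.pool.testWhileIdle=true
hadoop.hdfs.pool.minEvictableIdleTimeMillis=60000
hadoop.hdfs.pool.timeBetweenEvictionRunsMillis=30000
hadoop.hdfs.pool.numTestsPerEvictionRun=-1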
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HdfsPoolConfig extends GenericObjectPoolConfig {

    public HdfsPoolConfig() {}

    /**
     * testWhileIdle - validate objects while they sit idle, default false
     * minEvictableIdleTimeMillis - minimum idle time before a connection may be evicted
     * timeBetweenEvictionRunsMillis - interval (ms) between eviction scans; a negative value disables the evictor thread, default -1
     * numTestsPerEvictionRun - maximum number of objects examined per eviction run
     */
    public HdfsPoolConfig(boolean testWhileIdle, long minEvictableIdleTimeMillis,
                          long timeBetweenEvictionRunsMillis, int numTestsPerEvictionRun) {
        this.setTestWhileIdle(testWhileIdle);
        this.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        this.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        this.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
    }
}
import com.cmcc.datacenter.hdfs.config.HdfsPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

public class HdfsClient {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private HdfsPool hdfsPool;

    @Autowired
    private HdfsPoolConfig hdfsPoolConfig;

    @Autowired
    private HdfsFactory hdfsFactory;

    public void init() {
        hdfsPool = new HdfsPool(hdfsFactory, hdfsPoolConfig);
    }

    public void stop() {
        hdfsPool.close();
    }

    public long getPathSize(String path) throws Exception {
        Hdfs hdfs = null;
        try {
            hdfs = hdfsPool.borrowObject();
            return hdfs.getContentSummary(path).getLength();
        } catch (Exception e) {
            logger.error("[HDFS] failed to get the size of the path", e);
            throw e;
        } finally {
            if (null != hdfs) {
                hdfsPool.returnObject(hdfs);
            }
        }
    }

    public List<String> getBasePath() {
        Hdfs hdfs = null;
        try {
            hdfs = hdfsPool.borrowObject();
            return hdfs.listFileName();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            if (null != hdfs) {
                hdfsPool.returnObject(hdfs);
            }
        }
    }
}
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

import java.io.IOException;

public class HdfsFactory implements PooledObjectFactory<Hdfs> {

    private final String url;

    public HdfsFactory(String url) {
        this.url = url;
    }

    @Override
    public PooledObject<Hdfs> makeObject() throws Exception {
        Hdfs hdfs = new Hdfs(url);
        hdfs.open();
        return new DefaultPooledObject<Hdfs>(hdfs);
    }

    @Override
    public void destroyObject(PooledObject<Hdfs> pooledObject) throws Exception {
        Hdfs hdfs = pooledObject.getObject();
        hdfs.close();
    }

    @Override
    public boolean validateObject(PooledObject<Hdfs> pooledObject) {
        Hdfs hdfs = pooledObject.getObject();
        try {
            return hdfs.isConnected();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public void activateObject(PooledObject<Hdfs> pooledObject) throws Exception {
    }

    @Override
    public void passivateObject(PooledObject<Hdfs> pooledObject) throws Exception {
    }
}
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HdfsPool extends GenericObjectPool<Hdfs> {

    public HdfsPool(PooledObjectFactory<Hdfs> factory) {
        super(factory);
    }

    public HdfsPool(PooledObjectFactory<Hdfs> factory, GenericObjectPoolConfig<Hdfs> config) {
        super(factory, config);
    }

    public HdfsPool(PooledObjectFactory<Hdfs> factory, GenericObjectPoolConfig<Hdfs> config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}
import com.cmcc.datacenter.hdfs.config.HdfsConfig;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.List;

public class Hdfs {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private FileSystem fs;
    private String coreResource;
    private String hdfsResource;
    private final String url;
    private static final String NAME = "fs.hdfs.impl";

    public Hdfs(String url) {
        this.url = url;
    }

    public void open() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", url);
            System.out.println("url is " + url);
            fs = FileSystem.get(conf);
            logger.info("[Hadoop] instance created");
        } catch (Exception e) {
            logger.error("[Hadoop] failed to create instance", e);
        }
    }

    public void close() {
        try {
            if (null != fs) {
                fs.close();
                logger.info("[Hadoop] instance closed");
            }
        } catch (Exception e) {
            logger.error("[Hadoop] failed to close instance", e);
        }
    }

    public boolean isConnected() throws IOException {
        return fs.exists(new Path("/"));
    }

    public boolean exists(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.exists(hdfsPath);
    }

    public FileStatus getFileStatus(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.getFileStatus(hdfsPath);
    }

    public ContentSummary getContentSummary(String path) throws IOException {
        ContentSummary contentSummary = null;
        Path hdfsPath = new Path(path);
        if (fs.exists(hdfsPath)) {
            contentSummary = fs.getContentSummary(hdfsPath);
        }
        return contentSummary;
    }

    public List<String> listFileName() throws IOException {
        List<String> res = Lists.newArrayList();
        FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses) {
            res.add(fileStatus.getPath() + " : type -- " + (fileStatus.isDirectory() ? "directory" : "file"));
        }
        return res;
    }
}
4、Summary
Six classes in total; once the idea is clear it is all quite easy.
The slight messiness comes from mixing Spring's management of the beans with commons-pool2's management of the connection objects.
1. The @Configuration annotation on the HdfsConfig class marks it as a configuration class, playing much the same role as the <beans></beans> tag in a Spring XML file; Spring Boot scans it and registers the beans it declares. Within it, @Bean(initMethod = "init", destroyMethod = "stop") tells Spring to call the bean's init method when it is created and its stop method when it is destroyed.
2. HdfsClient is the class that business code calls. When Spring initializes it, its init method creates the HdfsPool (the connection pool of Hdfs instances). Its other methods are thin wrappers around the methods of Hdfs: borrow an instance from the pool, call the method on it, then return it. A usage sketch appears right after this list.
3. HdfsPoolConfig extends GenericObjectPoolConfig from commons-pool2, is managed by Spring, and acts as the configuration class for the connection pool; it is passed in when the HdfsPool is created.
4. HdfsFactory implements PooledObjectFactory from commons-pool2, is managed by Spring, and acts as the factory that creates the connection instances; it too is passed in when the HdfsPool is created. The pool actually obtains its connection instances through this factory.
5. HdfsPool extends GenericObjectPool from commons-pool2 and is the connection pool itself.
6. Hdfs is the underlying connection instance; every read, write, and delete operation is implemented here, only the acquisition and destruction of connections is delegated to the pool.
Note: Spring is used to manage some of these classes only because the project itself is built on Spring Boot and that is convenient; it is not mandatory, and you are free to new the objects yourself.
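As an example of how a caller uses this, here is a minimal sketch; the controller class name and request paths are made up for illustration, and it assumes spring-boot-starter-web is on the classpath:

import com.cmcc.datacenter.hdfs.client.HdfsClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

// Hypothetical caller: it never touches the pool directly,
// it only calls the wrapper methods on HdfsClient.
@RestController
public class HdfsDemoController {

    @Autowired
    private HdfsClient hdfsClient;

    // List the entries under the HDFS root directory.
    @GetMapping("/hdfs/base")
    public List<String> basePath() {
        return hdfsClient.getBasePath();
    }

    // Return the size (in bytes) of the content under the given HDFS path.
    @GetMapping("/hdfs/size")
    public long pathSize(@RequestParam String path) throws Exception {
        return hdfsClient.getPathSize(path);
    }
}

Because borrowing and returning happen inside HdfsClient, callers get pooled connections without ever knowing the pool exists.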
5、Some pitfalls that are not really pitfalls but have to be mentioned.
1. I honestly forgot that connecting to a remote Hadoop cluster through the Java API on Windows requires a few extra contortions.
The error: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset
The fix:
1. Unpack the downloaded hadoop-2.9.0.tar archive and put it wherever you like on the local machine;
2. Download the extra files needed on Windows (the hadoop.dll, winutils.exe, etc. matching hadoop 2.9.0). This is the crucial step. (Side rant: a certain SDN site is shamelessly money-hungry, hogging the first ten pages of Baidu results and charging coins for every download.)
Enough said; here is the GitHub address: github地址
3. With the Windows files downloaded above, go through the following steps:
3.1: Extract those files into the bin directory of the hadoop-2.9.0.tar you unpacked (add the files that are missing, but do not replace the ones already there, unless you deliberately want to experiment).
3.2: Copy hadoop.dll into C:\Windows\System32.
3.3: Add an environment variable HADOOP_HOME pointing to the hadoop directory.
3.4: Add %HADOOP_HOME%\bin to PATH; if that alone does not work, add %HADOOP_HOME%\sbin as well.
3.5: Restart your IDE (Eclipse, IntelliJ IDEA, or whatever editor you use).
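If you prefer not to touch the system environment variables, pointing the hadoop.home.dir system property at the local Hadoop directory before the first FileSystem call achieves the same thing. A minimal sketch, where the D:\hadoop-2.9.0 path is only an example:

// Set hadoop.home.dir programmatically instead of via HADOOP_HOME.
// This must run before any org.apache.hadoop.fs.FileSystem is created.
public class HadoopHomeExample {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.9.0"); // example path, adjust to your machine
        // ... create the connection pool / FileSystem afterwards
    }
}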