HDFS中PathFilter類java
在單個操做中處理一批文件,這是很常見的需求。好比說處理日誌的 MapReduce做業可能須要分析一個月內包含在大量目錄中的日誌文件。在一個表達式中使用通配符在匹配多個文件時比較方便的,無需列舉每一個文件和目錄 來指定輸入。hadoop爲執行通配提供了兩個FIleSystem方法:apache
1 public FileStatus[] globStatus(Path pathPattern) throw IOException數組
2 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOExceptionbash
globStatus()方法返回與路徑想匹配的全部文件的FileStatus對象數組,並按路徑排序。hadoop所支持的通配符與Unix bash相同。svn
第二個方法傳了一個PathFilter對象做爲參數,PathFilter能夠進一步對匹配進行限制。PathFilter是一個接口,裏面只有一個方法accept(Path path)。具體使用參考下面代碼oop
package com.tv; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.IOUtils; public class MergeSmallFilesToHDFS { private static FileSystem fs = null; private static FileSystem local = null; public static class RegexExcludePathFilter implements PathFilter{ private final String regex; public RegexExcludePathFilter(String regex) { this.regex = regex; } public boolean accept(Path path) { // TODO Auto-generated method stub boolean flag = path.toString().matches(regex); //過濾 regex 格式的文件,只需 return !flag return !flag; } } public static class RegexAcceptPathFilter implements PathFilter { private final String regex; public RegexAcceptPathFilter(String regex) { this.regex = regex; } public boolean accept(Path path) { // TODO Auto-generated method stub boolean flag = path.toString().matches(regex); //接受 regex 格式的文件,只需 return flag return flag; } } public static void list() throws IOException, URISyntaxException { //讀取配置文件 Configuration conf = new Configuration(); URI uri = new URI("hdfs://zbc:9000"); // FileSystem是用戶操做HDFS的核心類,它得到URI對應的HDFS文件系統 fs = FileSystem.get(uri, conf); // 得到本地文件系統 local = FileSystem.getLocal(conf); //獲取該目錄下的全部子目錄(日期名稱) FileStatus[] dirstatus = local.globStatus(new Path("C:/Users/zaish/Documents/學習/hadooop分析數據/tvdata/*"),new RegexExcludePathFilter("^.*svn$")); Path[] dirs = FileUtil.stat2Paths(dirstatus); FSDataOutputStream out = null; FSDataInputStream in = null; for (Path dir : dirs) { String fileName = dir.getName().replace("-", "");//文件名稱 //只接受日期目錄下的.txt文件 FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$")); // 得到日期目錄下的全部文件 Path[] listedPaths = FileUtil.stat2Paths(localStatus); //輸出路徑 Path block = new Path("hdfs://zbc:9000/middle/tv/"+ fileName + ".txt"); // 打開輸出流 out = fs.create(block); for (Path p : listedPaths) { in = local.open(p);// 打開輸入流 IOUtils.copyBytes(in, out, 4096, false); // 複製數據 // 關閉輸入流 in.close(); } if (out != null) { // 關閉輸出流 out.close(); } } } public static void main(String[] args) throws Exception { list(); } }