mapreduce小文件合併&文件過濾器的使用


HDFS中PathFilter類java

在單個操做中處理一批文件,這是很常見的需求。好比說處理日誌的 MapReduce做業可能須要分析一個月內包含在大量目錄中的日誌文件。在一個表達式中使用通配符在匹配多個文件時比較方便的,無需列舉每一個文件和目錄 來指定輸入。hadoop爲執行通配提供了兩個FIleSystem方法:apache

1 public FileStatus[] globStatus(Path pathPattern) throw IOException數組

2 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOExceptionbash

globStatus()方法返回與路徑想匹配的全部文件的FileStatus對象數組,並按路徑排序。hadoop所支持的通配符與Unix bash相同。svn

第二個方法傳了一個PathFilter對象做爲參數,PathFilter能夠進一步對匹配進行限制。PathFilter是一個接口,裏面只有一個方法accept(Path path)。具體使用參考下面代碼oop

package com.tv;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;

public class MergeSmallFilesToHDFS {
	private static FileSystem fs = null;
	private static FileSystem local = null;
	public static class RegexExcludePathFilter implements PathFilter{
		private final String regex;
		public RegexExcludePathFilter(String regex) {
	        this.regex = regex;
	    }
		public boolean accept(Path path) {
			// TODO Auto-generated method stub
			boolean flag = path.toString().matches(regex);
			//過濾 regex 格式的文件,只需 return !flag
	        return !flag;
		}
	}
	public static class RegexAcceptPathFilter implements PathFilter {
        private final String regex;
        
        public RegexAcceptPathFilter(String regex) {
            this.regex = regex;
        }
		public boolean accept(Path path) {
			// TODO Auto-generated method stub
			boolean flag = path.toString().matches(regex);
			//接受 regex 格式的文件,只需 return flag
            return flag;
		}
	}
	public static void list() throws IOException, URISyntaxException {
		//讀取配置文件
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://zbc:9000");
        // FileSystem是用戶操做HDFS的核心類,它得到URI對應的HDFS文件系統
        fs = FileSystem.get(uri, conf);
        // 得到本地文件系統
        local = FileSystem.getLocal(conf);
        //獲取該目錄下的全部子目錄(日期名稱)
        FileStatus[] dirstatus = local.globStatus(new Path("C:/Users/zaish/Documents/學習/hadooop分析數據/tvdata/*"),new RegexExcludePathFilter("^.*svn$"));
        Path[] dirs = FileUtil.stat2Paths(dirstatus);
        FSDataOutputStream out = null;
        FSDataInputStream in = null;
        for (Path dir : dirs) {
            String fileName = dir.getName().replace("-", "");//文件名稱
            //只接受日期目錄下的.txt文件
            FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
            // 得到日期目錄下的全部文件
            Path[] listedPaths = FileUtil.stat2Paths(localStatus);
            //輸出路徑
            Path block = new Path("hdfs://zbc:9000/middle/tv/"+ fileName + ".txt");
            // 打開輸出流
            out = fs.create(block);            
            for (Path p : listedPaths) {
                in = local.open(p);// 打開輸入流
                IOUtils.copyBytes(in, out, 4096, false); // 複製數據
                // 關閉輸入流
                in.close();
            }
            if (out != null) {
            	// 關閉輸出流
                out.close();
            }
        }        
	}
	public static void main(String[] args) throws Exception {
		list();
	}
}
相關文章
相關標籤/搜索