大數據(hadoop-小文件合併、Mapreduce原理)

hadoop-小文件合併

package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexAcceptFilter implements PathFilter{
	private final String regex ;
	
	public RegexAcceptFilter(String regex){
		this.regex = regex ;
	}

	//只接受符合regex的文件
	@Override
	public boolean accept(Path path) {
		boolean flag = path.toString().matches(regex) ;
		return flag;
	}
	
}
package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

//PathFilter是一個接口,裏面只有一個方法accept(Path path)
public class RegexUncludeFilter implements PathFilter{
	private final String regex ;

	public RegexUncludeFilter(String regex){
		this.regex = regex ;
	}
	
	//過濾 regex 格式的文件
	@Override
	public boolean accept(Path path) {
		boolean flag = path.toString().matches(regex);
		//符合得我就接受,不符合的就過濾,因此是非flag
		return !flag;
	}
	
	
}
package com.andy.merge;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * 小文件合併
 * @author huang
 *
 */
public class MegerSmallFiles {
	//寫入到HDFS的FileSystem對象
	private static FileSystem fs = null ;
	
	//本地文件系統的FileSystem
	private static FileSystem local = null ;
	
	//HDFS服務路徑
	private static final String HDFS_SERVER = "hdfs://192.168.153.111:9000" ;
	
	//合併小文件的主要方法
	public static void megerFiles() throws Exception {
		//設置系統用戶爲hadoop
		System.setProperty("HADOOP_USER_NAME", "hadoop") ;
		
		//讀取hadoop文件的配置信息
		Configuration conf = new Configuration() ;
		
		//建立URI
		URI uri = new URI(HDFS_SERVER) ;
		
		//建立兩個文件系統的fs
		fs = FileSystem.get(uri, conf) ;	//針對HDFS
		local = FileSystem.get(conf) ;		//針對本地文件系統
		
		/* 獲取指定路徑下的全部文件
		 * 過濾該路徑下的全部svn文件
		 *  ^匹配一行的開頭 ;.表示匹配任意一個字符
		 *  *表示匹配0個或多個前面這個字符 ;$匹配一行的結束
		 * */
		FileStatus[] globStatus = local.globStatus(new Path("D:/pdata/*"), 
				new RegexUncludeFilter("^.*svn$"));
		//調試輸出
		for (FileStatus fileStatus : globStatus) {
			System.out.println(fileStatus.getPath().toString());
		}
		
		//將一組FileStatus對象轉換成Path對象
		Path[] dirs = FileUtil.stat2Paths(globStatus);
		
		//獲取輸入輸出流
		FSDataOutputStream out = null ;
		FSDataInputStream in = null ;
		
		for (Path dir : dirs) {	//具體的每一個目錄下面的全部文件
			//文件名稱
			String fileName = dir.getName().replaceAll("-", "") ;
			//只接受該目錄下的txt文件
			FileStatus[] txtPaths = local.globStatus(new Path(dir + "/*") ,
					new RegexAcceptFilter("^.*txt$"));
			Path[] txtFiles = FileUtil.stat2Paths(txtPaths);
			
			//設置輸出路徑
			Path hdfsFile = new Path(HDFS_SERVER + "/vip/" + fileName + ".txt") ;
			
			//打開輸入輸出流,進行讀寫
			out = fs.create(hdfsFile) ;	//輸出流
			for (Path p : txtFiles) {
				in = local.open(p) ;
				IOUtils.copyBytes(in, out, 4096, false);
				//關閉輸入流
				in.close();
			}
			if(null != out){
				out.close();
			}
		}
	}
	
	//程序入口
	public static void main(String[] args) throws Exception {
		megerFiles() ;
		System.out.println("=====小文件合併成功=====");
	}

}
相關文章
相關標籤/搜索