hadoop - Merging Small Files
package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexAcceptFilter implements PathFilter {

    private final String regex;

    public RegexAcceptFilter(String regex) {
        this.regex = regex;
    }

    // Accept only the files whose path matches the given regex
    @Override
    public boolean accept(Path path) {
        return path.toString().matches(regex);
    }
}
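As a quick illustration, this accept filter can be handed to FileSystem.globStatus to list only matching paths. A minimal sketch, assuming a local test directory D:/pdata/vip01 (the directory name is illustrative):

package com.andy.merge;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AcceptFilterDemo {
    public static void main(String[] args) throws Exception {
        FileSystem local = FileSystem.getLocal(new Configuration());
        // List only the .txt files under the (hypothetical) directory D:/pdata/vip01
        FileStatus[] txtOnly = local.globStatus(
                new Path("D:/pdata/vip01/*"), new RegexAcceptFilter("^.*txt$"));
        for (FileStatus status : txtOnly) {
            System.out.println(status.getPath());
        }
    }
}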
package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// PathFilter is an interface with a single method: accept(Path path)
public class RegexExcludeFilter implements PathFilter {

    private final String regex;

    public RegexExcludeFilter(String regex) {
        this.regex = regex;
    }

    // Filter out the files whose path matches the regex
    @Override
    public boolean accept(Path path) {
        // Paths that match the regex are the ones we want to exclude,
        // so return the negation of the match
        return !path.toString().matches(regex);
    }
}
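The exclude filter simply negates the same match. A plain-Java sketch of how the regex "^.*svn$" used below behaves (no Hadoop required; the sample paths are illustrative):

public class RegexDemo {
    public static void main(String[] args) {
        // A path ending in "svn" matches, so the exclude filter drops it
        System.out.println("D:/pdata/.svn".matches("^.*svn$"));   // true  -> excluded
        // Any other path does not match and is kept
        System.out.println("D:/pdata/vip01".matches("^.*svn$"));  // false -> kept
    }
}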
package com.andy.merge;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Merge small local files and upload the result to HDFS.
 * @author huang
 */
public class MergeSmallFiles {

    // FileSystem handle for writing to HDFS
    private static FileSystem fs = null;
    // FileSystem handle for the local file system
    private static FileSystem local = null;
    // HDFS service address
    private static final String HDFS_SERVER = "hdfs://192.168.153.111:9000";

    // Main method that merges the small files
    public static void mergeFiles() throws Exception {
        // Run as the hadoop user
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Load the Hadoop configuration
        Configuration conf = new Configuration();
        // Build the URI of the HDFS server
        URI uri = new URI(HDFS_SERVER);
        // Create the two FileSystem handles
        fs = FileSystem.get(uri, conf);     // for HDFS
        local = FileSystem.getLocal(conf);  // for the local file system
        /* List all entries under the given path,
         * filtering out the .svn entries.
         * ^ matches the start of the string; . matches any single character;
         * * matches zero or more of the preceding character; $ matches the end.
         */
        FileStatus[] globStatus = local.globStatus(new Path("D:/pdata/*"),
                new RegexExcludeFilter("^.*svn$"));
        // Debug output: print every directory that survived the filter
        for (FileStatus fileStatus : globStatus) {
            System.out.println(fileStatus.getPath().toString());
        }
        // Convert the FileStatus array into an array of Path objects
        Path[] dirs = FileUtil.stat2Paths(globStatus);
        // Input and output streams
        FSDataOutputStream out = null;
        FSDataInputStream in = null;
        for (Path dir : dirs) { // iterate over every matched directory
            // Derive the output file name from the directory name
            String fileName = dir.getName().replaceAll("-", "");
            // Accept only the .txt files under this directory
            FileStatus[] txtPaths = local.globStatus(new Path(dir + "/*"),
                    new RegexAcceptFilter("^.*txt$"));
            Path[] txtFiles = FileUtil.stat2Paths(txtPaths);
            // Target file on HDFS
            Path hdfsFile = new Path(HDFS_SERVER + "/vip/" + fileName + ".txt");
            // Open the output stream and copy every input file into it
            out = fs.create(hdfsFile);
            for (Path p : txtFiles) {
                in = local.open(p);
                IOUtils.copyBytes(in, out, 4096, false);
                // Close the input stream after each file
                in.close();
            }
            if (null != out) {
                out.close();
            }
        }
    }
    // Program entry point
    public static void main(String[] args) throws Exception {
        mergeFiles();
        System.out.println("===== Small files merged successfully =====");
    }
}
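Once mergeFiles() has run, the result can be double-checked programmatically. A minimal sketch, assuming the same NameNode address and /vip output directory as above:

package com.andy.merge;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VerifyMerge {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(
                new URI("hdfs://192.168.153.111:9000"), new Configuration());
        // Print the name and size of every merged file under /vip
        for (FileStatus status : fs.listStatus(new Path("/vip"))) {
            System.out.println(status.getPath().getName()
                    + " : " + status.getLen() + " bytes");
        }
    }
}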