Reading without compression
To connect to the cluster as a specific HDFS user rather than the local OS user, FileSystem also offers an overload that takes the URI and user name explicitly, e.g. hdfs = FileSystem.get(new URI("hdfs://SANDBOX-HADOOP-01.whh.net:8022"), conf, "bigdata");
package com.whh.bigdata.xetl.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;
/**
* Created by whh on 2017/9/29.
*/
public class ReadHDFS {

    private static final String UTF8 = "UTF-8";

    /**
     * Reads a text file on HDFS line by line and returns a map of
     * (line hash code -> line numbers), printing every duplicated line.
     * Note that two distinct lines can share a hash code, so reported
     * duplicates should be confirmed against the actual line content.
     *
     * @param txtFilePath HDFS path of the text file
     * @param conf        Hadoop configuration
     * @return map from line hash code to the numbers of the lines with that hash
     */
    public static Map<Integer, List<Integer>> getStringByTXT(String txtFilePath, Configuration conf) {
        Map<Integer, List<Integer>> map = new HashMap<Integer, List<Integer>>();
        FSDataInputStream fsr = null; // input stream
        BufferedReader bufferedReader = null;
        String lineTxt = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(txtFilePath), conf);
            fsr = fs.open(new Path(txtFilePath));
            bufferedReader = new BufferedReader(new InputStreamReader(fsr, UTF8));
            int lineCount = 1;
            while ((lineTxt = bufferedReader.readLine()) != null) {
                int hc = lineTxt.hashCode();
                if (map.containsKey(hc)) { // print the duplicated line
                    System.out.println(lineCount + ":" + lineTxt);
                    map.get(hc).add(lineCount);
                } else {
                    List<Integer> list = new ArrayList<Integer>();
                    list.add(lineCount);
                    map.put(hc, list);
                }
                lineCount++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return map;
    }
    /**
     * @param args
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        String txtFilePath = "hdfs://SANDBOX-HADOOP-01.whh.net:8022/log_data/stg_log_1600005/day=2017-11-19/-r-00001";
        Map<Integer, List<Integer>> map = getStringByTXT(txtFilePath, conf);
        // Print only the hash buckets that contain more than one line number.
        for (Map.Entry<Integer, List<Integer>> entry : map.entrySet()) {
            if (entry.getValue().size() > 1) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        }
    }
}
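The one-line snippet at the top of this section uses the FileSystem.get overload that takes an explicit URI and user name. A minimal self-contained sketch built from that snippet; the host and the user "bigdata" are taken from it, so substitute your own NameNode address and user:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

public class ReadHDFSAsUser {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Connect as an explicit HDFS user instead of the local OS user.
        FileSystem hdfs = FileSystem.get(
                new URI("hdfs://SANDBOX-HADOOP-01.whh.net:8022"), conf, "bigdata");
        // Open and read a file exactly as getStringByTXT does.
        FSDataInputStream in = hdfs.open(
                new Path("/log_data/stg_log_1600005/day=2017-11-19/-r-00001"));
        BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
        System.out.println(br.readLine()); // print the first line
        br.close();
    }
}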
Reading a compressed file
package com.whh.bigdata.xetl.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
/**
* Created by whh on 2017/9/29.
*/
public class ReadHDFS {

    private static final String UTF8 = "UTF-8";

    /**
     * Resolves the compression codec of a file from its name suffix
     * (e.g. ".gz" maps to GzipCodec).
     *
     * @param file the file whose codec should be determined
     * @param conf Hadoop configuration
     * @return the matching codec, or null if the file is not compressed
     */
    public static CompressionCodec getCodec(Path file, Configuration conf) {
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        return factory.getCodec(file);
    }
    /**
     * Reads the first line of a (possibly compressed) file on HDFS and returns it.
     *
     * @param txtFilePath HDFS path of the file
     * @param conf        Hadoop configuration
     * @return the first line of the file, or null if the file is empty
     */
    public static String getStringByTXT1(String txtFilePath, Configuration conf) {
        FSDataInputStream fsr = null;
        BufferedReader bufferedReader = null;
        String lineTxt = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(txtFilePath), conf);
            fsr = fs.open(new Path(txtFilePath));
            // Infer the codec from the file name suffix; null means uncompressed.
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(new Path(txtFilePath));
            System.out.println("codec=" + codec);
            InputStream in = (codec == null) ? fsr : codec.createInputStream(fsr);
            bufferedReader = new BufferedReader(new InputStreamReader(in, UTF8));
            while ((lineTxt = bufferedReader.readLine()) != null) {
                // A filter such as lineTxt.split("\t")[0].trim().equals("00067")
                // could be applied here before returning.
                return lineTxt;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return lineTxt;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        String txtFilePath = "hdfs://SANDBOX-HADOOP-01.whh.net:8022/collect_data/userlog/20170925/kp_diag_2017092523_10.1.11.171.1506354616660.1549.log.gz";
        String mbline = getStringByTXT1(txtFilePath, conf);
        System.out.println(mbline);
    }
}
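When many compressed files are read in a loop, allocating a fresh decompressor per file adds overhead; Hadoop's CodecPool (in org.apache.hadoop.io.compress) can lend out and reclaim decompressors instead. A minimal sketch of that pattern, reusing the .gz path from main above; the class name is made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

public class ReadCompressedPooled {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String path = "hdfs://SANDBOX-HADOOP-01.whh.net:8022/collect_data/userlog/20170925/kp_diag_2017092523_10.1.11.171.1506354616660.1549.log.gz";
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(new Path(path));
        // Borrow a decompressor from the pool instead of allocating a new one.
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        try {
            FSDataInputStream raw = fs.open(new Path(path));
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    codec.createInputStream(raw, decompressor), "UTF-8"));
            System.out.println(br.readLine()); // first decompressed line
            br.close();
        } finally {
            CodecPool.returnDecompressor(decompressor); // return it for reuse
        }
    }
}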