讀HDFS文件

不帶壓縮的讀取方式

hdfs = FileSystem.get(new URI("hdfs://SANDBOX-HADOOP-01.whh.net:8022"), conf, "bigdata");
package com.whh.bigdata.xetl.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;

/**
 * Created by whh on 2017/9/29.
 */


public class ReadHDFS {
    private static final String utf8 = "UTF-8";

    /**
     *讀取了HDFS上的一個txt文件裏的內容,按行讀取;返回一個map(行的hash值,行內容),把重複的行打出來
     * @param txtFilePath
     * @param conf
     * @return
     */
    public static Map<Integer,List<Integer>> getStringByTXT(String txtFilePath, Configuration conf)
    {

        Map<Integer,List<Integer>> map = new HashMap<Integer,List<Integer>>();

        StringBuffer buffer = new StringBuffer();
        FSDataInputStream fsr = null;//輸入流
        BufferedReader bufferedReader = null;
        String lineTxt = null;
        try
        {
            FileSystem fs = FileSystem.get(URI.create(txtFilePath),conf);
            fsr = fs.open(new Path(txtFilePath));


            bufferedReader = new BufferedReader(new InputStreamReader(fsr));
            int lineCount = 1;
            while ((lineTxt = bufferedReader.readLine()) != null)
            {
                int hc=lineTxt.hashCode();
                List <Integer>list=new ArrayList();
                list.add(lineCount);
                if (map.containsKey(hc))  //重複的行打印出來
                {
                    System.out.println(lineCount +":" + lineTxt);
                    map.get(hc).add(lineCount);
                }
                else map.put(hc,list);
                lineCount++;

            }
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            if (bufferedReader != null)
            {
                try
                {
                    bufferedReader.close();
                } catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }

        return map;
    }
    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String txtFilePath = "hdfs://SANDBOX-HADOOP-01.whh.net:8022/log_data/stg_log_1600005/day=2017-11-19/-r-00001";

       // String mbline = getStringByTXT(txtFilePath, conf);
       Map <Integer,List<Integer>>map=new HashMap();
        map=getStringByTXT(txtFilePath, conf);
        for (Map.Entry<Integer, List<Integer>> entry : map.entrySet())
        {
            if(entry.getValue().size()>1)
            {
                System.out.println(entry.getKey()+":"+entry.getValue());
            }
        }
    }



}

帶壓縮的文件讀取

package com.whh.bigdata.xetl.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.*;

/**
 * Created by whh on 2017/9/29.
 */


public class ReadHDFS {
    private static final String utf8 = "UTF-8";


    /**
     * 獲取文件的壓縮方式
     * @param file
     * @return
     */
    public String getCodec(Path file) {
            String filename = file.getName();
            String reversedFilename = (new StringBuilder(filename)).reverse().toString();
        return reversedFilename;
    }


    /**
     * 讀取壓縮文件的第一行,打印出來
     * @param txtFilePath
     * @param conf
     * @return
     */

    public static String getStringByTXT1(String txtFilePath, Configuration conf)
    {

        StringBuffer buffer = new StringBuffer();
        FSDataInputStream fsr = null;
        BufferedReader bufferedReader = null;
        String lineTxt = null;
        try
        {
            FileSystem fs = FileSystem.get(URI.create(txtFilePath),conf);
            fsr = fs.open(new Path(txtFilePath));

            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(new Path(txtFilePath));
            System.out.println("codec="+codec);
            CompressionInputStream compin=codec.createInputStream(fsr);
            //BufferedReader br= new BufferedReader(new InputStreamReader(compin));


            bufferedReader = new BufferedReader(new InputStreamReader(compin));
            while ((lineTxt = bufferedReader.readLine()) != null)
            {
                //if(lineTxt.split("\t")[0].trim().equals("00067")){
                    return lineTxt;
                //}

            }
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            if (bufferedReader != null)
            {
                try
                {
                    bufferedReader.close();
                } catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }

        return lineTxt;
    }

   
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String txtFilePath = "hdfs://SANDBOX-HADOOP-01.whh.net:8022/collect_data/userlog/20170925/kp_diag_2017092523_10.1.11.171.1506354616660.1549.log.gz";
        System.out.println(mbline);
    }



}
相關文章
相關標籤/搜索