mapreduce如何使用本地文件

對於java來講,讀取本地文件再正常不過。可是對於mapreduce程序來講,讀取本地文件經常會陷入誤區。本地明明有這個文件,在本地運行jar包,mapreduce爲何讀不到?由於咱們知道,mapreduce程序原本就不是在本地執行的,程序會分佈式的在各個機器上執行,你固然讀不到文件,那所謂的「本地文件」就不叫「本地文件」,固然只有一個例外:你的hadoop集羣是僞集羣。

好比下面的示例:

package test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FileTest
{
    public static void main(String args[])
    {
        int mr = 0;
        try
        {
            mr = ToolRunner
                    .run(new Configuration(), new FileTestDriver(), args);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        
        System.exit(mr);
    }
}
class FileTestDriver extends Configured implements Tool
{
    @Override
    public int run(String[] arg0) throws Exception
    {
        Configuration config = getConf();
        JobConf conf = new JobConf(config, FileTestDriver.class);
        String[] otherArgs = new GenericOptionsParser(config, arg0)
                .getRemainingArgs();
        String input = otherArgs[0];
        String ouput = otherArgs[1];
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.set("mapred.task.timeout", "6000000");
        conf.setMapperClass(FileTestMapper.class);
        conf.setReducerClass(FileTestReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(ouput));
        JobClient.runJob(conf);
        return 0;
    }
}
class FileTestMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text>
{
    private String filepath = "";
    
    public void configure(JobConf job)
    {
        filepath = job.get("files");
    }
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException
    {
        String url = "qq.com";
        String host = getTop100DomainTest(url, filepath);
        output.collect(new Text(url + "\t" + host), new Text(""));
       
    }
    public String getTop100DomainTest(String url, String filepath)
    {
        try
        {
            BufferedReader reader = new BufferedReader(new FileReader(new File(
                    filepath)));
            String line = "";

            while ((line = reader.readLine()) != null)
            {
                // splitLine[0]爲host 後面跟着域名
                line = line.replaceAll("( )+", " ");
                String[] splitLine = line.split(" ");
                for (int i = 1; i < splitLine.length; i++)
                {
                    String host = splitLine[i];
                    if (url.equals(host))
                    {
                                return splitLine[0];
                    }
                }
            }
            return "";
        }
        catch (FileNotFoundException e)
        {
            return "";
        }
        catch (IOException e)
        {
            return "";
        }

    }

}

class FileTestReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text>
{
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException
    {
        output.collect(key, new Text(""));
    }

}

 public String getTop100DomainTest(String url, String filepath)方法讀取文件,並根據url返回url的domain。

將上述程序打包test.jar後,

運行命令:

hadoop jar test.jar test.FileTest -D files="/opt/top100.txt"  /test/test /test/test1

若是您是僞集羣,那麼恭喜,程序成功運行,若是您是分佈式,那麼程序極可能運行不成功?

咱們知道原理後,這段代碼在分佈式的狀況下,也能夠運行成功,怎麼辦?那就把集羣的全部機器都拷貝top100.txt到/opt下!

程序運行成功了吧?但實際上是很老土的。當你集羣數多,你要一一拷貝,那是多麼麻煩的一件事,並且全部的配置文件必須在一樣的文件夾下,若是你能忍受,那go ahead。

實際上mapreduce提供了一個緩存方法DistributedCache。

只需在配置階段加入:

DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);

便可,但此處的"/test/top100.txt"爲hdfs的路徑。

而後在mapper 的public void configure(JobConf job)方法中加入

public void configure(JobConf job)
    {
        try
        {
            localFiles = DistributedCache.getLocalCacheFiles(job);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

便可。

map中引用,經過 path.toUri().getPath()便可訪問到file。
相關文章
相關標籤/搜索