For plain Java, reading a local file is about as routine as it gets. For a MapReduce program, however, reading a local file is a common trap: the file is clearly there on the machine, you launch the jar from that machine, and yet MapReduce cannot find it. The reason is that a MapReduce program does not actually run locally. The tasks run distributed across the machines of the cluster, so of course they cannot see a file that only exists on your machine, and the so-called "local file" is no longer local to them. The one exception: your Hadoop cluster is a pseudo-distributed (single-node) cluster.

Take the following example:

package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileTest {
    public static void main(String args[]) {
        int mr = 0;
        try {
            mr = ToolRunner.run(new Configuration(), new FileTestDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(mr);
    }
}

class FileTestDriver extends Configured implements Tool {
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration config = getConf();
        JobConf conf = new JobConf(config, FileTestDriver.class);

        String[] otherArgs = new GenericOptionsParser(config, arg0).getRemainingArgs();
        String input = otherArgs[0];
        String output = otherArgs[1];

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.set("mapred.task.timeout", "6000000");

        conf.setMapperClass(FileTestMapper.class);
        conf.setReducerClass(FileTestReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        return 0;
    }
}

class FileTestMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    private String filepath = "";

    // Picks up the file path passed on the command line via -D files=...
    public void configure(JobConf job) {
        filepath = job.get("files");
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String url = "qq.com";
        String host = getTop100DomainTest(url, filepath);
        output.collect(new Text(url + "\t" + host), new Text(""));
    }

    // Scans the mapping file and returns the host whose domain list contains url.
    public String getTop100DomainTest(String url, String filepath) {
        try (BufferedReader reader = new BufferedReader(new FileReader(new File(filepath)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // splitLine[0] is the host; the domains belonging to it follow.
                line = line.replaceAll("( )+", " ");
                String[] splitLine = line.split(" ");
                for (int i = 1; i < splitLine.length; i++) {
                    if (url.equals(splitLine[i])) {
                        return splitLine[0];
                    }
                }
            }
            return "";
        } catch (FileNotFoundException e) {
            return "";
        } catch (IOException e) {
            return "";
        }
    }
}

class FileTestReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        output.collect(key, new Text(""));
    }
}
The method public String getTop100DomainTest(String url, String filepath) reads the file and returns the domain (host) that the given url maps to.

Package the program as test.jar and run:

hadoop jar test.jar test.FileTest -D files="/opt/top100.txt" /test/test /test/test1

If your cluster is pseudo-distributed, congratulations, the job runs fine. If it is fully distributed, the job will very likely fail.

Now that we know the cause, how do we make this code run on a distributed cluster as well? One way: copy top100.txt into /opt on every machine in the cluster! The job now succeeds, but this is a clumsy approach. When the cluster has many nodes you have to copy the file to each one by hand, which is a hassle, and the file must sit in the same directory on every node. If you can put up with that, go ahead.

In fact, MapReduce already provides a caching mechanism for this: DistributedCache. In the job configuration phase, add:

DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);

Note that "/test/top100.txt" here is an HDFS path.

Then, in the mapper's public void configure(JobConf job) method, add:

private Path[] localFiles;

public void configure(JobConf job) {
    try {
        localFiles = DistributedCache.getLocalCacheFiles(job);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

In map(), the file can then be opened through path.toUri().getPath().
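Putting the pieces together, here is a minimal sketch of what the cache-based mapper could look like. It assumes the driver above has been changed to call DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf) before JobClient.runJob(conf) and to register this mapper with conf.setMapperClass(CachedFileTestMapper.class). The class name CachedFileTestMapper is made up for this sketch; it lives in the same package as the example and simply extends FileTestMapper so the getTop100DomainTest() lookup can be reused unchanged.

package test;

import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical class name; reuses getTop100DomainTest() from FileTestMapper above.
class CachedFileTestMapper extends FileTestMapper {

    private String localFilepath = "";

    @Override
    public void configure(JobConf job) {
        try {
            // Paths of the cached files as they exist on this task node's local disk.
            Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
            if (localFiles != null && localFiles.length > 0) {
                // Assumes top100.txt is the only file added to the cache.
                localFilepath = localFiles[0].toUri().getPath();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String url = "qq.com";
        // The path is now valid on whichever node runs this map task.
        String host = getTop100DomainTest(url, localFilepath);
        output.collect(new Text(url + "\t" + host), new Text(""));
    }
}

The key point is that getLocalCacheFiles() returns the path of the copy that the framework has already pulled down to the task node's local disk, so ordinary java.io reads work on every node without any manual copying.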