package com.jason.hadoop.example;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Map-side (replicated) join: the smaller table is shipped to every mapper
 * through the DistributedCache and joined in memory against the main input.
 *
 * @author jason
 */
public class DataJoinDC extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

        // In-memory copy of the cached join table: key -> joined value.
        private HashMap<String, String> joinData = new HashMap<String, String>();

        @Override
        public void configure(JobConf conf) {
            Path[] cacheFiles;
            try {
                // Load the cached join table (args[0]) into memory, keyed by the
                // first comma-separated field of each line.
                cacheFiles = DistributedCache.getLocalCacheFiles(conf);
                if (cacheFiles != null && cacheFiles.length > 0) {
                    String line = null;
                    String[] tokens = null;
                    BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
                    try {
                        while ((line = joinReader.readLine()) != null) {
                            tokens = line.split(",", 2);
                            if (tokens.length < 2) {
                                continue; // skip malformed lines without a "key,value" pair
                            }
                            joinData.put(tokens[0], tokens[1]);
                        }
                    } finally {
                        joinReader.close();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Inner-join semantics: emit the record only when the key also exists
            // in the cached table.
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                output.collect(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Add the cache file before the JobConf is created; otherwise it cannot
        // be read back in configure().
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);
        // DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/example/datajoin/in/user.txt"), conf);

        JobConf job = new JobConf(conf, DataJoinDC.class);

        Path in = new Path(args[1]);
        Path out = new Path(args[2]);

        // Remove a stale output directory so the job can be rerun.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin with DistributedCache");
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0); // map-only job

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.set("key.value.separator.in.input.line", ",");
        job.set("mapred.textoutputformat.separator", ",");

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);
        System.exit(res);
    }
}
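/*
 * Usage sketch. The file names, sample records, and jar name below are
 * illustrative assumptions, not taken from the original source; only the
 * /example/datajoin/in path appears in the code above.
 *
 *   # args[0]: join table placed in the DistributedCache, one "key,value" line per record
 *   hdfs dfs -cat /example/datajoin/in/user.txt
 *     1,jason
 *     2,amy
 *
 *   # args[1]: main input, read by KeyValueTextInputFormat with "," as key/value separator
 *   hdfs dfs -cat /example/datajoin/in/order.txt
 *     1,book
 *     2,pen
 *     3,cup
 *
 *   hadoop jar hadoop-example.jar com.jason.hadoop.example.DataJoinDC \
 *       /example/datajoin/in/user.txt /example/datajoin/in/order.txt /example/datajoin/out
 *
 *   # Expected output; keys with no match in the cached table (e.g. 3) are dropped:
 *     1,book,jason
 *     2,pen,amy
 */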