hadoop datajoin例子-mapper側鏈接

/**
 * 
 */
package com.jason.hadoop.example;

/**
 * @author jason
 *
 */
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinDC extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements Mapper<Text,Text,Text,Text> {
        
        private HashMap<String, String> joinData = new HashMap<String, String>();

        @Override
        public void configure(JobConf conf) {
            Path[] cacheFiles;
            try {
                cacheFiles = DistributedCache.getLocalCacheFiles(conf);
//                System.out.println("cacheFiles is " + cacheFiles);
                if (cacheFiles != null && cacheFiles.length > 0) {
                    String line = null;
                    String[] tokens = null;
//                    System.out.println("cacheFiles[0] is " + cacheFiles[0].toString());
                    BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles[0].toString())); 
                    try {
                        while ((line = joinReader.readLine()) != null) {
                            tokens = line.split(",", 2);
                            joinData.put(tokens[0], tokens[1]);
//                            System.out.printf("line is %s,tokens[0] is %s,tokens[1] is %s\n", line, tokens[0], tokens[1]);
                        }
                    } finally {
                        joinReader.close();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            
        }

        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
//            System.out.printf("key is %s, value is %s\n", key.toString(), value.toString());
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
//                System.out.printf("joinValue is %s\n", joinValue);
                output.collect(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        //放在new Job以前,不然讀不到CacheFile
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);
//        DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/example/datajoin/in/user.txt"), conf); 
        JobConf job = new JobConf(conf, DataJoinDC.class);
        Path in = new Path(args[1]);
        Path out = new Path(args[2]);
        FileSystem fs=FileSystem.get(conf);  
        if(fs.exists(out)){  
            fs.delete(out, true);
        }    
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin with DistributedCache");
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.set("key.value.separator.in.input.line", ",");
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);
        System.exit(res);
    }

}
相關文章
相關標籤/搜索