Hadoop DataJoin 例子

Java 代碼:

/**
 * 
 */
package com.jason.hadoop.example;

/**
 * @author jason
 *
 */
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        @Override
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];

            return new Text(groupKey);
        }

        @Override
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];

            return new Text(datasource);
        }

        @Override
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }

    }

    public static class Reduce extends DataJoinReducerBase {

        @Override
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) {
                return null;
            }
            if (tags.length == 1
                    && ((Text) tags[0]).toString().endsWith("user")) {
                return null;
            }

            String retStr = "";
            if (tags.length == 1
                    && ((Text) tags[0]).toString().endsWith("order")) {
                retStr = "NULL,";
            }
            for (int i = 0; i < values.length; i++) {
                if (i > 0) {
                    retStr += ",";
                }
                TaggedWritable tw = (TaggedWritable) values[i];
                // 結果是: 3,王五,137xxxxxxxx
                String line = ((Text) tw.getData()).toString();
                // 結果是: {3,"王五,137xxxxxxxx"}
                String[] tokens = line.split(",", 2);
                retStr += tokens[1];
            }
            TaggedWritable ret = new TaggedWritable(new Text(retStr));
            ret.setTag((Text) tags[0]);

            return ret;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {
        private Text data;

        public TaggedWritable() {
            // 必須有,不然反序列化時出錯
            this.data = new Text("");
        }

        public TaggedWritable(Text data) {
            this.data = data;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        @Override
        public Writable getData() {
            return data;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoin.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(TaggedWritable.class);

        job.setOutputValueClass(Text.class);

        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }

}

數據準備:

user.txt

1,張三,135xxxxxxxx
2,李四,136xxxxxxxx
3,王五,137xxxxxxxx
4,趙六,138xxxxxxxx

order.txt

3,A,13,2013-02-12
1,B,23,2013-02-14
2,C,16,2013-02-17
3,D,25,2013-03-12

上傳數據:

hadoop fs -put *.txt /example/datajoin/in

最後使用命令行運行:

HADOOP_CLASSPATH=/home/jason/hadoop-1.0.1/contrib/datajoin/hadoop-datajoin-1.0.1.jar hadoop jar hadoopExample-1.0-SNAPSHOT.jar com.jason.hadoop.example.DataJoin -libjars /home/jason/hadoop-1.0.1/contrib/datajoin/hadoop-datajoin-1.0.1.jar /example/datajoin/in /example/datajoin/out1
相關文章
相關標籤/搜索