Hadoop learning: implementing WordCount and movie rating prediction with MapReduce

In MapReduce, "map" refers to mapping and "reduce" refers to reduction.

MapReduce is a programming model that processes data as key-value pairs: the map phase transforms one set of key-value pairs into another set of key-value pairs.

The framework then passes these pairs down to reduce, which aggregates all key-value pairs produced by the map phase: values with the same key are grouped together. Internally, MapReduce also sorts the keys before they reach the reduce phase.
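To make the flow concrete, here is a minimal, Hadoop-free sketch of the same idea applied to word counting (the class name and the sample lines are made up for illustration; in Hadoop, the grouping and sorting step is done by the framework's shuffle phase):

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal, Hadoop-free sketch of the key-value flow; all names and data here are illustrative.
public class KeyValueFlowSketch {
    public static void main(String[] args) {
        String[] lines = { "hello hadoop", "hello world" };

        // Map phase: every word is emitted as a (word, 1) pair.
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\s+")) {
                mapped.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // Shuffle/sort phase: pairs with the same key are grouped together and the keys are sorted.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapped) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce phase: the values of each key are summed.
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + "\t" + sum);   // hadoop 1, hello 2, world 1
        }
    }
}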

1. WordCount

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import main.java.org.conan.myhadoop.hdfs.HdfsDAO;

public class WordCount
{
    //HDFS address and the regular expression used to extract words
    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("\\b([a-zA-Z]+)\\b");
    
    //Custom Map class that implements the "mapping" phase
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        //In MapReduce, Text corresponds to String and IntWritable corresponds to int.
        //LongWritable is a data type that implements WritableComparable.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        @Override
        //Override the parent class's map() method
        public void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException
        {
            //Read one line of input
            String line = value.toString();

            //Convert the whole line to lower case
            line = line.toLowerCase();
            //Split the line into words using the predefined regular expression.
            Matcher matcher = DELIMITER.matcher(line);
            while(matcher.find()){
                //Convert each extracted word into a Text object.
                word.set(matcher.group());
                //Emit the key-value pair: the key is the word, the value is 1.
                context.write(word,one);
            }
        }
    }
    
    //Custom Combine step: runs a local reduce over each map's output first, reducing the amount of data shuffled to the reducers.
    public static class Combine extends Reducer <Text, IntWritable, Text, IntWritable>
    {
         @Override
         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            //Iterate over all values for the same key; they are grouped into a single Iterable.
            for (IntWritable line : values)
            {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            //Write the key-value pair in the configured output format.
            context.write(key, value);
        }
    }
    
    public static class Reduce extends Reducer <Text, IntWritable, Text, IntWritable>
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable line : values)
            {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            context.write(key, value);
        }
    }
    public static void main(String[] args) throws Exception
    {
        JobConf conf = WordCount.config();
        String input = "data/1.txt";
        String output = HDFS + "/user/hdfs/wordcount";
        //Custom HDFS file utility class
        HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
        //Remove the output path if it already exists; otherwise the job fails with an "output already exists" error
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        
        //Set the output key type
        job.setOutputKeyClass(Text.class);

        //Set the output value type
        job.setOutputValueClass(IntWritable.class);
        
        job.setMapperClass(WordCount.Map.class);
        job.setCombinerClass(WordCount.Combine.class);
        job.setReducerClass(WordCount.Reduce.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        //Set the output format; the custom ParseTextOutputFormat class defined below is used here.
        job.setOutputFormatClass(ParseTextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
    
    public static JobConf config() {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
//        conf.set("io.sort.mb", "");
        return conf;
    }
}

Custom file output format. It is essentially Hadoop's TextOutputFormat, except that the default key-value separator is ":" instead of a tab.

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;



public class ParseTextOutputFormat<K, V> extends FileOutputFormat<K, V>{
    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
          try {
            newline = "\n".getBytes(utf8);
          } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
          }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
          this.out = out;
          try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
          } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
          }
        }

        public LineRecordWriter(DataOutputStream out) {
          this(out, "\t");
        }

        /**
         * Write the object to the byte stream, handling Text as a special
         * case.
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
          if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
          } else {
            out.write(o.toString().getBytes(utf8));
          }
        }

        public synchronized void write(K key, V value)
          throws IOException {

          boolean nullKey = key == null || key instanceof NullWritable;
          boolean nullValue = value == null || value instanceof NullWritable;
          if (nullKey && nullValue) {
            return;
          }
          if (!nullKey) {
            writeObject(key);
          }
          if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
          }
          if (!nullValue) {
            writeObject(value);
          }
          out.write(newline);
        }

        public synchronized 
        void close(TaskAttemptContext context) throws IOException {
          out.close();
        }
      }

      public RecordWriter<K, V> 
             getRecordWriter(TaskAttemptContext job
                             ) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
                                           ":");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
          Class<? extends CompressionCodec> codecClass = 
            getOutputCompressorClass(job, GzipCodec.class);
          codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
          extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
          FSDataOutputStream fileOut = fs.create(file, false);
          return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
          FSDataOutputStream fileOut = fs.create(file, false);
          return new LineRecordWriter<K, V>(new DataOutputStream
                                            (codec.createOutputStream(fileOut)),
                                            keyValueSeparator);
        }
      }
        
 }
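With this output format, the WordCount job writes one word per line in the form key:count. For an input line such as "Hello hadoop hello world" (a made-up example), the part-r-00000 output would contain:

hadoop:1
hello:2
world:1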

2. Movie rating prediction

The implementation uses the Slope One algorithm to predict ratings; the custom output format class used here is the same as the one above.

The input file format is userId::movieId::score.
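In a nutshell, Slope One works on pairs of items: for movies A and B it computes the average deviation dev(A,B) = avg(score(A) - score(B)) over users who rated both, and then predicts a user's unknown rating for A as rating(B) + dev(A,B), where B is a movie that user has rated. A minimal in-memory sketch of that idea (all IDs, scores, and class names below are made up; the MapReduce steps that follow distribute the same computation):

import java.util.HashMap;
import java.util.Map;

// Minimal in-memory Slope One sketch; all IDs, scores, and names here are illustrative only.
public class SlopeOneSketch {
    public static void main(String[] args) {
        // userId -> (movieId -> score), mirroring the userId::movieId::score input format.
        Map<String, Map<String, Double>> ratings = new HashMap<>();
        Map<String, Double> user1 = new HashMap<>();
        user1.put("1193", 5.0);
        user1.put("661", 3.0);
        Map<String, Double> user2 = new HashMap<>();
        user2.put("1193", 4.0);
        user2.put("661", 3.0);
        user2.put("2355", 5.0);
        ratings.put("1", user1);
        ratings.put("2", user2);

        // Average deviation of movie 2355 relative to movie 1193 over users who rated both.
        double sum = 0;
        int count = 0;
        for (Map<String, Double> r : ratings.values()) {
            if (r.containsKey("2355") && r.containsKey("1193")) {
                sum += r.get("2355") - r.get("1193");
                count++;
            }
        }
        double dev = sum / count;   // here: (5.0 - 4.0) / 1 = 1.0

        // Predict user 1's rating for 2355 from their known rating of 1193.
        double predicted = ratings.get("1").get("1193") + dev;   // 5.0 + 1.0 = 6.0 (would be capped to the rating scale in practice)
        System.out.println("predicted score of user 1 for movie 2355: " + predicted);
    }
}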

package  main.java.org.conan.myhadoop.recommend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;
import  main.java.org.conan.myhadoop.hdfs.HdfsDAO;

public class Recommend {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    
    public static final Pattern STRING = Pattern.compile("[\t,:]");
    
//    public final static int movieListLength = 100000;
//    public static int []movieList = new int[movieListLength];
    public static List movieList = new ArrayList();
    
    public static Map userScore = new HashMap();

    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        String in = "logfile/4.txt";
        String out = HDFS + "/user/hdfs/recommend" + "/step5";
        
//       path.put("data", "logfile/small.csv");
        
//       path.put("data", "logfile/ratings.dat");
        if(args.length == 2){
            in = args[0];
            out = HDFS + args[1];
            System.out.println(out);
        }
        //Set the input data path
        path.put("data", in);
        
        //Set the input path for step 1
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        
        //Set the output path for step 1
        path.put("Step1Output", path.get("Step1Input") + "/step1");
        
        //Set the input path for step 2
        path.put("Step2Input", path.get("Step1Output"));
        
        //Set the output path for step 2
        path.put("Step2Output", path.get("Step1Input") + "/step2");
        
        //Set the input path for step 3
        path.put("Step3Input1", path.get("data"));
//        path.put("Step3Input2", "logfile/movie/movies.dat");
        //Set the output path for step 3
        path.put("Step3Output", path.get("Step1Input") + "/step3");
//        path.put("Step3Input2", path.get("Step2Output"));
//        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
//        
        //Set the first input path for step 4
        path.put("Step4Input1", path.get("Step2Output"));
        
        //Set the second input path for step 4
        path.put("Step4Input2", path.get("Step3Output"));
        //Set the output path for step 4
        path.put("Step4Output", path.get("Step1Input") + "/step4");
//        
        //Set the input path for step 5
        path.put("Step5Input", path.get("Step4Output"));
//        path.put("Step5Input2", path.get("Step3Output2"));
        //Set the output path for step 5
        path.put("Step5Output", out);
        
        //Step 1: build each user's rating vector (the user-item rating matrix) from the input rating file
        Step1.run(path);
        
        //Step 2: compute the item co-occurrence (deviation) matrix from the step 1 output
        Step2.run(path);
        
        //Step 3: collect all rated movies and output every user's score for every movie, using 0 for unrated movies
        Step3.run(path);
        
        //Step 4: predict every user's score for every movie from the step 2 and step 3 results
        Step4.run(path);
        
        //Step 5: normalize the output format
        Step5.run(path);
        
        System.exit(0);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommend");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
//        conf.set("io.sort.mb", "");
        return conf;
    }

}
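A note on running it: when two arguments are supplied, the first is used as the input file and the second (appended to HDFS) as the output directory; otherwise the hard-coded defaults logfile/4.txt and /user/hdfs/recommend/step5 are used. A hypothetical invocation (the jar name here is just an example) would look like:

hadoop jar recommend.jar main.java.org.conan.myhadoop.recommend.Recommend logfile/ratings.dat /user/hdfs/recommend/output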

//Build the user-item rating matrix, i.e., each user's ratings for the movies they have scored
//Each output line represents one user's ratings for all the movies that user rated
//The key is the userId, the value is movieId:score,movieId:score,...
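//For example (hypothetical IDs and scores): the input lines 1::1193::5 and 1::661::3 for user 1
//produce the single output line "1\t1193:5,661:3" (the reducer joins the values with commas; key and value are tab-separated).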


public class Step1 {

    public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String[] tokens = value.toString().split("::");
            String itemID = tokens[1];
            String pref = tokens[2];
            k.set(tokens[0]);
            v.set(itemID + ":" + pref);
            output.collect(k, v);
        }
    }

    public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String value= "";
            int num = 0;
            while (values.hasNext()) {
                num++;
                value += values.next();
                value += ",";
                if( num >= 400 ){
                    value = value.substring(0, value.length() - 1);
                    Text v = new Text(value);
                    output.collect(key, v);
                    value = "";
                    num = 0;
                    break;
                }

            }
            if(num != 0){
                value = value.substring(0, value.length() - 1);
                Text v = new Text(value);
                output.collect(key, v);
            }
            
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step1Input");
        String output = path.get("Step1Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
//        hdfs.rmr(output);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("data"), input);

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step1_ToItemPreMapper.class);
        conf.setReducerClass(Step1_ToUserVectorReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }

}

//Compute the item co-occurrence (deviation) matrix from the step 1 output
//To avoid a full double loop over all item pairs, the mapper draws a random partner for each rated movie, emitting one movieA:movieB pair per item
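//For example (hypothetical data): if one user's step 1 line contains 2355:5 and the random partner drawn for it is 1193:4,
//the mapper emits key "2355:1193:" with value 1.0 (5 - 4); the reducer then averages these values over all users,
//so each step 2 output line approximates the average rating deviation between a pair of movies.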

public class Step2 {
    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, DoubleWritable> {
        private final static Text k = new Text();
        private final static DoubleWritable v = new DoubleWritable();
//        private final static IntWritable v = new IntWritable(1);


        @Override
        public void map(LongWritable key, Text values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
//                for (int j = 1; j < i+1; j++) {
//                    String itemID2 = tokens[j].split(":")[0];
//                    double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]); 
////                    if(sum<0.5) break;
////                    if(sum>4.5) break;
//                    k.set(itemID + ":" + itemID2+":");
//                    v.set(sum);
//                    output.collect(k, v);
//                    k.set(itemID2 + ":" + itemID+":");
//                    v.set(sum);
//                    output.collect(k, v);
//
//                }
                Random random = new Random();
                int j;
                j = random.nextInt(tokens.length - 1) + 1;
                String itemID2 = tokens[j].split(":")[0];
                double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]);
                k.set(itemID + ":" + itemID2+":");
                v.set(sum);
                output.collect(k, v);
            }
        }
    }

    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                sum += values.next().get();
                count++;
            }
            sum = sum/count*1.0;
            DecimalFormat df = new DecimalFormat("#.0000");
            sum = Double.valueOf(df.format(sum));
//            System.out.println(key+"---count----"+count+"-------"+sum);
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step2Input"); 
        String output = path.get("Step2Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
//        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}

//Collect all movies that any user has rated, and output every user's score for every movie, writing 0 for unrated movies.
//Since there is no separate file listing the movies, all movie IDs have to be gathered from the data file itself.
//The movie IDs are kept in a list, so for a large data file every record requires a scan of that list to check
//whether the movie has already been recorded, which becomes increasingly slow.
//A static map also records each user's first rated movie; that rating serves as the reference value when computing predictions with the item co-occurrence matrix.
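//For example (hypothetical IDs): if the movies collected so far are 1193 and 661 and user 2 has only rated 1193 with 4,
//the reducer emits the lines "2\t1193,4" and "2\t661,0".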
public class Step3 {

    public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();
        private String flag;    //identifies which dataset is being read
//        private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<Integer, List<Cooccurrence>>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// identify the dataset being read
            
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = values.toString().split("::");
//            System.out.println(flag);
//            System.out.println(tokens.length);
//            
//            for(int i = 0;i < tokens.length;i++){
//                System.out.println(tokens[i]);
//            }
            
//            Gather all movie IDs here; ideally a separate file would list every movie, so records could be added without the contains() check
            if( !Recommend.movieList.contains(tokens[1]) ){
                Recommend.movieList.add(tokens[1]);
            }
            
//            if(flag.equals("movie")){
//                Recommend.movieList.add(tokens[0]);
//            }
//            else{
                k.set(tokens[0]);
                v.set(tokens[1] + "," + tokens[2]);
                context.write(k, v);
//            }
            
        }
    }

    public static class Step4_AggregateAndRecommendReducer extends Reducer<Text, Text, Text, Text> {
        private final static Text v = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            Map userMovieList = new HashMap();
            for(Text line : values){
                String[] tokens = Recommend.DELIMITER.split(line.toString());
                userMovieList.put(tokens[0], tokens[1]);
            }
            for(int i = 0; i < Recommend.movieList.size();i++){
//                System.out.println("key---->" + key);
//                System.out.println("value---->" + v);
                if(!userMovieList.containsKey(Recommend.movieList.get(i))){
                    v.set(Recommend.movieList.get(i) + "," + 0);
                    context.write(key, v);
                }
                else{
                    v.set(Recommend.movieList.get(i) + "," + userMovieList.get(Recommend.movieList.get(i)));
                    context.write(key, v);
                }
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step3Input1");
//        String input2 = path.get("Step3Input2");

        String output = path.get("Step3Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step3.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step3.Step4_PartialMultiplyMapper.class);
        job.setReducerClass(Step3.Step4_AggregateAndRecommendReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1));
        FileOutputFormat.setOutputPath(job, new Path(output));
        

        do{
            job.waitForCompletion(false);
        }while(!job.isComplete());
    }

}

//Predict every user's score for every movie from the step 2 and step 3 results.
//While reading the step 3 output, whenever a user's score for a movie is 0,
//the score is predicted from that user's reference rating (stored in the static map) plus the corresponding entry of the item deviation matrix from step 2.
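//For example (hypothetical values): suppose user 1's reference rating is movie 1193 with score 5.0, and step 2 produced
//a deviation of -0.5 for the pair 2355:1193. When the reducer meets the record U:1,0 under key 2355, it predicts
//5.0 + (-0.5) = 4.5 and writes the line "2355\t1,4.5".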
public class Step4 {

    public static class Step4Update_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// which input is being read: the co-occurrence (deviation) matrix from step2 or the rating matrix from step3

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// identify the dataset being read

//             System.out.println(flag);
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            
            
            
            if (flag.equals("step2")) {// 同現矩陣
//                System.out.println(tokens.length);
//                for(int i = 0; i < tokens.length;i++){
//                    System.out.println(tokens[i]);
//                }
//                String[] v1 = tokens[0].split(":");
//                String itemID1 = v1[0];
//                String itemID2 = v1[1];
//                String num = tokens[1];
//
//                Text k = new Text(itemID1);
//                Text v = new Text("A:" + itemID2 + "," + num);
                String[] v1 = tokens[0].split(":");
                
                
                
                
                Text k = new Text(v1[0]);
                
                Text v = new Text("M:" + v1[1] + "," + tokens[1]);
                
                
                context.write(k, v);
//                 System.out.println(k.toString() + "  " + v.toString());

            } else if (flag.equals("step3")) {// 評分矩陣
//                System.out.println(tokens.length);
//                for(int i = 0; i < tokens.length;i++){
//                    System.out.println(tokens[i]);
//                }
                
//                String[] v2 = tokens[1].split(",");
////                String itemID = tokens[0];
////                String userID = v2[0];
////                String pref = v2[1];
                
                if(Double.parseDouble(tokens[2]) != 0 && !Recommend.userScore.containsKey(tokens[0])){
                    Recommend.userScore.put(tokens[0], tokens[1] + "," + tokens[2]);
                }
////
                Text k = new Text(tokens[1]);
                
                Text v = new Text("U:" + tokens[0] + "," + tokens[2]);

                context.write(k, v);
                // System.out.println(k.toString() + "  " + v.toString());
            }
        }

    }

    public static class Step4Update_AggregateReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//            System.out.println("key--->" + key);
            Map movie = new HashMap();
            Text k;
            Text v;
            //Map user = new HashMap();
            List list = new ArrayList();
            for (Text line : values) {
                list.add(line.toString());
//                System.out.println(line.toString());
                String[] tokens = Recommend.STRING.split(line.toString());
                if(tokens[0].equals("M")){
//                    System.out.println(tokens[1]);
//                    System.out.println(tokens[2]);
                    movie.put(tokens[1], tokens[2]);
                }
            }
            
            for(int i = 0;i < list.size();i++) {
                
                String[] tokens = Recommend.STRING.split((String) list.get(i));
                //System.out.println(tokens[0]);
                if(tokens[0].equals("U")){
                    if(Double.parseDouble(tokens[2]) == 0 ){
                        String userScore = (String) Recommend.userScore.get(tokens[1]);
                        String[] temps =  Recommend.STRING.split(userScore);
                        k = new Text(key);
//                        System.out.println("useid"+tokens[1]+"movie score"+temps[1]);
//                        System.out.println("movie id"+movie.get(temps[0]));
                        //deviation of this movie relative to the user's reference movie (0 if the pair was never seen)
                        double temp = 0;
                        if(movie.get(temps[0]) != null){
                            temp = Double.parseDouble((String) movie.get(temps[0]));
                        }
                        
                        double score = Double.parseDouble(temps[1])+temp;
                        
                        v = new Text(tokens[1] + "," + score);

                    }
                    else{
                        k = new Text(key);
                        v = new Text(tokens[1] + "," + tokens[2]);
                        
                        
                    }
//                    System.out.println("key-->" + k);
//                    System.out.println("value-->" + v);
                    context.write(k, v);
                }
                
            }
            
            
            
//            System.out.println(key.toString() + ":");
//
//            Map<String, String> mapA = new HashMap<String, String>();
//            Map<String, String> mapB = new HashMap<String, String>();
//
//            for (Text line : values) {
//                String val = line.toString();
//                System.out.println(val);
//
//                if (val.startsWith("A:")) {
//                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
//                    mapA.put(kv[0], kv[1]);
//
//                } else if (val.startsWith("B:")) {
//                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
//                    mapB.put(kv[0], kv[1]);
//
//                }
//            }
//
//            double result = 0;
//            Iterator<String> iter = mapA.keySet().iterator();
//            while (iter.hasNext()) {
//                String mapk = iter.next();// itemID
//
//                int num = Integer.parseInt(mapA.get(mapk));
//                Iterator<String> iterb = mapB.keySet().iterator();
//                while (iterb.hasNext()) {
//                    String mapkb = iterb.next();// userID
//                    double pref = Double.parseDouble(mapB.get(mapkb));
//                    result = num * pref;// 矩陣乘法相乘計算
//
//                    Text k = new Text(mapkb);
//                    Text v = new Text(mapk + "," + result);
//                    context.write(k, v);
//                    System.out.println(k.toString() + "  " + v.toString());
//                }
//            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step4Input1");
        String input2 = path.get("Step4Input2");
        String output = path.get("Step4Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4.Step4Update_PartialMultiplyMapper.class);
        job.setReducerClass(Step4.Step4Update_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));
        

        do{
            job.waitForCompletion(false);
        }while(!job.isComplete());
        
    }

}

//Normalize the format of the final output.
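//For example (hypothetical values): a step 4 line "2355\t1,4.5" is re-keyed by user, so the final output line,
//written with ParseTextOutputFormat's ":" separator, becomes "1:2355::4.5" (a zero prediction is written as "1:2355::").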
public class Step5 { 
 
    public static class Step5_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> { 
 
 
 
        @Override 
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { 
//            System.out.println("run"); 
//            System.out.println("key--->" + key); 
            String[] tokens = Recommend.DELIMITER.split(values.toString()); 
            Text k = new Text(tokens[1]); 
            Text v; 
            if(Double.parseDouble(tokens[2]) == 0){ 
                v = new Text(tokens[0] + "::"); 
            } 
            else{ 
                v = new Text(tokens[0] + "::" + tokens[2]); 
            } 
            context.write(k, v); 
        } 
 
    } 
 
    public static class Step5_AggregateReducer extends Reducer<Text, Text, Text, Text> { 
 
        @Override 
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 
            for (Text line : values) { 
                Text k = new Text(key.toString()); 
                context.write(k, line); 
            } 
        } 
    } 
 
    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException { 
        JobConf conf = Recommend.config(); 
 
        String input = path.get("Step5Input"); 
        String output = path.get("Step5Output"); 
 
        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); 
        hdfs.rmr(output); 
 
        Job job = new Job(conf); 
        job.setJarByClass(Step5.class); 
 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(Text.class); 
 
        job.setMapperClass(Step5.Step5_PartialMultiplyMapper.class); 
        job.setReducerClass(Step5.Step5_AggregateReducer.class); 
 
        job.setInputFormatClass(TextInputFormat.class); 
        job.setOutputFormatClass(ParseTextOutputFormat.class); 
 
        FileInputFormat.setInputPaths(job, new Path(input)); 
        FileOutputFormat.setOutputPath(job, new Path(output)); 
 
        do{ 
            job.waitForCompletion(false); 
        }while(!job.isComplete()); 
        System.out.println("---------------------end--------------------"); 
    } 
 
}