【大數據系列】MapReduce示例好友推薦

package org.slp;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by sanglp on 2017/7/17.
 *
 * Mapper of the friend-recommendation job. Every input line is expected to hold
 * one friendship as two names separated by a tab character. The friendship is
 * written out in both directions, once keyed by each of the two names.
 */
public class Test2Mapper extends Mapper<LongWritable ,Text,Text,Text> {
    /**
     * Emits the pair (a, b) and the pair (b, a) for an input line "a\tb".
     *
     * @param key     the input key supplied by the input format; not used here
     * @param value   one line of input text
     * @param context used to write the output pairs and to update counters
     * @throws IOException          if writing an output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString(); // one line of data describes one friendship
        String[] ss = line.split("\t");
        if (ss.length < 2) {
            // A blank line or a line without a tab has no second name; indexing ss[1]
            // would throw ArrayIndexOutOfBoundsException and fail the whole task.
            // Skip the record and make the skip visible through a job counter.
            context.getCounter("Test2Mapper", "MALFORMED_LINES").increment(1);
            return;
        }
        // Write the relation once per direction so each name appears as a key.
        context.write(new Text(ss[0]),new Text(ss[1]));
        context.write(new Text(ss[1]),new Text(ss[0]));
    }
}
package org.slp;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by sanglp on 2017/7/17.
 *
 * Reducer of the friend-recommendation job. For one key it gathers the distinct
 * values and writes every ordered pair of two different values. Given the mapper
 * output (name, friend), the values of a key are that person's friends, so each
 * emitted pair consists of two people who share the key as a common friend.
 *
 * NOTE(review): pairs that are already direct friends are not filtered out here.
 */
public class Test2Reduce extends Reducer<Text,Text,Text,Text> {
    /**
     * Writes (x, y) for every x and y in the distinct value set where x differs from y.
     * Both (x, y) and (y, x) are written.
     *
     * @param key     the name all values belong to; it is not part of the output
     * @param values  the values grouped under {@code key}
     * @param context used to write the output pairs
     * @throws IOException          if writing an output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Copy each value to a String before storing it; the set also removes duplicates.
        Set<String> friends = new HashSet<String>();
        for (Text t : values) {
            friends.add(t.toString());
        }
        if (friends.size() < 2) {
            return; // fewer than two distinct friends: no pair can be formed
        }
        // Typed enhanced-for loops replace the raw Iterator and its unchecked casts.
        for (String name : friends) {
            for (String other : friends) {
                if (!name.equals(other)) {
                    context.write(new Text(name), new Text(other));
                }
            }
        }
    }
}
package org.slp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by sanglp on 2017/7/17.
 *
 * Driver that configures and submits the "qq" friend-recommendation job,
 * using {@link Test2Mapper} and {@link Test2Reduce}.
 */
public class JobRun2 {

    /**
     * Builds the job, runs it and terminates the JVM with the job's result:
     * exit status 0 when the job succeeds, 1 when it fails or when it could
     * not be set up or submitted.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args){
        Configuration conf = new Configuration();
        // The original set this same key/value twice; once is enough.
        conf.set("mapred.job.tracker","node1:9001");
        conf.set("mapred.jar","C:\\Users\\sanglp\\qq.jar");

        // Stay non-zero unless the job reports success, so that an exception
        // during setup or submission is not reported to the caller as success.
        int exitCode = 1;
        try {
            Job job = new Job(conf);
            job.setJobName("qq");
            job.setJarByClass(JobRun2.class);
            job.setMapperClass(Test2Mapper.class);
            job.setReducerClass(Test2Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Declare the final output types to match what the reducer writes.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setNumReduceTasks(1); // number of reduce tasks
            // directory or file holding the job's input data
            FileInputFormat.addInputPath(job,new Path("/usr/input/qq"));
            // directory the job writes its output to
            FileOutputFormat.setOutputPath(job,new Path("/usr/out/qq"));

            exitCode = job.waitForCompletion(true) ? 0 : 1;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.exit(exitCode);
    }
}

文件內容例如(每行兩個姓名,以Tab分隔):

小明	小李

小花	小白

相關文章
相關標籤/搜索