倒排索引的 MR 實現 demo

 

InverseIndexStepOne.java

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsort.SortMR;
import cn.itcast.hadoop.mr.flowsort.SortMR.SortMapper;
import cn.itcast.hadoop.mr.flowsort.SortMR.SortReducer;
import cn.itcast.hadoop.mr.flowsum.FlowBean;

/**
 * Inverted index, step one: counts how many times each word occurs in each
 * input file.
 *
 * <p>Every output record has the key {@code word-->fileName} and the value
 * is the number of occurrences of that word in that file. The output of this
 * job is the input of {@code InverseIndexStepTwo}.
 *
 * <p>Usage: {@code InverseIndexStepOne <input path> <output path>}. An
 * existing output path is deleted before the job is submitted.
 *
 * @author duanhaitao@itcast.cn
 */
public class InverseIndexStepOne {

	/** Separator between the word and the file name inside an output key. */
	private static final String KEY_SEPARATOR = "-->";

	/**
	 * Splits each input line on spaces and emits
	 * {@code <word-->fileName, 1>} for every token.
	 */
	public static class StepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

		/** Constant count emitted for every token. */
		private static final LongWritable ONE = new LongWritable(1);

		/** Reused output key; context.write() serializes it immediately. */
		private final Text outKey = new Text();

		/** Name of the file backing the split this mapper instance processes. */
		private String fileName;

		/**
		 * Resolves the file name once per split instead of once per line.
		 *
		 * @throws ClassCastException if the input split is not a {@link FileSplit}
		 */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			fileName = inputSplit.getPath().getName();
		}

		/**
		 * @param key     byte offset of the line (unused)
		 * @param value   one line of input text
		 * @param context receives one {@code <word-->fileName, 1>} pair per token
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			// Split the line into words; consecutive spaces produce no empty tokens.
			String[] fields = StringUtils.split(value.toString(), " ");

			for (String field : fields) {
				// Emit k: hello-->a.txt   v: 1
				outKey.set(field + KEY_SEPARATOR + fileName);
				context.write(outKey, ONE);
			}
		}

	}

	/**
	 * Sums the counts of one {@code word-->fileName} key. Because the sum is
	 * associative and commutative this class is also used as the combiner.
	 */
	public static class StepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

		/** Reused output value; context.write() serializes it immediately. */
		private final LongWritable total = new LongWritable();

		/**
		 * @param key     {@code word-->fileName}
		 * @param values  partial counts, e.g. {@code {1,1,1,...}}
		 * @param context receives {@code <word-->fileName, sum>}
		 */
		@Override
		protected void reduce(Text key, Iterable<LongWritable> values, Context context)
				throws IOException, InterruptedException {

			long counter = 0;
			for (LongWritable value : values) {
				counter += value.get();
			}

			total.set(counter);
			context.write(key, total);
		}

	}

	/**
	 * Configures and runs the job, then exits the JVM with 0 on success,
	 * 1 on job failure, or 2 when the arguments are missing.
	 *
	 * @param args {@code args[0]} input path, {@code args[1]} output path
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception {

		if (args.length < 2) {
			System.err.println("Usage: InverseIndexStepOne <input path> <output path>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(InverseIndexStepOne.class);

		job.setMapperClass(StepOneMapper.class);
		// Pre-aggregate counts on the map side to cut shuffle volume.
		job.setCombinerClass(StepOneReducer.class);
		job.setReducerClass(StepOneReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// Delete the output path if it already exists. Resolve the file system
		// from the path itself so that a path with an explicit scheme is checked
		// on the file system it actually lives on, not on the default one.
		Path output = new Path(args[1]);
		FileSystem fs = output.getFileSystem(conf);
		if (fs.exists(output)) {
			fs.delete(output, true);
		}

		FileOutputFormat.setOutputPath(job, output);

		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

}

打 jar 包,先 put 上去。

一頓操作:30.00

 

InverseIndexStepTwo.java

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;

import cn.itcast.hadoop.mr.ii.InverseIndexStepOne.StepOneMapper;
import cn.itcast.hadoop.mr.ii.InverseIndexStepOne.StepOneReducer;

/**
 * Inverted index, step two: regroups the per-file word counts by word.
 *
 * <p>The input is expected to consist of lines of the form
 * {@code word-->fileName<TAB>count}. The output has one line per word whose
 * value is the space-separated list of {@code fileName-->count} entries.
 *
 * <p>Usage: {@code InverseIndexStepTwo <input path> <output path>}. The job
 * that produces the input must have completed before this job is submitted;
 * to chain both in one driver, wait for the first job's
 * {@code waitForCompletion(true)} to return {@code true} first. An existing
 * output path is deleted before the job is submitted.
 */
public class InverseIndexStepTwo {

	/** Separator between word and file name (input) and file name and count (output). */
	private static final String SEPARATOR = "-->";

	/**
	 * Turns {@code word-->fileName<TAB>count} into
	 * {@code <word, fileName-->count>}.
	 */
	public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

		/** Reused output key; context.write() serializes it immediately. */
		private final Text outKey = new Text();

		/** Reused output value; context.write() serializes it immediately. */
		private final Text outValue = new Text();

		/**
		 * Lines that do not match the expected layout are skipped and counted
		 * in the {@code InverseIndexStepTwo/MALFORMED_LINES} job counter.
		 *
		 * @param key     byte offset of the line (unused)
		 * @param value   one line, e.g. {@code hello-->a.txt<TAB>3}
		 * @param context receives {@code <hello, a.txt-->3>}
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString();

			// The count is the last tab-separated column; using the LAST tab keeps
			// the parse correct even if the word itself contains a tab.
			int tab = line.lastIndexOf('\t');
			if (tab < 0) {
				skipMalformed(context);
				return;
			}
			String wordAndFileName = line.substring(0, tab);

			// Look for the literal "-->" string. StringUtils.split(str, "-->") must
			// not be used here: it treats its second argument as a SET of separator
			// characters and would also split on any '-' or '>' inside the word or
			// the file name.
			int sep = wordAndFileName.lastIndexOf(SEPARATOR);
			if (sep <= 0) {
				skipMalformed(context);
				return;
			}
			String word = wordAndFileName.substring(0, sep);
			String fileName = wordAndFileName.substring(sep + SEPARATOR.length());

			long count;
			try {
				count = Long.parseLong(line.substring(tab + 1));
			} catch (NumberFormatException e) {
				skipMalformed(context);
				return;
			}

			// Map output has the form <hello, a.txt-->3>
			outKey.set(word);
			outValue.set(fileName + SEPARATOR + count);
			context.write(outKey, outValue);
		}

		/** Records one skipped input line in the job counters. */
		private void skipMalformed(Context context) {
			context.getCounter("InverseIndexStepTwo", "MALFORMED_LINES").increment(1);
		}
	}

	/**
	 * Joins all {@code fileName-->count} entries of one word into a single
	 * line.
	 */
	public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> {

		/**
		 * @param key     the word
		 * @param values  e.g. {@code {a.txt-->3, b.txt-->2, c.txt-->1}}
		 * @param context receives {@code <hello, "a.txt-->3 b.txt-->2 c.txt-->1 ">};
		 *                every entry, including the last, is followed by one space
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			// StringBuilder avoids re-copying the whole result for every entry.
			StringBuilder result = new StringBuilder();
			for (Text value : values) {
				result.append(value.toString()).append(' ');
			}

			context.write(key, new Text(result.toString()));
		}

	}

	/**
	 * Configures and runs the job, then exits the JVM with 0 on success,
	 * 1 on job failure, or 2 when the arguments are missing.
	 *
	 * @param args {@code args[0]} input path, {@code args[1]} output path
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception {

		if (args.length < 2) {
			System.err.println("Usage: InverseIndexStepTwo <input path> <output path>");
			System.exit(2);
		}

		Configuration conf = new Configuration();

		Job jobTwo = Job.getInstance(conf);

		jobTwo.setJarByClass(InverseIndexStepTwo.class);

		jobTwo.setMapperClass(StepTwoMapper.class);
		jobTwo.setReducerClass(StepTwoReducer.class);

		jobTwo.setOutputKeyClass(Text.class);
		jobTwo.setOutputValueClass(Text.class);

		FileInputFormat.setInputPaths(jobTwo, new Path(args[0]));

		// Delete the output path if it already exists. Resolve the file system
		// from the path itself so that a path with an explicit scheme is checked
		// on the file system it actually lives on, not on the default one.
		Path output = new Path(args[1]);
		FileSystem fs = output.getFileSystem(conf);
		if (fs.exists(output)) {
			fs.delete(output, true);
		}

		FileOutputFormat.setOutputPath(jobTwo, output);

		System.exit(jobTwo.waitForCompletion(true) ? 0 : 1);

	}

}

相關文章
相關標籤/搜索