InverseIndexStepOne
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Inverted index, step one -- job driver
 *
 * @author duanhaitao@itcast.cn
 */
public class InverseIndexStepOne {

    public static class StepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Grab one line of input
            String line = value.toString();
            // Split the line into words
            String[] fields = StringUtils.split(line, " ");
            // Get the file split this line came from
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // Extract the file name from the split
            String fileName = inputSplit.getPath().getName();

            for (String field : fields) {
                // Emit kv pairs, e.g. k: hello-->a.txt  v: 1
                context.write(new Text(field + "-->" + fileName), new LongWritable(1));
            }
        }
    }

    public static class StepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        // Input: <hello-->a.txt, {1,1,1,...}>
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long counter = 0;
            for (LongWritable value : values) {
                counter += value.get();
            }
            context.write(key, new LongWritable(counter));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InverseIndexStepOne.class);
        job.setMapperClass(StepOneMapper.class);
        job.setReducerClass(StepOneReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // If the output path given on the command line already exists, delete it first
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
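To make the data flow concrete, here is a small worked trace; the file names and contents are made up for illustration, not taken from the original post. Given two input files:

a.txt: hello tom
       hello jim
b.txt: hello rose

StepOneMapper emits one (word-->file, 1) pair per word occurrence:

hello-->a.txt	1
tom-->a.txt	1
hello-->a.txt	1
jim-->a.txt	1
hello-->b.txt	1
rose-->b.txt	1

StepOneReducer then sums the 1s per key, giving per-file word counts:

hello-->a.txt	2
hello-->b.txt	1
jim-->a.txt	1
rose-->b.txt	1
tom-->a.txt	1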
Package it into a jar and put the input files onto HDFS first.
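A typical command sequence looks like the following; the jar name and HDFS paths are assumptions for illustration, only the package name comes from the code above:

hadoop fs -mkdir -p /ii/srcdata
hadoop fs -put a.txt b.txt /ii/srcdata
hadoop jar ii.jar cn.itcast.hadoop.mr.ii.InverseIndexStepOne /ii/srcdata /ii/stepone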
Run the job; the step-one output then serves as the input for step two.
InverseIndexStepTwo
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.ii.InverseIndexStepOne.StepOneMapper;
import cn.itcast.hadoop.mr.ii.InverseIndexStepOne.StepOneReducer;

public class InverseIndexStepTwo {

    public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        // k: byte offset of the line   v: {hello-->a.txt	3}
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            String[] fields = StringUtils.split(line, "\t");
            String[] wordAndFileName = StringUtils.split(fields[0], "-->");

            String word = wordAndFileName[0];
            String fileName = wordAndFileName[1];
            long count = Long.parseLong(fields[1]);

            // The map output looks like: <hello, a.txt-->3>
            context.write(new Text(word), new Text(fileName + "-->" + count));
        }
    }

    public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Incoming data: <hello, {a.txt-->3, b.txt-->2, c.txt-->1}>
            String result = "";
            for (Text value : values) {
                result += value + " ";
            }
            // Output: k: hello   v: a.txt-->3 b.txt-->2 c.txt-->1
            context.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Build job_one
        // Job job_one = Job.getInstance(conf);
        // job_one.setJarByClass(InverseIndexStepTwo.class);
        // job_one.setMapperClass(StepOneMapper.class);
        // job_one.setReducerClass(StepOneReducer.class);
        // ......

        // Build job_two
        Job job_two = Job.getInstance(conf);

        job_two.setJarByClass(InverseIndexStepTwo.class);
        job_two.setMapperClass(StepTwoMapper.class);
        job_two.setReducerClass(StepTwoReducer.class);
        job_two.setOutputKeyClass(Text.class);
        job_two.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job_two, new Path(args[0]));

        // If the output path given on the command line already exists, delete it first
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job_two, output);

        // Submit job_one first, then run job_two
        // boolean one_result = job_one.waitForCompletion(true);
        // if (one_result) {
        System.exit(job_two.waitForCompletion(true) ? 0 : 1);
        // }
    }
}
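If you would rather run both steps from a single driver, the commented-out job_one block above can be completed along these lines. This is a minimal sketch, not the original author's code: the three-argument layout (input, intermediate directory, final output) is an assumption, and existing-output cleanup is omitted for brevity.

// Minimal two-job chaining sketch (assumed args: input dir, intermediate dir, final output dir).
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Step one: count word occurrences per file
    Job jobOne = Job.getInstance(conf);
    jobOne.setJarByClass(InverseIndexStepTwo.class);
    jobOne.setMapperClass(StepOneMapper.class);
    jobOne.setReducerClass(StepOneReducer.class);
    jobOne.setOutputKeyClass(Text.class);
    jobOne.setOutputValueClass(LongWritable.class);
    FileInputFormat.setInputPaths(jobOne, new Path(args[0]));
    Path stepOneOutput = new Path(args[1]); // intermediate directory
    FileOutputFormat.setOutputPath(jobOne, stepOneOutput);

    // Only start step two if step one succeeded
    if (jobOne.waitForCompletion(true)) {
        Job jobTwo = Job.getInstance(conf);
        jobTwo.setJarByClass(InverseIndexStepTwo.class);
        jobTwo.setMapperClass(StepTwoMapper.class);
        jobTwo.setReducerClass(StepTwoReducer.class);
        jobTwo.setOutputKeyClass(Text.class);
        jobTwo.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(jobTwo, stepOneOutput); // step-one output feeds step two
        FileOutputFormat.setOutputPath(jobTwo, new Path(args[2]));
        System.exit(jobTwo.waitForCompletion(true) ? 0 : 1);
    }
    System.exit(1);
}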