1 import java.io.IOException; 2 import java.util.Iterator; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.conf.Configured; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapred.FileInputFormat; 9 import org.apache.hadoop.mapred.FileOutputFormat; 10 import org.apache.hadoop.mapred.JobClient; 11 import org.apache.hadoop.mapred.JobConf; 12 import org.apache.hadoop.mapred.KeyValueTextInputFormat; 13 import org.apache.hadoop.mapred.MapReduceBase; 14 import org.apache.hadoop.mapred.Mapper; 15 import org.apache.hadoop.mapred.OutputCollector; 16 import org.apache.hadoop.mapred.Reducer; 17 import org.apache.hadoop.mapred.Reporter; 18 import org.apache.hadoop.mapred.TextOutputFormat; 19 import org.apache.hadoop.util.Tool; 20 import org.apache.hadoop.util.ToolRunner; 21 22 public class MyJob extends Configured implements Tool { 23 24 public static class MapClass extends MapReduceBase 25 implements Mapper<Text, Text, Text, Text> { 26 27 public void map(Text key, Text value, 28 OutputCollector<Text, Text> output, 29 Reporter reporter) throws IOException { 30 31 output.collect(value, key); 32 } 33 } 34 35 public static class Reduce extends MapReduceBase 36 implements Reducer<Text, Text, Text, Text> { 37 38 public void reduce(Text key, Iterator<Text> values, 39 OutputCollector<Text, Text> output, 40 Reporter reporter) throws IOException { 41 42 String csv = ""; 43 while (values.hasNext()) { 44 if (csv.length() > 0) csv += ","; 45 csv += values.next().toString(); 46 } 47 output.collect(key, new Text(csv)); 48 } 49 } 50 51 public int run(String[] args) throws Exception { 52 Configuration conf = getConf(); 53 54 JobConf job = new JobConf(conf, MyJob.class); 55 56 Path in = new Path(args[0]); 57 Path out = new Path(args[1]); 58 FileInputFormat.setInputPaths(job, in); 59 FileOutputFormat.setOutputPath(job, out); 60 61 job.setJobName("MyJob"); 62 job.setMapperClass(MapClass.class); 63 job.setReducerClass(Reduce.class); 64 65 job.setInputFormat(KeyValueTextInputFormat.class); 66 job.setOutputFormat(TextOutputFormat.class); 67 job.setOutputKeyClass(Text.class); 68 job.setOutputValueClass(Text.class); 69 job.set("key.value.separator.in.input.line", ","); 70 71 JobClient.runJob(job); 72 73 return 0; 74 } 75 76 public static void main(String[] args) throws Exception { 77 int res = ToolRunner.run(new Configuration(), new MyJob(), args); 78 79 System.exit(res); 80 } 81 }
選項
|
描述
|
-conf <configuration file> | 指定一個配置文件 |
-D <property=value> | 給JobConf屬性賦值 |
-fs <local | namenode:port> | 指定一個NameNode,能夠是「local」 |
-jt <local | jobtracker:port> | 指定一個JobTracker |
-files <list of files> | 指定一個以逗號分隔的文件列表,用於MapReduce做業。這些文件自動地分佈到全部節點,使之可從本地獲取 |
-libjars <list of jars> | 指定一個以逗號分隔的jar文件列表,使之包含在全部任務JVM的classpath中 |
-archives <list of archives> | 指定一個以逗號分隔的存檔文件列表,使之能夠在全部任務節點上打開 |
1 import java.io.IOException; 2 import java.util.Iterator; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.conf.Configured; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapred.FileInputFormat; 10 import org.apache.hadoop.mapred.FileOutputFormat; 11 import org.apache.hadoop.mapred.JobClient; 12 import org.apache.hadoop.mapred.JobConf; 13 import org.apache.hadoop.mapred.KeyValueTextInputFormat; 14 import org.apache.hadoop.mapred.MapReduceBase; 15 import org.apache.hadoop.mapred.Mapper; 16 import org.apache.hadoop.mapred.OutputCollector; 17 import org.apache.hadoop.mapred.Reducer; 18 import org.apache.hadoop.mapred.Reporter; 19 import org.apache.hadoop.mapred.TextOutputFormat; 20 import org.apache.hadoop.util.Tool; 21 import org.apache.hadoop.util.ToolRunner; 22 23 public class CitationHistogram extends Configured implements Tool { 24 25 public static class MapClass extends MapReduceBase 26 implements Mapper<Text, Text, IntWritable, IntWritable> { 27 28 private final static IntWritable uno = new IntWritable(1); 29 private IntWritable citationCount = new IntWritable(); 30 31 public void map(Text key, Text value, 32 OutputCollector<IntWritable, IntWritable> output, 33 Reporter reporter) throws IOException { 34 35 citationCount.set(Integer.parseInt(value.toString())); 36 output.collect(citationCount, uno); 37 } 38 } 39 40 public static class Reduce extends MapReduceBase 41 implements Reducer<IntWritable,IntWritable,IntWritable,IntWritable> 42 { 43 44 public void reduce(IntWritable key, Iterator<IntWritable> values, 45 OutputCollector<IntWritable, IntWritable>output, 46 Reporter reporter) throws IOException { 47 48 int count = 0; 49 while (values.hasNext()) { 50 count += values.next().get(); 51 } 52 output.collect(key, new IntWritable(count)); 53 } 54 } 55 56 public int run(String[] args) throws Exception { 57 Configuration conf = getConf(); 58 59 JobConf job = new JobConf(conf, CitationHistogram.class); 60 61 Path in = new Path(args[0]); 62 Path out = new Path(args[1]); 63 FileInputFormat.setInputPaths(job, in); 64 FileOutputFormat.setOutputPath(job, out); 65 66 job.setJobName("CitationHistogram"); 67 job.setMapperClass(MapClass.class); 68 job.setReducerClass(Reduce.class); 69 70 job.setInputFormat(KeyValueTextInputFormat.class); 71 job.setOutputFormat(TextOutputFormat.class); 72 job.setOutputKeyClass(IntWritable.class); 73 job.setOutputValueClass(IntWritable.class); 74 75 JobClient.runJob(job); 76 77 return 0; 78 } 79 80 public static void main(String[] args) throws Exception { 81 int res = ToolRunner.run(new Configuration(), 82 new CitationHistogram(), 83 args); 84 85 System.exit(res); 86 } 87 }
1 import java.io.IOException; 2 import java.util.Iterable; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.conf.Configured; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.Tool; 17 import org.apache.hadoop.util.ToolRunner; 18 19 public class MyJob extends Configured implements Tool { 20 21 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 22 23 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 24 25 String[] citation = value.toString().split(","); 26 context.write(new Text(citation[1]), new Text(citation[0])); 27 } 28 } 29 30 public static class Reduce extends Reducer<Text, Text, Text, Text> { 31 32 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 33 34 String csv = ""; 35 for (Text val:values) { //Iterable類型容許foreach循環 36 if (csv.length() > 0) csv += ","; 37 csv += val.toString(); 38 } 39 40 context.write(key, new Text(csv)); 41 } 42 } 43 44 public int run(String[] args) throws Exception { 45 Configuration conf = getConf(); 46 47 Job job = new Job(conf, "MyJob"); 48 job.setJarByClass(MyJob.class); 49 50 Path in = new Path(args[0]); 51 Path out = new Path(args[1]); 52 FileInputFormat.setInputPaths(job, in); 53 FileOutputFormat.setOutputPath(job, out); 54 55 job.setMapperClass(MapClass.class); 56 job.setReducerClass(Reduce.class); 57 58 job.setInputFormatClass(TextInputFormat.class); //兼容的InputFormat類 59 job.setOutputFormatClass(TextOutputFormat.class); 60 job.setOutputKeyClass(Text.class); 61 job.setOutputValueClass(Text.class); 62 63 System.exit(job.waitForCompletion(true)?0:1); 64 65 return 0; 66 } 67 68 public static void main(String[] args) throws Exception { 69 int res = ToolRunner.run(new Configuration(), new MyJob(), args); 70 71 System.exit(res); 72 } 73 }