屬性 | 類型 | 描述 |
mapred.job.id | String | 作業ID |
mapred.jar | String | 作業目錄中jar的位置 |
job.local.dir | String | 作業的本地空間 |
mapred.tip.id | String | 任務ID |
mapred.task.id | String | 任務重試ID |
mapred.task.is.map | Boolean | 標誌量,表示是否爲一個map任務 |
mapred.task.partition | Int | 作業內部的任務ID |
map.input.file | String | Mapper讀取的文件路徑 |
map.input.start | Long | 當前Mapper輸入分片的文件偏移量 |
map.input.length | Long | 當前Mapper輸入分片的字節數 |
mapred.work.output.dir | String | 任務的工作(即臨時)輸出目錄 |
1 import java.io.IOException; 2 import java.util.Iterator; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.conf.Configured; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.NullWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapred.FileInputFormat; 12 import org.apache.hadoop.mapred.FileOutputFormat; 13 import org.apache.hadoop.mapred.SequenceFileInputFormat; 14 import org.apache.hadoop.mapred.SequenceFileOutputFormat; 15 import org.apache.hadoop.mapred.KeyValueTextInputFormat; 16 import org.apache.hadoop.mapred.TextInputFormat; 17 import org.apache.hadoop.mapred.TextOutputFormat; 18 import org.apache.hadoop.mapred.JobClient; 19 import org.apache.hadoop.mapred.JobConf; 20 import org.apache.hadoop.mapred.MapReduceBase; 21 import org.apache.hadoop.mapred.Mapper; 22 import org.apache.hadoop.mapred.OutputCollector; 23 import org.apache.hadoop.mapred.Reducer; 24 import org.apache.hadoop.mapred.Reporter; 25 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; 26 import org.apache.hadoop.util.Tool; 27 import org.apache.hadoop.util.ToolRunner; 28 29 30 public class MultiFile extends Configured implements Tool { 31 32 public static class MapClass extends MapReduceBase 33 implements Mapper<LongWritable, Text, NullWritable, Text> { 34 35 public void map(LongWritable key, Text value, 36 OutputCollector<NullWritable, Text> output, 37 Reporter reporter) throws IOException { 38 39 output.collect(NullWritable.get(), value); 40 } 41 } 42 43 public static class PartitionByCountryMTOF 44 extends MultipleTextOutputFormat<NullWritable,Text> 45 { 46 protected String generateFileNameForKeyValue(NullWritable key, 47 Text value, 48 String inputfilename) 49 { 50 String[] arr = value.toString().split(",", -1); 51 String country = arr[4].substring(1,3); 52 return country+"/"+inputfilename; 53 } 54 } 55 56 public int 
run(String[] args) throws Exception { 57 // Configuration processed by ToolRunner 58 Configuration conf = getConf(); 59 60 // Create a JobConf using the processed conf 61 JobConf job = new JobConf(conf, MultiFile.class); 62 63 // Process custom command-line options 64 Path in = new Path(args[0]); 65 Path out = new Path(args[1]); 66 FileInputFormat.setInputPaths(job, in); 67 FileOutputFormat.setOutputPath(job, out); 68 69 // Specify various job-specific parameters 70 job.setJobName("MultiFile"); 71 job.setMapperClass(MapClass.class); 72 73 job.setInputFormat(TextInputFormat.class); 74 job.setOutputFormat(PartitionByCountryMTOF.class); 75 job.setOutputKeyClass(NullWritable.class); 76 job.setOutputValueClass(Text.class); 77 78 job.setNumReduceTasks(0); 79 80 // Submit the job, then poll for progress until the job is complete 81 JobClient.runJob(job); 82 83 return 0; 84 } 85 86 public static void main(String[] args) throws Exception { 87 // Let ToolRunner handle generic command-line options 88 int res = ToolRunner.run(new Configuration(), new MultiFile(), args); 89 90 System.exit(res); 91 } 92 }
1 import java.io.IOException; 2 import java.util.Iterator; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.conf.Configured; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.NullWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapred.FileInputFormat; 12 import org.apache.hadoop.mapred.FileOutputFormat; 13 import org.apache.hadoop.mapred.SequenceFileInputFormat; 14 import org.apache.hadoop.mapred.SequenceFileOutputFormat; 15 import org.apache.hadoop.mapred.KeyValueTextInputFormat; 16 import org.apache.hadoop.mapred.TextInputFormat; 17 import org.apache.hadoop.mapred.TextOutputFormat; 18 import org.apache.hadoop.mapred.JobClient; 19 import org.apache.hadoop.mapred.JobConf; 20 import org.apache.hadoop.mapred.MapReduceBase; 21 import org.apache.hadoop.mapred.Mapper; 22 import org.apache.hadoop.mapred.OutputCollector; 23 import org.apache.hadoop.mapred.Reducer; 24 import org.apache.hadoop.mapred.Reporter; 25 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; 26 import org.apache.hadoop.mapred.lib.MultipleOutputs; 27 import org.apache.hadoop.util.Tool; 28 import org.apache.hadoop.util.ToolRunner; 29 30 31 public class MultiFile extends Configured implements Tool { 32 33 public static class MapClass extends MapReduceBase 34 implements Mapper<LongWritable, Text, NullWritable, Text> { 35 36 private MultipleOutputs mos; 37 private OutputCollector<NullWritable, Text> collector; 38 39 public void configure(JobConf conf) { 40 mos = new MultipleOutputs(conf); 41 } 42 43 public void map(LongWritable key, Text value, 44 OutputCollector<NullWritable, Text> output, 45 Reporter reporter) throws IOException { 46 47 String[] arr = value.toString().split(",", -1); 48 String chrono = arr[0] + "," + arr[1] + "," + arr[2]; 49 String geo = arr[0] + "," + arr[4] + "," + arr[5]; 50 51 collector = mos.getCollector("chrono", 
reporter); 52 collector.collect(NullWritable.get(), new Text(chrono)); 53 collector = mos.getCollector("geo", reporter); 54 collector.collect(NullWritable.get(), new Text(geo)); 55 } 56 57 public void close() throws IOException { 58 mos.close(); 59 } 60 } 61 62 public int run(String[] args) throws Exception { 63 // Configuration processed by ToolRunner 64 Configuration conf = getConf(); 65 66 // Create a JobConf using the processed conf 67 JobConf job = new JobConf(conf, MultiFile.class); 68 69 // Process custom command-line options 70 Path in = new Path(args[0]); 71 Path out = new Path(args[1]); 72 FileInputFormat.setInputPaths(job, in); 73 FileOutputFormat.setOutputPath(job, out); 74 75 // Specify various job-specific parameters 76 job.setJobName("MultiFile"); 77 job.setMapperClass(MapClass.class); 78 79 job.setInputFormat(TextInputFormat.class); 80 // job.setOutputFormat(PartitionByCountryMTOF.class); 81 job.setOutputKeyClass(NullWritable.class); 82 job.setOutputValueClass(Text.class); 83 job.setNumReduceTasks(0); 84 85 MultipleOutputs.addNamedOutput(job, 86 "chrono", 87 TextOutputFormat.class, 88 NullWritable.class, 89 Text.class); 90 MultipleOutputs.addNamedOutput(job, 91 "geo", 92 TextOutputFormat.class, 93 NullWritable.class, 94 Text.class); 95 96 // Submit the job, then poll for progress until the job is complete 97 JobClient.runJob(job); 98 99 return 0; 100 } 101 102 public static void main(String[] args) throws Exception { 103 // Let ToolRunner handle generic command-line options 104 int res = ToolRunner.run(new Configuration(), new MultiFile(), args); 105 106 System.exit(res); 107 } 108 }