Note: the code below targets Hadoop 2.7.x and uses the new (org.apache.hadoop.mapreduce) API wherever one is available.
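Example: compute each student's average score and output the student's name together with that average. Requirement: write the records to different files according to the score range (0~59, 60~70, 70~80, 80~90, 90~100), using the file name prefixes student_score_059, student_score_6070, student_score_7080, student_score_8090 and student_score_90100.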
// Implemented by overriding generateFileNameForKeyValue() of MultipleTextOutputFormat.
// Note: MultipleTextOutputFormat belongs to the old org.apache.hadoop.mapred API.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class PartitionScoreOutputFormat extends MultipleTextOutputFormat<Text, IntWritable> {
    private static final String PREFIX = "student_score_";

    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value, String name) {
        int score = value.get();
        if (score < 60) { return PREFIX + "059"; }
        if (score < 70) { return PREFIX + "6070"; }
        if (score < 80) { return PREFIX + "7080"; }
        if (score < 90) { return PREFIX + "8090"; }
        return PREFIX + "90100";
    }
}

// Usage (on a JobConf): job.setOutputFormat(PartitionScoreOutputFormat.class)
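import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class StudentScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Use MultipleOutputs
    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int totalScore = 0;
        int count = 0;
        for (IntWritable score : values) {
            totalScore += score.get();
            count++;
        }
        // Integer division: the average is truncated
        int avgScore = count > 0 ? totalScore / count : 0;
        // Write to a named output; the name must match one registered in the
        // Task with MultipleOutputs.addNamedOutput(...)
        mos.write(getNamed(avgScore), key, new IntWritable(avgScore));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // ...
        mos.close();
    }

    // Named outputs may contain only letters and digits, hence no underscores here
    private static String getNamed(int score) {
        if (score < 60) { return "score059"; }
        if (score < 70) { return "score6070"; }
        if (score < 80) { return "score7080"; }
        if (score < 90) { return "score8090"; }
        return "score90100";
    }
}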
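import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StudentScoreTask {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJobName("...");
        // ...
        // Important: register the named outputs.
        // The second argument (score059 and so on) must match the name passed to
        // MultipleOutputs.write(namedParam, ...) in the Reducer.
        MultipleOutputs.addNamedOutput(job, "score059", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "score6070", ...);
        MultipleOutputs.addNamedOutput(job, "score7080", ...);
        MultipleOutputs.addNamedOutput(job, "score8090", ...);
        MultipleOutputs.addNamedOutput(job, "score90100", ...);
        // ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}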
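The ranges in the requirement share their boundaries (70 appears in both 60~70 and 70~80), and the code above resolves this by treating every upper bound as exclusive. The following Hadoop-free sketch repeats the same averaging and range rules so they can be checked in isolation. The class ScoreBucketSketch and its method names are hypothetical and are not part of the original job.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (an assumption, not from the original job): mirrors the
// reducer's averaging and range rules without any Hadoop dependency.
final class ScoreBucketSketch {

    // Integer average, truncated in the same way as totalScore / count.
    static int average(List<Integer> scores) {
        int total = 0;
        for (int score : scores) {
            total += score;
        }
        return scores.isEmpty() ? 0 : total / scores.size();
    }

    // Range suffix for a score; each upper bound is exclusive, so 70 maps to "7080".
    static String rangeSuffix(int score) {
        if (score < 60) { return "059"; }
        if (score < 70) { return "6070"; }
        if (score < 80) { return "7080"; }
        if (score < 90) { return "8090"; }
        return "90100";
    }

    public static void main(String[] args) {
        int avg = average(Arrays.asList(69, 70));
        System.out.println(avg + " -> student_score_" + rangeSuffix(avg));
    }
}
```

Because the average uses integer division, scores of 69 and 70 average to 69, so that student falls into the 6070 range rather than 7080.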
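Example: compute each student's average score and write the student's name and score to a file whose name follows the format student_score_${yyyyMMdd}_${taskId}.txt (for example, student_score_20160930_0.txt).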
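import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StudentScoreOutputFormat<K, V> extends TextOutputFormat<K, V> {

    // Replace the default part-r-xxxxx work file with a custom file name
    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        FileOutputCommitter committer = (FileOutputCommitter) super.getOutputCommitter(context);
        return new Path(committer.getWorkPath(), generateFileName(context));
    }

    // Build student_score_${yyyyMMdd}_${taskId}.txt from the current date and the task id
    public synchronized static String generateFileName(TaskAttemptContext context) {
        TaskID taskId = context.getTaskAttemptID().getTaskID();
        int partition = taskId.getId();
        String currentDate = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime());
        return String.format("student_score_%s_%d.txt", currentDate, partition);
    }
}

// Usage
Job job = Job.getInstance(...);
// ...
// Simply set the custom StudentScoreOutputFormat as the job's output format class
job.setOutputFormatClass(StudentScoreOutputFormat.class);
// To avoid generating empty files such as part-r-xxxx, use LazyOutputFormat
// in place of the setting above
// LazyOutputFormat.setOutputFormatClass(job, StudentScoreOutputFormat.class);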
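As a standalone illustration of the naming rule student_score_${yyyyMMdd}_${taskId}.txt, the sketch below builds the same file name from a date and a partition number, using java.time in place of SimpleDateFormat. The class FileNameSketch and its method are hypothetical. Passing the date in as a parameter instead of reading the clock is an assumption made here so that the result is reproducible.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (an assumption, not from the original post): the file
// naming rule on its own, without any Hadoop types.
final class FileNameSketch {

    // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd.
    static String fileName(LocalDate date, int partition) {
        String day = date.format(DateTimeFormatter.BASIC_ISO_DATE);
        return String.format("student_score_%s_%d.txt", day, partition);
    }

    public static void main(String[] args) {
        System.out.println(fileName(LocalDate.of(2016, 9, 30), 0));
    }
}
```

Called with 2016-09-30 and partition 0, this yields student_score_20160930_0.txt, the example name given in the requirement.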
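==================
To be continued. I will keep adding other special file-output requirements as I run into them, and suggestions from readers are very welcome.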