hadoop2 自定義OutputFormat場景雜記

提示:如下代碼都是在 Hadoop2.7.x 最新API下進行。java

場景1:自定義輸出文件名前綴

示例:計算學生的平均分紅績,輸出:學生姓名和平均分紅績;要求:根據成績的範圍(0~59, 60~70, 70~80, 80~90, 90~100)輸出到不一樣的文件中,文件名前綴爲:student_score_05九、student_score_6070、student_score_7080、student_score_8090、student_score_90100。ide

方法1(最簡單,推薦):Override MultipleTextOutputFormatgenerateFileNameForKeyValue()方法。

// 重載 MultipleTextOutputFormat 的 generateFileNameForKeyValue()方法來實現
public class PartitionScoreOutputFormat extends MultipleTextOutputFormat<Text, IntWritable>
{
	private static final String PREFIX = "student_score_";
	
	@Override
	protected String generateFileNameForKeyValue(Text key, IntWritable value, String name)
	{
		int score = value.get();
		if(score < 60) {
			return PREFIX + "059";
		}
		if(score < 70) {
			return PREFIX + "6070";
		}
		if(score < 80) {
			return PREFIX + "7080";
		}
		if(score < 90) {
			return PREFIX + "8090";
		}
		
		return PREFIX + "90100";
	}
}

// 調用
job.setOutputFormat(PartitionScoreOutputFormat.class)

方法2:使用 MultipleOutputs.addNamedOutput() 方法

public class StudentScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
	// 使用 MultipleOutputs
	private MultipleOutputs<Text, IntWritable> mos;
	
	@Override
	protected void setup(Context context) throws ...
	{
		super.setup(context);
		mos = new MultipleOutputs<Text,IntWritable>(context);
	}
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws ...
	{
		int totalScore = 0;
		int count = 0;
		
		for(IntWritable score : values) {
			totalScore += score.get();
			count++;
		}
		
		int avgScore = count > 0 ? totalScore/count : 0;
		
		// 使用 named output,對應在 Task 中的 MultipleOutputs.addNamedOutput(...) 定義
		mos.write(getNamed(avgScore), key, new IntWritable(avgScore));
		
	}
	
	@Override
	protected void cleanup(...) 
	{
		// ...
		mos.close();
	}
	
	private static String getNamed(int score) 
	{
		if(score < 60) {
			return "score059";
		}
		if(score < 70) {
			return "score6070";
		}
		if(score < 80) {
			return "score7080";
		}
		if(score < 90) {
			return "score8090";
		}
		return "score90100";
	}
	
}
public class StudentScoreTask
{
	public static void main(String[] args) throws ...
	{
		Job job = Job.getInstance(new Configuration());
		job.setJobName("...");
		
		// ...
		// 重要:定義命名輸出規則: 
		// 第二個參數:score059 等名字要和 Reducer中的 MultipleOutputs.write(namedParam, ...) 命名一致
		MultipleOutputs.addNamedOutput(job, "score059", TextOutputFormat.class, Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "score6070", ...);
		MultipleOutputs.addNamedOutput(job, "score7080", ...);
		MultipleOutputs.addNamedOutput(job, "score8090", ...);
		MultipleOutputs.addNamedOutput(job, "score90100", ...);
		
		// ...
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
}

場景2:徹底自定義輸出的文件名和後綴

示例:計算學生的平均分紅績,將學生姓名和成績輸出到文件中,文件名格式爲:student_score_${yyyyMMdd}_${taskId}.txt(student_score_20160930_0.txt)oop

方法:因爲是輸出到文本文件,所以 Override TextOutputFormat 的 getDefaultWorkFile() 方法便可:

public class StudentScoreOutputFormat<K,V> extends TextOutputFormcat<K,V>
{
	@Override
	public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException
	{
		FileOutputCommitter comitter = (FileOutputCommitter) super.getOutputCommitter(context);
		
		return new Path(committer.getWorkPath(), generateFileName(context));
	}
	
	public synchronized static String generateFileName(TaskAttemptContext context)
	{
		TaskID taskId = context.getTaskAttemptID().getTaskID();
		int partition = taskId.getId();
		
		String currentDate = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime());
		
		return String.format("student_score_%s_%d.txt", currentDate, partition);
	}
}

// 使用方式
Job job = Job.getInstance(...);
//...

// 將輸出文件格式化類指定爲自定義的 StudentScoreOutputFormat 便可
job.setOutputFormatClass(StudentScoreOutputFormat.class);

// 若是不想生成空文件 part-r-xxxx 等,使用 LazyOutputFormat 設置替代上面的設置便可
// LazyOutputFormat.setOutputFormatClass(job, StudentScoreOutputFormat.class);

場景3:修改默認一個Task一個文件輸出,將全部輸出合併到一個文件中,該文件採用固定size進行分割爲多個文件(相似日誌文件輸出,好比當文件size達到1G時,自動生成第二個文件,後續輸出到第二個文件中,以此類推)。

 

================== 未完待續,後面會持續補充遇到的特殊文件輸出要求,更歡迎你們提供~~~spa

相關文章
相關標籤/搜索