自定義 hadoop MapReduce InputFormat 切分輸入文件

在上一篇中,咱們實現了按 cookieId 和 time 進行二次排序,如今又有新問題:假如我須要按 cookieId 和 cookieId&time 的組合進行分析呢?此時最好的辦法是自定義 InputFormat,讓 mapreduce 一次讀取一個 cookieId 下的全部記錄,而後再按 time 進行切分 session,邏輯僞碼以下: php

for OneSplit in MyInputFormat.getSplit() // OneSplit 是某個 cookieId 下的全部記錄 java

    for session in OneSplit // session 是按 time 把 OneSplit 進行了二次分割 apache

        for line in session // line 是 session 中的每條記錄,對應原始日誌的某條記錄 編程

一、原理: cookie

InputFormat是MapReduce中一個很經常使用的概念,它在程序的運行中到底起到了什麼做用呢?
InputFormat實際上是一個接口,包含了兩個方法:

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
session

  RecordReader<K, V> createRecordReader(InputSplit split,  app

                                  TaskAttemptContext context)  throws IOException; 框架

}

這兩個方法有分別完成着如下工做:
      方法 getSplits 將輸入數據切分紅splits,splits的個數即爲map tasks的個數,splits的大小默認爲塊大小,即64M
     方法 getRecordReader 將每一個 split  解析成records, 再依次將record解析成<K,V>對
也就是說 InputFormat完成如下工做:
 InputFile -->  splits  -->  <K,V>

系統經常使用的  InputFormat 又有哪些呢?
                      
其中Text InputFormat即是最經常使用的,它的 <K,V>就表明 <行偏移,該行內容>

然而系統所提供的這幾種固定的將  InputFile轉換爲 <K,V>的方式有時候並不能知足咱們的需求:
此時須要咱們自定義   InputFormat ,從而使Hadoop框架按照咱們預設的方式來將
InputFile解析爲<K,V>
在領會自定義   InputFormat 以前,須要弄懂一下幾個抽象類、接口及其之間的關係:

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),
RecordReader (interface), Line RecordReader(class)的關係
      FileInputFormat implements  InputFormat
      TextInputFormat extends  FileInputFormat
      TextInputFormat.get RecordReader calls  Line RecordReader
      Line RecordReader  implements  RecordReader

對於InputFormat接口,上面已經有詳細的描述
再看看 FileInputFormat,它實現了 InputFormat接口中的 getSplits方法,而將 getRecordReader與isSplitable留給具體類(如 TextInputFormat )實現, isSplitable方法一般不用修改,因此只須要在自定義的 InputFormat中實現
getRecordReader方法便可,而該方法的核心是調用 Line RecordReader(即由LineRecorderReader類來實現 " 將每一個s plit解析成records, 再依次將record解析成<K,V>對" ),該方法實現了接口RecordReader

  public interface RecordReader<K, V> {
  boolean   next(K key, V value) throws IOException;
  K   createKey();
  V   createValue();
  long   getPos() throws IOException;
  public void   close() throws IOException;
  float   getProgress() throws IOException;
}

     所以自定義InputFormat的核心是自定義一個實現接口RecordReader相似於LineRecordReader的類,該類的核心也正是重寫接口RecordReader中的幾大方法,
     定義一個InputFormat的核心是定義一個相似於LineRecordReader的,本身的RecordReader


二、代碼:

package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

	@SuppressWarnings("deprecation")
	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		return new TrackRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec = new CompressionCodecFactory(
				context.getConfiguration()).getCodec(file);
		return codec == null;
	}

}

package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as offset in file and value as line.
 * 
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
	private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);

	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private NewLineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;
	// ----------------------
	// 行分隔符,即一條記錄的分隔符
	private byte[] separator = "END\n".getBytes();

	// --------------------

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		if (codec != null) {
			in = new NewLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				this.start -= separator.length;//
				// --start;
				fileIn.seek(start);
			}
			in = new NewLineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
			start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			if (newSize < maxLineLength) {
				break;
			}

			LOG.info("Skipped line of size " + newSize + " at pos "
					+ (pos - newSize));
		}
		if (newSize == 0) {
			key = null;
			value = null;
			return false;
		} else {
			return true;
		}
	}

	@Override
	public LongWritable getCurrentKey() {
		return key;
	}

	@Override
	public Text getCurrentValue() {
		return value;
	}

	/**
	 * Get the progress within the split
	 */
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	public synchronized void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}

	public class NewLineReader {
		private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
		private int bufferSize = DEFAULT_BUFFER_SIZE;
		private InputStream in;
		private byte[] buffer;
		private int bufferLength = 0;
		private int bufferPosn = 0;

		public NewLineReader(InputStream in) {
			this(in, DEFAULT_BUFFER_SIZE);
		}

		public NewLineReader(InputStream in, int bufferSize) {
			this.in = in;
			this.bufferSize = bufferSize;
			this.buffer = new byte[this.bufferSize];
		}

		public NewLineReader(InputStream in, Configuration conf)
				throws IOException {
			this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
		}

		public void close() throws IOException {
			in.close();
		}

		public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
				throws IOException {
			str.clear();
			Text record = new Text();
			int txtLength = 0;
			long bytesConsumed = 0L;
			boolean newline = false;
			int sepPosn = 0;
			do {
				// 已經讀到buffer的末尾了,讀下一個buffer
				if (this.bufferPosn >= this.bufferLength) {
					bufferPosn = 0;
					bufferLength = in.read(buffer);
					// 讀到文件末尾了,則跳出,進行下一個文件的讀取
					if (bufferLength <= 0) {
						break;
					}
				}
				int startPosn = this.bufferPosn;
				for (; bufferPosn < bufferLength; bufferPosn++) {
					// 處理上一個buffer的尾巴被切成了兩半的分隔符(若是分隔符中重複字符過多在這裏會有問題)
					if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
						sepPosn = 0;
					}
					// 遇到行分隔符的第一個字符
					if (buffer[bufferPosn] == separator[sepPosn]) {
						bufferPosn++;
						int i = 0;
						// 判斷接下來的字符是否也是行分隔符中的字符
						for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
							// buffer的最後恰好是分隔符,且分隔符被不幸地切成了兩半
							if (bufferPosn + i >= bufferLength) {
								bufferPosn += i - 1;
								break;
							}
							// 一旦其中有一個字符不相同,就斷定爲不是分隔符
							if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
								sepPosn = 0;
								break;
							}
						}
						// 的確遇到了行分隔符
						if (sepPosn == separator.length) {
							bufferPosn += i;
							newline = true;
							sepPosn = 0;
							break;
						}
					}
				}
				int readLength = this.bufferPosn - startPosn;
				bytesConsumed += readLength;
				// 行分隔符不放入塊中
				if (readLength > maxLineLength - txtLength) {
					readLength = maxLineLength - txtLength;
				}
				if (readLength > 0) {
					record.append(this.buffer, startPosn, readLength);
					txtLength += readLength;
					// 去掉記錄的分隔符
					if (newline) {
						str.set(record.getBytes(), 0, record.getLength()
								- separator.length);
					}
				}
			} while (!newline && (bytesConsumed < maxBytesToConsume));
			if (bytesConsumed > (long) Integer.MAX_VALUE) {
				throw new IOException("Too many bytes before newline: "
						+ bytesConsumed);
			}

			return (int) bytesConsumed;
		}

		public int readLine(Text str, int maxLineLength) throws IOException {
			return readLine(str, maxLineLength, Integer.MAX_VALUE);
		}

		public int readLine(Text str) throws IOException {
			return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
		}
	}
}

package MyInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

	public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

		public void map(LongWritable key, Text value, Context context) throws IOException,
				InterruptedException {
			System.out.println("key:\t " + key);
			System.out.println("value:\t " + value);
			System.out.println("-------------------------");
		}
	}

	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		 Path outPath = new Path("/hive/11");
		 FileSystem.get(conf).delete(outPath, true);
		Job job = new Job(conf, "TestMyInputFormat");
		job.setInputFormatClass(TrackInputFormat.class);
		job.setJarByClass(TestMyInputFormat.class);
		job.setMapperClass(TestMyInputFormat.MapperClass.class);
		job.setNumReduceTasks(0);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

三、測試數據: ide

  cookieId    time     url                 cookieOverFlag oop

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END

四、結果:

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

REF:

自定義hadoop map/reduce輸入文件切割InputFormat

http://hi.baidu.com/lzpsky/item/0d9d84c05afb43ba0c0a7b27

MapReduce高級編程之自定義InputFormat

http://datamining.xmu.edu.cn/bbs/home.php?mod=space&uid=91&do=blog&id=190

http://irwenqiang.iteye.com/blog/1448164

相關文章
相關標籤/搜索