Hadoop(十):簡單瞭解Hadoop數據類型,輸入輸出格式及用戶如何自定義。

一:Hadoop內置的數據類型。java

    Hadoop提供以下內置的數據類型,這些數據類型都實現了WritableComparable接口,以便用這些類型定義的數據能夠被序列化進行網絡傳輸和文件存儲,以及進行大小比較。apache

BooleanWritable 標準布爾型數值
ByteWritable 單字節數值
DoubleWritable 雙字節數
FloatWritable 浮點數
IntWritable 整型數
LongWritable 長整型數
Text 使用UTF-8格式存儲的文本
NullWritable 當<key,value>中的key或value爲空時使用
//簡單知道這些類型
IntWritable iw = new IntWritable(1);
System.out.println(  iw.get() );  // 1 
	
BooleanWritable bw = new BooleanWritable(true);
System.out.println(  bw.get() );  // true

二:Hadoop-用戶自定義的數據類型。網絡

    自定義數據類型時,需知足兩個基本要求,即app

        1.實現Writable接口,以便該數據能被序列化後完成網絡傳輸或文件輸入/輸出。框架

        2.若是該數據須要做爲主鍵key使用,或須要比較數值大小時,則須要實現WritableComparable接口。ide

//Hadoop2.6.4版 - Writable源碼:
public interface Writable {
 
  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;

}
public interface WritableComparable<T> extends Writable, Comparable<T> {}

三:Hadoop內置的數據輸入格式和RecordReader。oop

    數據輸入格式(InputFormat)用於描述MapReduce做業的數據輸入規範。MapReduce框架依靠數據輸入格式完成輸入規範檢查、對數據文件進行輸入分塊(InputSplit),以及提供從輸入分塊中將數據記錄逐一讀出、並轉換爲Map過程的輸入鍵值對等功能。測試

    Hadoop提供了豐富的內置數據輸入格式,最經常使用的數據輸入格式包括:TextInputFormat 和 KeyValueInputFormat。this

    TextInputFormat是系統默認的數據輸入格式,能夠將文本文件分塊並逐行讀入以便Map節點進行處理。讀入一行時,所產生的主鍵key就是當前行在整個文本文件中的字節偏移位置,而value就是該行的內容。spa

//TextInputFormat部分源碼:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  //....
}

    KeyValueTextInputFormat是另外一個經常使用的數據輸入格式,可將一個按照<key,value>格式逐行存放的文本文件逐行讀出,並自動解析生成相應的key和value。

//KeyValueTextInputFormat部分源碼:
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {

  // ...

  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {
    
    context.setStatus(genericSplit.toString());
    return new KeyValueLineRecordReader(context.getConfiguration());
  }

}

    RecordReader對於一個數據輸入格式,都須要有一個對應的RecordReader,主要用於將一個文件中的數據記錄拆分紅具體的鍵值對。TextInputFormat的默認RecordReader是LineRecordReader,而KeyValueTextInputFormat的默認RecordReader是KeyValueLineRecordReader。

四:Hadoop內置的數據輸出格式與RecordWriter。

    數據輸出格式(OutputFormat)用於描述MapReduce做業的數據輸出規範。MapReduce框架依靠數據輸出格式完成輸出規範檢查以及提供做業結果數據輸出功能。

    一樣,最經常使用的數據輸出格式是TextOutputFormat,也是系統默認的數據輸出格式,能夠將計算結果以 「key + \t + vaue」的形式逐行輸出到文本文件中。

    與數據輸入格式相似樣,數據輸出格式也提供一個對應的RecordWriter,以便系統明確輸出結果寫入到文件中的具體格式。TextInputFormat的默認RecordWriter是LineRecordWriter,其實際操做是將結果數據以「key + \t + value」的形式輸出到文本文件中。

//TextOutputFormat的部分源碼:
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {

  protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    // ...

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      //...
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
       // ...
    }

    public synchronized void write(K key, V value) throws IOException {
      //...
      out.write(newline);
    }

  }

  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job
                         ) throws IOException, InterruptedException {
     // ...
  }
}

五:經過打印輸出UserInfo小例子來實現簡單的用戶自定義數據類型,數據輸入格式,數據輸出格式。 (簡單的說就是模仿源碼,基本上沒多大變化)。

        如下附上案例源碼:

1.定義本身的UserInfo,做爲數據類型。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class UserInfo implements WritableComparable<UserInfo> {

	private int id;
	private String name;
	private int age;
	private String sex;
	private String address;
	
	public UserInfo() {
	}
	public UserInfo(int id, String name, int age, String sex, String address) {
		this.id = id;
		this.name = name;
		this.age = age;
		this.sex = sex;
		this.address = address;
	}

	// JavaBean 普通的get set方法....

	@Override
	public void readFields(DataInput in) throws IOException {
		this.id = in.readInt();
		this.name = in.readUTF();
		this.age = in.readInt();
		this.sex = in.readUTF();
		this.address = in.readUTF();
	}
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(id);
		out.writeUTF(name);
		out.writeInt(age);
		out.writeUTF(sex);
		out.writeUTF(address);
	}

	@Override
	public String toString() {
		return "Id:" + id + ", Name:" + name + ", Age:" + age + ", Sex:" + sex + ", Address:" + address ;
	}

	@Override
	public int compareTo(UserInfo userInfo) {
		return 0;
	}
}

2.定製本身的數據輸入格式:UserInfoTextInputFormat。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class UserInfoTextInputFormat extends FileInputFormat<Text, UserInfo> {
	@Override
	public RecordReader<Text, UserInfo> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		context.setStatus(split.toString());
		UserInfoRecordReader userInforRecordReader = new UserInfoRecordReader(context.getConfiguration() );
		return userInforRecordReader;
	}
}

3.定製本身的RecordReader:UserInfoRecordReader。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class UserInfoRecordReader extends RecordReader<Text, UserInfo> {
	public static final String KEY_VALUE_SEPERATOR = 
			"mapreduce.input.keyvaluelinerecordreader.key.value.separator";

	private final LineRecordReader lineRecordReader;

	private byte separator = (byte) '\t';

	private Text innerValue;
	private Text key;

	private UserInfo value;


	public Class getKeyClass() { 
		return Text.class;
	}

	public UserInfoRecordReader(Configuration conf)throws IOException {
		lineRecordReader = new LineRecordReader();
		String sepStr = conf.get(KEY_VALUE_SEPERATOR,"\t");
		this.separator = (byte) sepStr.charAt(0);
	}

	public void initialize(InputSplit genericSplit,TaskAttemptContext context) throws IOException {
		lineRecordReader.initialize(genericSplit, context);
	}

	public static int findSeparator(byte[] utf, int start, int length, byte sep) {
		for (int i = start; i < (start + length); i++) {
			if (utf[i] == sep) {
				return i;
			}
		}
		return -1; //將這個截取標識符的位置給返回回去。
	}

	public static void setKeyValue(Text key, UserInfo value, byte[] line,int lineLen, int pos) {
		if (pos == -1) {
			key.set(line, 0, lineLen);
			value.setId(0);
			value.setName("");
			value.setAge(0);
			value.setSex("");
			value.setAddress("");
		} else {
			key.set(line, 0, pos); //設置鍵  從 第 0位置 到 截取標識符的位置
			Text text = new Text();
			text.set(line, pos + 1, lineLen - pos - 1);
			System.out.println("text的值: "+text);
			String[] str = text.toString().split(",");
			for (int i=0;i<str.length;i++) {
				//System.out.println("根據逗號分隔開來的值:  " + str[i] );
				String[] strKeyValue = str[i].split(":");
				//System.out.println("strKeyValue的Key-Value:" + key+"--->"+value);
				if("ID".equals(strKeyValue[0])){
					value.setId(Integer.parseInt( strKeyValue[1]) );
				}else if("Name".equals(strKeyValue[0])){
					value.setName( strKeyValue[1]);
				}else if("Age".equals(strKeyValue[0])){
					value.setAge(Integer.parseInt( strKeyValue[1]) );
				}else if("Sex".equals(strKeyValue[0])){
					value.setSex(strKeyValue[1] );
				}else if("Address".equals(strKeyValue[0])){
					value.setAddress(strKeyValue[1] );
				}
			}
//			System.out.println( "key--> " + key);
//			System.out.println( "value--> "+value +"\n\n");
		}
	}
	
	public synchronized boolean nextKeyValue()throws IOException {
		byte[] line = null;
		int lineLen = -1;
		if (key == null) {
			key = new Text();
		}
		if (value == null) {
			value = new UserInfo(); 
		}
		if (lineRecordReader.nextKeyValue()) {
			innerValue = lineRecordReader.getCurrentValue();
			line = innerValue.getBytes();
			lineLen = innerValue.getLength();
		} else {
			return false;
		}
		if (line == null){
			return false;
		}

		int pos = findSeparator(line, 0, lineLen, this.separator);
		setKeyValue(key, value, line, lineLen, pos);
		return true;
	}

	public Text getCurrentKey() {
		return key;
	}
	public UserInfo getCurrentValue() {
		return value;
	}

	public float getProgress() throws IOException {
		return lineRecordReader.getProgress();
	}

	public synchronized void close() throws IOException { 
		lineRecordReader.close();
	}

}

3.定製本身的輸出格式:UserInfoTextOutputFormat。

 

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class UserInfoTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
	public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
	protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
		private static final String utf8 = "UTF-8";
		private static final byte[] newline;
		static {
			try {
				newline = "\n".getBytes(utf8);
				//System.out.println(  "newline --> " + newline);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;

		public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
			this.out = out;
			try {
				this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}

		public LineRecordWriter(DataOutputStream out) {
			this(out, "\t");
		}

		private void writeObject(Object o) throws IOException {
			if (o instanceof Text) {
				Text to = (Text) o;
				System.out.println(  "o instanceof Text  --> True : "+ to.toString()  );
				out.write(to.getBytes(), 0, to.getLength());
			} else {
				out.write(o.toString().getBytes(utf8));
				System.out.println( "o instanceof Text  --> false : "+ o.toString()  );
			}
		}

		public synchronized void write(K key, V value) throws IOException {
			boolean nullKey = key == null || key instanceof NullWritable;
			boolean nullValue = value == null || value instanceof NullWritable;
			System.out.println(  "nullKey--> "+nullKey +" ,  nullValue--> "+nullValue);
			if (nullKey && nullValue) {
				return;
			}
			System.out.println( " nullkey --> "+ nullKey + ", !nullkey -->"+nullKey);
			if (!nullKey) {
				writeObject(key);
			}
			System.out.println( "(nullKey || nullValue) --> " + (nullKey || nullValue) );
			if (!(nullKey || nullValue)) {
				out.write(keyValueSeparator);
			}
			if (!nullValue) {
				writeObject(value);
			}
			out.write(newline);
		}

		public synchronized void close(TaskAttemptContext context) throws IOException {
			out.close();
		}
	}

	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		boolean isCompressed = getCompressOutput(job);
		String keyValueSeparator= conf.get(SEPERATOR, "---->");
		System.out.println(  "keyValueSeparator---> "+keyValueSeparator);
		CompressionCodec codec = null;
		String extension = "";
		if (isCompressed) {
			Class<? extends CompressionCodec> codecClass = 
					getOutputCompressorClass(job, GzipCodec.class);
			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
			extension = codec.getDefaultExtension();
		}
		Path file = getDefaultWorkFile(job, extension);
		System.out.println(  "file --> Path : "+ file  );
		FileSystem fs = file.getFileSystem(conf);
		
		if (!isCompressed) {
			FSDataOutputStream fileOut = fs.create(file, false);
			System.out.println( "if---isCompressed-->: "+fileOut);
			return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
		} else {
			FSDataOutputStream fileOut = fs.create(file, false);
			System.out.println( "else---isCompressed-->: "+fileOut);
			return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
		}
	}
}

5.測試類:PrintUserInfo

package com.hadoop.mapreduce.test4.outputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PrintUserInfo {
	public static final IntWritable ONE = new IntWritable(1);
	public static class UserInfoMapper extends Mapper<Text, UserInfo, Text, UserInfo>{
		@Override
		protected void map(Text key, UserInfo value, Mapper<Text, UserInfo, Text, UserInfo>.Context context)
				throws IOException, InterruptedException {
			super.map(key, value, context);
		}
	}
	
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			Job job = Job.getInstance(conf, "UserInfo");
			
			job.setJarByClass(PrintUserInfo.class);
			job.setMapperClass(UserInfoMapper.class);
			
			//定製輸入格式:
			job.setInputFormatClass(UserInfoTextInputFormat.class);
			//定製輸出格式:
			job.setOutputFormatClass(UserInfoTextOutputFormat.class);
			
			job.setMapOutputKeyClass(Text.class);
			//用的本身定義的數據類型
			job.setMapOutputValueClass(UserInfo.class);
			
			FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/mapuserinfo.txt"));
			FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/data/output7/"+System.currentTimeMillis()+"/"));
			System.exit(job.waitForCompletion(true) ? 0 : 1);//執行job
			
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

6.輸出結果:

1.數據文件:

1	ID:221,Name:xj,Age:22,Sex:man,Address:hunan,
2	ID:222,Name:cc,Age:21,Sex:Woman,Address:miluo,

2.結果文件:

1---->Id:221, Name:xj, Age:22, Sex:man, Address:hunan
2---->Id:222, Name:cc, Age:21, Sex:Woman, Address:miluo
相關文章
相關標籤/搜索