hadoop 023.0與hadoop 1.0 io.serializable分析

   hadoop升級到0.23.0和1.0版本後,其IO底層除了本身實現的Writable序列化後,還增長了一個io.serializer包,該包提供了一種可插拔的持久化框架(Pluggable Serialization Framework)。之因此說是可插拔的,是由於能夠把現存的持久化方式嵌入到hadoop的程序中。這個包中有如下的一些類: java

    接口類:
web

          Serializer:定義了序列化的接口apache

       Serialization:定義了序列化的機制(做爲序列化接口Serializer和反序列化接口Deserializer的上層抽象,提供了一個抽象的序列化和反序列化實例的返回)框架

Deserializer :定義了反序列化的接口ide

Serializer接口(具體的序列化類實現接口)函數

Serializer接口位於hadoop.io. Serializer包下面,爲hadoop的序列化提供了一種機制。(This package provides a mechanism for using different serialization frameworks in Hadoopoop

該接口包含了三個方法:this

    void open(OutputStream out)spa

       --打開一個輸出流爲序列化作準備。code

void serialize(T t)

    --對一個對象序列化

void close()

    --關閉輸出流

 

 

Serialization接口(序列化框架接口)

該接口做爲序列化和反序列化內部封裝的一個部分,提供了下面三個方法。

accept(Class<?> c)

    --查看參數Class類是否支持序列化。

Deserializer<T>   getDeserializer(Class<T> c)

    --獲取反序列化對象

Serializer<T> getSerializer(Class<T> c)

    --獲取序列化對象

 

Deserializer接口

該接口提供了反序列化的機制,與Serializer接口相對應,該接口一樣提供了三個方法。

<< 

Provides a facility for deserializing objects of type from an InputStream.

Deserializers are stateful, but must not buffer the input since other producers may read from the input between calls to deserialize(Object).

>> 

void   close()

    --關閉輸入流

T   deserialize(T t)

    --對一個對象反序列化

    Deserialize(T t)方法的實現:

    @SuppressWarnings("unchecked")

    public T deserialize(T object) throws IOException {

      try {

        // ignore passed-in object

        return (T) ois.readObject();

      } catch (ClassNotFoundException e) {

        throw new IOException(e.toString());

      }

    }

void   open(InputStream in)

    --打開一個輸入流實現反序列化作準備

    Open方法的實現爲:

    private ObjectInputStream ois;

 

    public void open(InputStream in) throws IOException {

      ois = new ObjectInputStream(in) {

        @Override protected void readStreamHeader() {

          // no header

        }

      };

}

 

注意:open方法利用裝飾器模式爲InputStream裝飾爲ObjectInputStream

      Deserialize方法從open方法裝飾後的ObjectInputStream類中讀取反序列化的對象,從源碼中能夠看出Deserialize並未對ObjectInputStream進行驗證,這也就意味着必須先open->Deserialize不然會拋出異常。

其餘的類:

 JavaSerilization:實現Serialization接口,並維護兩個分別實現SerializerDeserializer接口的內部類。這兩個內部類包裝Java的序列化機制,實現對實現Serializable接口的類的序列化。 

說明:

    1.open方法利用裝飾器模式爲InputStream裝飾爲ObjectInputStream

    2. Deserialize方法從open方法裝飾後的ObjectInputStream類中讀取反序列化的對象,從源碼中能夠看出Deserialize並未對ObjectInputStream進行驗證,這也就意味着必須先open->Deserialize不然會拋出異常。

結構以下:

    

  WritableSerialization:同上,實現對實現Writable接口的類的序列化 

說明:

1.open方法利用裝飾器模式爲InputStream裝飾爲DataInputStream 

2. Deserialize方法實際上調用的是Writable類的writeDataOutPut)方法和Writable.readFields(java.io.DataInput)對從open方法裝飾後的DataInputStream進行反序列化或序列化的,從源碼中能夠看出Deserialize並未對DataInputStream進行驗證,這也就意味着必須先open->Deserialize不然會拋出異常。該類序列化和反序列化調用的實際是Writable的方法,進行了便捷的封裝。

結構以下:

 

SerilizationFactory:維護一個SerilizationArrayList。它具備參數爲Configuration的構造函數,把parameter io.serializations中逗號隔開的serialization都添加進來。 

結構以下:

說明:

1.該類是SerializationFactory模式,Configuration的衍生類,成員變量爲LogArrayList<Serialization<?>>,ArrayList保存SerializationClass集合。

2.構造方法從Conf配置文件中讀取io.serializations的屬性配置,而後利用add()方法添加到ArrayList<Serialization<?>>中。

3.add()方法從Conf文件利用serializationNam 得到conf.getClassByName(serializationName)Class屬性,再利用反射機制實例化該類並添加到ArrayList<Serialization<?>> serializations中。

4.getSerialization方法根據ClassArrayList獲取具體的Serialization類。

5.怎麼知道對應的Class獲取的是哪一個序列化框架提供的序列化實例實際上也是經過ArrayList<Serialization<?>>中去獲取知足accept()方法所對應的Serialization。

------------------我表示我是分隔符----------------------------------------------

org.apache.hadoop.io中有兩個類: 

Stringifier是把類轉化爲字符串和把字符串轉化爲類的一個接口。DefaultStringifier實現了這個接口,其中用到的序列化方式就是SerializationFactory中維護的Serialization

下面的程序演示瞭如何在把Java的序列化方式加入到這個框架中來: 

package test;

import java.io.Serializable;

public class TestSerializer implements Serializable{

	/**
	 * 
	 */
	private static final long serialVersionUID = -5063738225407612355L;
	private int a;
	private String b;
	public TestSerializer(int a, String b) {
		super();
		this.a = a;
		this.b = b;
	}
	public int getA() {
		return a;
	}
	public void setA(int a) {
		this.a = a;
	}
	public String getB() {
		return b;
	}
	public void setB(String b) {
		this.b = b;
	}
}



package test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;

public class serializer {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub

		Configuration conf = new Configuration();
		conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
		TestSerializer ts = new TestSerializer(1,"234");
		DefaultStringifier<TestSerializer> ds = new DefaultStringifier<TestSerializer>(conf, TestSerializer.class);
		String s = null;
		try {
			s = ds.toString(ts);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(s);
		TestSerializer tsxp = null;
		try {
			tsxp = ds.fromString(s);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(tsxp.getA()+":"+tsxp.getB());
	}

}
相關文章
相關標籤/搜索