hadoop升級到0.23.0和1.0版本後,其IO底層除了本身實現的Writable序列化後,還增長了一個io.serializer包,該包提供了一種可插拔的持久化框架(Pluggable Serialization Framework)。之因此說是可插拔的,是由於能夠把現存的持久化方式嵌入到hadoop的程序中。這個包中有如下的一些類: java
接口類:
web
Serializer:定義了序列化的接口apache
Serialization:定義了序列化的機制(做爲序列化接口Serializer和反序列化接口Deserializer的上層抽象,提供了一個抽象的序列化和反序列化實例的返回)框架
Serializer接口(具體的序列化類實現接口)函數
Serializer接口位於hadoop.io. Serializer包下面,爲hadoop的序列化提供了一種機制。(This package provides a mechanism for using different serialization frameworks in Hadoop)oop
該接口包含了三個方法:this
void open(OutputStream out)spa
--打開一個輸出流爲序列化作準備。code
void serialize(T t)
--對一個對象序列化
void close()
--關閉輸出流
Serialization接口(序列化框架接口)
該接口做爲序列化和反序列化內部封裝的一個部分,提供了下面三個方法。
accept(Class<?> c)
--查看參數Class類是否支持序列化。
Deserializer<T> getDeserializer(Class<T> c)
--獲取反序列化對象
Serializer<T> getSerializer(Class<T> c)
--獲取序列化對象
Deserializer接口
該接口提供了反序列化的機制,與Serializer接口相對應,該接口一樣提供了三個方法。
<<
Provides a facility for deserializing objects of type from an InputStream.
Deserializers are stateful, but must not buffer the input since other producers may read from the input between calls to deserialize(Object).
>>
void close()
--關閉輸入流
T deserialize(T t)
--對一個對象反序列化
Deserialize(T t)方法的實現:
@SuppressWarnings("unchecked")
public T deserialize(T object) throws IOException {
try {
// ignore passed-in object
return (T) ois.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(e.toString());
}
}
void open(InputStream in)
--打開一個輸入流實現反序列化作準備
Open方法的實現爲:
private ObjectInputStream ois;
public void open(InputStream in) throws IOException {
ois = new ObjectInputStream(in) {
@Override protected void readStreamHeader() {
// no header
}
};
}
注意:open方法利用裝飾器模式爲InputStream裝飾爲ObjectInputStream
Deserialize方法從open方法裝飾後的ObjectInputStream類中讀取反序列化的對象,從源碼中能夠看出Deserialize並未對ObjectInputStream進行驗證,這也就意味着必須先open->Deserialize不然會拋出異常。
其餘的類:
JavaSerilization:實現Serialization接口,並維護兩個分別實現Serializer和Deserializer接口的內部類。這兩個內部類包裝Java的序列化機制,實現對實現Serializable接口的類的序列化。
說明:
1.open方法利用裝飾器模式爲InputStream裝飾爲ObjectInputStream
2. Deserialize方法從open方法裝飾後的ObjectInputStream類中讀取反序列化的對象,從源碼中能夠看出Deserialize並未對ObjectInputStream進行驗證,這也就意味着必須先open->Deserialize不然會拋出異常。
結構以下:
WritableSerialization:同上,實現對實現Writable接口的類的序列化
說明:
1.open方法利用裝飾器模式爲InputStream裝飾爲DataInputStream
2. Deserialize方法實際上調用的是Writable類的write(DataOutPut)方法和Writable.readFields(java.io.DataInput)對從從open方法裝飾後的DataInputStream進行反序列化或序列化的,從源碼中能夠看出Deserialize並未對DataInputStream進行驗證,這也就意味着必須先open->Deserialize不然會拋出異常。該類序列化和反序列化調用的實際是Writable的方法,進行了便捷的封裝。
結構以下:
SerilizationFactory:維護一個Serilization的ArrayList。它具備參數爲Configuration的構造函數,把parameter io.serializations中逗號隔開的serialization都添加進來。
結構以下:
說明:
1.該類是Serialization的Factory模式,是Configuration的衍生類,成員變量爲Log和ArrayList<Serialization<?>>,ArrayList保存Serialization的Class集合。
2.構造方法從Conf配置文件中讀取io.serializations的屬性配置,而後利用add()方法添加到ArrayList<Serialization<?>>中。
3.add()方法從Conf文件利用serializationNam 得到conf.getClassByName(serializationName)的Class屬性,再利用反射機制實例化該類並添加到ArrayList<Serialization<?>> serializations中。
4.getSerialization方法根據Class從ArrayList獲取具體的Serialization類。
5.怎麼知道對應的Class獲取的是哪一個序列化框架提供的序列化實例實際上也是經過ArrayList<Serialization<?>>中去獲取知足accept()方法所對應的Serialization。
------------------我表示我是分隔符----------------------------------------------
org.apache.hadoop.io中有兩個類:
Stringifier是把類轉化爲字符串和把字符串轉化爲類的一個接口。DefaultStringifier實現了這個接口,其中用到的序列化方式就是SerializationFactory中維護的Serialization。
下面的程序演示瞭如何在把Java的序列化方式加入到這個框架中來:
package test; import java.io.Serializable; public class TestSerializer implements Serializable{ /** * */ private static final long serialVersionUID = -5063738225407612355L; private int a; private String b; public TestSerializer(int a, String b) { super(); this.a = a; this.b = b; } public int getA() { return a; } public void setA(int a) { this.a = a; } public String getB() { return b; } public void setB(String b) { this.b = b; } }
package test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DefaultStringifier; public class serializer { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub Configuration conf = new Configuration(); conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization"); TestSerializer ts = new TestSerializer(1,"234"); DefaultStringifier<TestSerializer> ds = new DefaultStringifier<TestSerializer>(conf, TestSerializer.class); String s = null; try { s = ds.toString(ts); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(s); TestSerializer tsxp = null; try { tsxp = ds.fromString(s); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(tsxp.getA()+":"+tsxp.getB()); } }