Serialization is the process of turning structured objects into a byte stream so they can be transmitted over a network or written to persistent storage. Deserialization is the reverse process of turning a byte stream back into a series of structured objects.
Serialization appears in two broad areas of distributed data processing: interprocess communication and persistent storage.
In Hadoop, interprocess communication between nodes is implemented with remote procedure calls (RPC). The RPC protocol serializes a message into a binary stream and sends it to the remote node, which then deserializes the binary stream back into the original message.
In Hadoop, the Writable interface defines two methods: one for writing an object's state to a DataOutput binary stream, and one for reading its state back from a DataInput binary stream.
package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
write() and readFields() implement serialization and deserialization of the object, respectively.
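As a self-contained illustration of this contract, the following sketch mimics IntWritable using only the JDK. MiniIntWritable is a hypothetical class written for this example, not part of Hadoop; it follows the same write()/readFields() shape described above:

```java
import java.io.*;

// Hypothetical stand-in for Hadoop's IntWritable, using only the JDK.
// write() emits the object's state to a DataOutput; readFields()
// repopulates the state from a DataInput.
public class MiniIntWritable {
  private int value;

  public void set(int value) { this.value = value; }
  public int get() { return value; }

  // Serialize this object's state to a binary stream.
  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

  // Restore this object's state from a binary stream.
  public void readFields(DataInput in) throws IOException {
    value = in.readInt();
  }

  // Round-trip helper: serialize, then deserialize into a fresh object.
  public static int roundTrip(int v) {
    try {
      MiniIntWritable w = new MiniIntWritable();
      w.set(v);
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      w.write(new DataOutputStream(bytes));

      MiniIntWritable restored = new MiniIntWritable();
      restored.readFields(new DataInputStream(
          new ByteArrayInputStream(bytes.toByteArray())));
      return restored.get();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip(163)); // prints 163
  }
}
```

The round trip shows the key property of the contract: a fresh object fed the serialized bytes ends up with the original state.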
Let's look at a particular Writable and see what we can do with it. We will use IntWritable, a wrapper for a Java int. We can create one and set its value with the set() method:
IntWritable writable = new IntWritable();
writable.set(163);
Equivalently, we can use the constructor that takes the integer value:

IntWritable writable = new IntWritable(163);
To examine the serialized form of the IntWritable, we write a small helper method that captures the bytes of the serialized stream:

public static byte[] serialize(Writable writable) throws IOException {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DataOutputStream dataOut = new DataOutputStream(out);
  writable.write(dataOut);
  dataOut.close();
  return out.toByteArray();
}
An integer is written using four bytes:

byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));
We can see the hexadecimal representation of the serialized bytes:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));
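To see where "000000a3" comes from, here is a JDK-only sketch (BigEndianDemo is a hypothetical helper written for this example) showing that DataOutputStream writes an int as four bytes with the most significant byte first, which matches IntWritable's serialized form:

```java
import java.io.*;

// Hypothetical demo class: shows the byte layout DataOutputStream
// produces for an int (big-endian, most significant byte first).
public class BigEndianDemo {
  public static String toHex(int v) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      new DataOutputStream(out).writeInt(v);
      StringBuilder sb = new StringBuilder();
      for (byte b : out.toByteArray()) {
        sb.append(String.format("%02x", b)); // two hex digits per byte
      }
      return sb.toString();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(toHex(163)); // prints 000000a3 (163 = 0xa3)
  }
}
```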
Let's try deserialization. We'll create a helper method to read a Writable object from a byte array:
public static byte[] deserialize(Writable writable, byte[] bytes)
    throws IOException {
  ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  DataInputStream dataIn = new DataInputStream(in);
  writable.readFields(dataIn);
  dataIn.close();
  return bytes;
}

We construct a new, value-less IntWritable, then call deserialize() to read from the output data that we just wrote. We then find that its value, retrieved with the get() method, is the original one, 163:
IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));
IntWritable implements the WritableComparable interface, which is a subinterface of both the Writable and java.lang.Comparable interfaces.
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Comparison of types is crucial for MapReduce, where keys are compared against one another during the sorting phase. One optimization Hadoop provides is RawComparator, an extension of Java's Comparator:
package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
For reference, java.util.Comparator is defined as:

package java.util;

public interface Comparator<T> {
  int compare(T o1, T o2);
  boolean equals(Object obj);
}

The RawComparator interface allows implementors to compare records read from a stream without deserializing them into objects, thereby avoiding any overhead of object creation. For example, the comparator for IntWritables implements the raw compare() method by reading an integer from each of the byte arrays b1 and b2, starting at the given positions (s1 and s2) with the given lengths (l1 and l2), and comparing the values directly.
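That byte-level technique can be sketched without Hadoop. RawIntComparatorDemo below is a hypothetical class written for this example: it decodes a big-endian int directly from each byte-array region and compares the two values, creating no objects at all:

```java
import java.io.*;

// Hypothetical sketch of a raw comparator in the spirit of
// RawComparator<IntWritable>: compare serialized ints byte-for-byte
// by decoding them in place, without deserializing into objects.
public class RawIntComparatorDemo {
  public static int compare(byte[] b1, int s1, int l1,
                            byte[] b2, int s2, int l2) {
    return Integer.compare(readInt(b1, s1), readInt(b2, s2));
  }

  // Decode a big-endian (most significant byte first) int, the layout
  // DataOutputStream.writeInt() produces.
  private static int readInt(byte[] b, int off) {
    return ((b[off]     & 0xff) << 24)
         | ((b[off + 1] & 0xff) << 16)
         | ((b[off + 2] & 0xff) << 8)
         |  (b[off + 3] & 0xff);
  }

  // Helper mirroring the serialize() method used earlier in the text.
  public static byte[] serialize(int v) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      new DataOutputStream(out).writeInt(v);
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] b1 = serialize(163);
    byte[] b2 = serialize(67);
    System.out.println(compare(b1, 0, 4, b2, 0, 4) > 0); // prints true
  }
}
```

Reassembling the int before comparing keeps the sign handling correct; a plain lexicographic byte comparison would mis-order negative values against positive ones.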
WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It provides two main functions. First, it provides a default implementation of the raw compare() method, which deserializes the objects to be compared from the stream and invokes the objects' compare() method. Second, it acts as a factory for RawComparator instances (which Writable implementations have registered). For example, to obtain a comparator for IntWritable, we just use:
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
Source code of the WritableComparator get() method:
private static HashMap<Class, WritableComparator> comparators =
    new HashMap<Class, WritableComparator>(); // registry

/** Get a comparator for a {@link WritableComparable} implementation. */
public static synchronized WritableComparator get(
    Class<? extends WritableComparable> c) {
  WritableComparator comparator = comparators.get(c);
  if (comparator == null)
    comparator = new WritableComparator(c, true);
  return comparator;
}
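The registry-plus-fallback pattern in get() can be sketched with plain JDK types. ComparatorRegistry is a hypothetical stand-in written for this example, with Comparable playing the role that compareTo() plays in WritableComparator's deserializing default:

```java
import java.util.*;

// Hypothetical sketch of the pattern behind WritableComparator.get():
// a static registry mapping classes to comparators, with a generic
// fallback constructed on demand when nothing has been registered.
public class ComparatorRegistry {
  private static final Map<Class<?>, Comparator<Object>> comparators =
      new HashMap<>();

  // Implementations would call this to register an optimized comparator,
  // analogous to registering a raw comparator for a Writable class.
  public static synchronized void define(Class<?> c, Comparator<Object> cmp) {
    comparators.put(c, cmp);
  }

  // Return the registered comparator, or fall back to a generic one that
  // relies on the objects' own Comparable ordering.
  @SuppressWarnings("unchecked")
  public static synchronized Comparator<Object> get(Class<?> c) {
    return comparators.computeIfAbsent(
        c, k -> (a, b) -> ((Comparable<Object>) a).compareTo(b));
  }

  public static void main(String[] args) {
    Comparator<Object> cmp = get(Integer.class);
    System.out.println(cmp.compare(163, 67) > 0); // prints true
  }
}
```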
The comparator can be used to compare two IntWritable objects:
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));
Or their serialized representations:
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

Source code of WritableComparator's compare() method:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    buffer.reset(b1, s1, l1); // parse key1
    key1.readFields(buffer);

    buffer.reset(b2, s2, l2); // parse key2
    key2.readFields(buffer);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return compare(key1, key2); // compare them
}

@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
  return a.compareTo(b);
}
Reference: "Hadoop: The Definitive Guide"