Hadoop序列化中的Writable接口(附部分源碼)

序列化是將結構化對象爲字節流以便與經過網絡進行傳輸或者寫入持久存儲。反序列化指的是將字節流轉爲一系列結構化對象的過程。 java

序化在分佈式數據處理的兩大領域常常出現:進程間通訊和永久存儲 apache

hadoop中,節點直接的進程間通訊是用遠程過程調用(RPC)實現的。RPC協議將消息序列化成二進制流後發送到運城節點,遠程節點接着將二進制流反序列化爲原始的消息。 數組

在Hadoop中,Writable接口定義了兩個方法:一個用於將其狀態寫入二進制格式的DataOutput流,另外一個用於從二進制格式的DataInput流讀取其態。 網絡

packageorg.apache.hadoop.io;
importjava.io.DataOutput;
importjava.io.DataInput;
importjava.io.IOException;
public interface Writable {
    void write(DataOutput out)throws IOException;
    void readFields(DataInput in)throws IOException;

write和readFields分別實現了把對象序列化和反序列化的功能 分佈式

讓咱們來看一個特別的Writable,看看能夠對它進行哪些操做。咱們要使用IntWritable,這是一個Java的int對象的封裝。可使用set()函數來建立和設置它的值: 函數

IntWritable writable =new IntWritable();
writable.set(163);
相似地,咱們也可使用構造函數:
IntWritable writable =newIntWritable(163);
爲了檢查IntWritable的序列化形式,咱們寫一個小的輔助方法,它把一個java.io.ByteArrayOutputStream封裝到java.io.DataOutputStream中(java.io.DataOutput的一個實現),以此來捕獲序列化的數據流中的字節:
public static byte[] serialize(Writable writable)throws IOException {
    ByteArrayOutputStream out =new ByteArrayOutputStream();
    DataOutputStream dataOut =new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    returnout.toByteArray();
}
整數用四個字節寫入(咱們使用JUnit 4斷言):
byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));
字節使用大端順序寫入(因此,最重要的字節寫在數據流的開始處,這是由java.io.DataOutput接口規定的),咱們可使用Hadoop的StringUtils方法看到它們的十六進制表示:
assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

讓咱們再來試試反序列化。咱們建立一個幫助方法來從一個字節數組讀取一個Writable對象: oop

public static byte[] deserialize(Writable writable,byte[] bytes)
throws IOException {
    ByteArrayInputStream in =new ByteArrayInputStream(bytes);
    DataInputStream dataIn =new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}
咱們構造一個新的、缺值的IntWritable,而後調用deserialize()方法來讀取剛寫入的輸出流。而後發現它的值(使用get方法檢索獲得)仍是原來的值163:
IntWritable newWritable =new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));
WritableComparable 和comparator

IntWritable實現了WritableComparable接口,後者是Writable和java.lang.Comparable接口的子接口。 優化

packageorg.apache.hadoop.io;
public interface WritableComparable<t> extends Writable, Comparable<t> {
}

類型的比較對MapReduce而言相當重要的,鍵和鍵之間的比較是在排序階段完成。Hadoop提供的一個優化方法是從Java Comparator的RawComparator擴展 spa

packageorg.apache.hadoop.io;
importjava.util.Comparator;
public interface RawComparator<t> extends Comparator<t> {
     public int compare(byte[] b1,ints1,intl1,byte[] b2,ints2,intl2);
}
 package java.util;
public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}
這個接口容許執行者比較從流中讀取的未被反序列化爲對象的記錄,從而省去了建立對象的全部開銷。例如,IntWritables的comparator使用原始的compare()方法從每一個字節數組的指定開始位置(S1和S2)和長度(L1和L2)讀取整數b1和b2而後直接進行比較。

WritableComparator是RawComparator對WritableComparable類的一個通用實現。它提供兩個主要功能。首先,它提供了一個默認的對原始compare()函數的調用,對從數據流對要比較的對象進行反序列化,而後調用對象的compare()方法其次,它充當的是RawComparator實例的一個工廠方法(Writable方法已經註冊)。例如,爲得到IntWritable的comparator,咱們只需使用: code

RawComparator<intwritable> comparator = WritableComparator.get(IntWritable.class);

WritableComparator get方法源碼:


private static HashMap<Class, WritableComparator> comparators =
    new HashMap<Class, WritableComparator>(); // registry

  /** Get a comparator for a {@link WritableComparable} implementation. */
  public static synchronized WritableComparator get(Class<? extends WritableComparable> c) {
    WritableComparator comparator = comparators.get(c);
    if (comparator == null)
      comparator = new WritableComparator(c, true);
    return comparator;
  }


comparator能夠用來比較兩個IntWritable:

IntWritable w1 =newIntWritable(163);
IntWritable w2 =newIntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));

或者它們的序列化描述: 

byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));
WritableComparator的compare()方法的源碼:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1);                   // parse key1
      key1.readFields(buffer);
      
      buffer.reset(b2, s2, l2);                   // parse key2
      key2.readFields(buffer);
      
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    
    return compare(key1, key2);                   // compare them
  }

 @SuppressWarnings("unchecked")
  public int compare(WritableComparable a, WritableComparable b) {
    return a.compareTo(b);
  }


參考:《hadoop權威指南》

相關文章
相關標籤/搜索