Hadoop 實現定製的Writable類型(附部分源碼)

本人博客開始遷移,博客整個架構本身搭建及編碼http://www.cookqq.com/listBlog.actionjava

writeable接口對java基本類型提供了封裝,short和char除外。全部的封裝包含get()和set()兩個方法用於讀取和設置值。express

Writable的Java基本類封裝
Java基本類型  Writable使用序列化大小(字節)
布爾型          BooleanWritable     1
字節型          ByteWritable          1
整型            IntWritable             4
整型            VIntWritable         1-5
浮點型         FloatWritable          4
長整型         LongWritable          8
長整型         VLongWritable      1-9
雙精度浮點型DoubleWritable       8
Text類型對應java的stringapache

如今看一下IntWritable的源碼架構

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;

/** A WritableComparable for ints. */
public class IntWritable implements WritableComparable {
  private int value;

  public IntWritable() {}

  public IntWritable(int value) { set(value); }

  /** Set the value of this IntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this IntWritable. */
  public int get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = in.readInt();
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

  /** Returns true iff <code>o</code> is a IntWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof IntWritable))
      return false;
    IntWritable other = (IntWritable)o;
    return this.value == other.value;
  }

  public int hashCode() {
    return value;
  }

  /** Compares two IntWritables. */
  public int compareTo(Object o) {
    int thisValue = this.value;
    int thatValue = ((IntWritable)o).value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }

  public String toString() {
    return Integer.toString(value);
  }

  /** A Comparator optimized for IntWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(IntWritable.class);
    }

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }
  }

  static {                                        // register this comparator
    WritableComparator.define(IntWritable.class, new Comparator());
  }
}

IntWritable的關係圖:app

(1)IntWritable實現了接口Writable的2個方法:一個用於將其狀態寫入二進制格式的DataOutput流,另外一個用於從二進制格式的DataInput流讀取其態 框架

write和readFields分別實現了把對象序列化和反序列化的功能,是Writable接口定義的兩個方法
less

(2)IntWritable聲明瞭變量value,而且實現了set,get方法 ide

(3)聲明內部類Comparator,而且實現WritableComparator接口中比較未被序列化的對象方法 函數

(4)註冊comparator oop

Hadoop自帶一系列有用的Writable實現,能夠知足絕大多數用途。但有時,咱們須要編寫本身的自定義實現。經過自定義Writable,咱們可以徹底控制二進制表示和排序順序。Writable是MapReduce數據路徑的核心,因此調整二進制表示對其性能有顯著影響。現有的Hadoop Writable應用已獲得很好的優化,但爲了對付更復雜的結構,最好建立一個新的Writable類型,而不是使用已有的類型。

爲了演示如何建立一個自定義Writable,咱們編寫了一個表示一對字符串的實現,名爲TextPair

importjava.io.*;  

import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<textpair> {
private Text first;
private Text second;
public TextPair() {
set(newText(),newText());
}
public TextPair(String first, String second) {
set(newText(first),newText(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out)throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in)throwsIOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() *163+ second.hashCode();
}
@Override
public boolean equals(Object o) {
if(o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first +"\t"+ second;
}
@Override
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if(cmp !=0) {
return cmp;
}
return second.compareTo(tp.second);
}
}

此實現的第一部分直觀易懂:有兩個Text實例變量(first和second)和相關的構造函數、get方法和set方法。全部的Writable實現都必須有一個默認的構造函數,以便MapReduce框架可以對它們進行實例化,進而調用readFields()方法來填充它們的字段。 Writable實例是易變的、常常重用的,因此咱們應該儘可能避免在write()或readFields()方法中分配對象。

經過委託給每一個Text對象自己,TextPair的write()方法依次序列化輸出流中的每個Text對象。一樣,也經過委託給Text對象自己,readFields()反序列化輸人流中的字節。DataOutput和DataInput接口有豐富的整套方法用於序列化和反序列化Java基本類型,因此在通常狀況下,咱們可以徹底控制Writable對象的數據傳輸格式。

正如爲Java寫的任意值對象同樣,咱們會重寫java.lang.Object的hashCode()方法,equals()方法和toString()方法。HashPartitioner使用hashCode()方法來選擇reduce分區,因此應該確保寫一個好的哈希函數來確保reduce函數的分區在大小上是至關的。

TextPair是WritableComparable的實現,因此它提供了compareTo()方法的實現,加入咱們但願的順序:它經過一個一個String逐個排序。請注意,TextPair不一樣於前面的TextArrayWritable類(除了它能夠存儲Text對象數以外),由於TextArrayWritable只是一個Writable,而不是WritableComparable。

實現一個快速的RawComparator

上例中所示代碼可以有效工做,但還能夠進一步優化。正如前面所述,在MapReduce中,TextPair被用做鍵時,它必須被反序列化爲要調用的compareTo()方法的對象。是否能夠經過查看其序列化表示的方式來比較兩個TextPair對象。

事實證實,咱們能夠這樣作,由於TextPair由兩個Text對象鏈接而成,二進制Text對象表示是一個可變長度的整型,包含UTF-8表示的字符串中的字節數,後跟UTF-8字節自己。關鍵在於讀取開始的長度。從而得知第一個Text對象的字節表示有多長,而後能夠委託Text對象的RawComparator,而後利用第一或者第二個字符串的偏移量來調用它。下面例子給出了具體方法(注意,該代碼嵌套在TextPair類中)。

public static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR =new Text.Comparator();
public Comparator() {
super(TextPair.class);
}
@Override
public int  compare(byte[] b1,int s1,int l1,
byte[] b2,int s2,int l2) {
try{
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if(cmp != 0) {
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
b2, s2 + firstL2, l2 - firstL2);
}catch(IOException e) {
throw new IllegalArgumentException(e);
}
}
}
static{
WritableComparator.define(TextPair.class,newComparator());
}

 

事實上,咱們通常都是繼承WritableComparator,而不是直接實現RawComparator,由於它提供了一些便利的方法和默認實現。這段代碼的精妙之處在於計算firstL1和firstL2,每一個字節流中第一個Text字段的長度。每一個都由可變長度的整型(由WritableUtils的decodeVIntSize()返回)和它的編碼值(由readVInt()返問)組成。

靜態代碼塊註冊原始的comparator以便MapReduce每次看到TextPair類,就知道使用原始comparator做爲其默認comparator。

自定義comparator

從TextPair可知,編寫原始的cornparator比較費力,由於必須處理字節級別的細節。若是須要編寫本身的實現,org.apache.hadoop.io包中Writable的某些前瞻性實現值得研究研究。WritableUtils的有效方法也比較很是方便。

若是可能,還應把自定義comparator寫爲RawComparators。這些comparator實現的排序順序不一樣於默認comparator定義的天然排序順序。下面的例子顯示了TextPair的comparator,稱爲First Comparator。只考慮了一對Text對象中的第一個字符串。請注意,咱們重寫了compare()方法使其使用對象進行比較,因此兩個compare()方法的語義是相同的。

public static class FirstComparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR =newText.Comparator();
public FirstComparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1,ints1,intl1,
byte[] b2,ints2,intl2) {
try{
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
}catch(IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if(a instanceof TextPair && b instanceof TextPair) {
return((TextPair) a).first.compareTo(((TextPair) b).first);
}
return super.compare(a, b);
}
}
參考:《hadoop權威指南》
相關文章
相關標籤/搜索