Okio 源碼解析(一):數據讀取流程

簡介

Okio 是 square 開發的一個 Java I/O 庫,而且也是 OkHttp 內部使用的一個組件。Okio 封裝了 java.iojava.nio,而且有多個優勢:java

  • 提供超時機制
  • 不須要人工區分字節流與字符流,易於使用
  • 易於測試

本文先介紹 Okio 的基本用法,而後分析源碼中數據讀取的流程。segmentfault

基本用法

Okio 的用法很簡單,下面是讀取和寫入的示例:數組

// 讀取
InputStream inputStream = ...
BufferedSource bufferedSource = Okio.buffer(Okio.source(inputStream));
String line = bufferedSource.readUtf8();

// 寫入
OutputStream outputStream = ...
BufferedSink bufferedSink = Okio.buffer(Okio.sink(outputStream));
bufferedSink.writeString("test", Charset.defaultCharset());
bufferedSink.close();

Okio 用 Okio.source 封裝 InputStream,用 Okio.sink 封裝 OutputStream。而後統一交給 Okio.buffer 分別得到 BufferedSourceBufferedSink,這兩個類提供了大量的讀寫數據的方法。BufferedSource 中包含的部分接口以下:緩存

int readInt() throws IOException;
long readLong() throws IOException;
byte readByte() throws IOException;
ByteString readByteString() throws IOException;
String readUtf8() throws IOException;
String readString(Charset charset) throws IOException;

其中既包含了讀取字節流,也包含讀取字符流的方法,BufferedSink 則提供了對應的寫入數據的方法。數據結構

基本框架

Okio 中有4個接口,分別是 SourceSinkBufferedSourceBufferedSinkSourceSink 分別用於提供字節流和接收字節流,對應於 InpustreamOutputStreamBufferedSourceBufferedSink 則是保存了相應的緩存數據用於高效讀寫。這幾個接口的繼承關係以下:框架

okio源碼

從上圖能夠看出,SourceSink 提供基本的 readwrite 方法,而 BufferedSourceBufferedSink 則提供了更多的操做數據的方法,但這些都是接口,真正實現的類是 RealBufferedSourceRealBufferedSinkide

另外還有個類是 Buffer, 它同時實現了 BufferedSourceBufferedSink,而且 RealBufferedSourceRealbufferedSink 都包含一個 Buffer 對象,真正的數據讀取操做都是交給 Buffer 完成的。測試

因爲 read 和 write 操做相似,下面以 read 的流程對代碼進行分析。ui

Okio.source

Okio.source 有幾個重載的方法,用於封裝輸入流,最終調用的代碼以下:this

private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

從上面代碼能夠看出,Okio.source 接受兩個參數,一個是 InputStream,另外一個是 Timeout,返回了一個匿名的 Source 的實現類。這裏主要看一下 read 方法,首先是參數爲空的判斷,而後是從 in 中讀取數據到類型爲 Buffersink 中,這段代碼中涉及到 Buffer 以及 Segment,下面先看看這兩個東西。

Segment

在 Okio 中,每一個 Segment 表明一段數據,多個 Segment 串成一個循環雙向鏈表。下面是 Segment 的成員變量和構造方法:

final class Segment {
  // segment數據的字節數
  static final int SIZE = 8192;
  // 共享的Segment的最低的數據大小
  static final int SHARE_MINIMUM = 1024;
  // 實際保存的數據
  final byte[] data;
  // 下一個可讀的位置
  int pos;
  // 下一個可寫的位置
  int limit;
  // 保存的數據是不是共享的
  boolean shared;
  // 保存的數據是不是獨佔的
  boolean owner;
  // 鏈表中下一個節點
  Segment next;
  // 鏈表中上一個節點
  Segment prev;

  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }

  Segment(Segment shareFrom) {
    this(shareFrom.data, shareFrom.pos, shareFrom.limit);
    shareFrom.shared = true;
  }

  Segment(byte[] data, int pos, int limit) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.owner = false;
    this.shared = true;
  }
  ...
}

變量的含義已經寫在了註釋中,能夠看出 Segment 中的數據保存在一個字節數組中,並提供了一些變量標識讀與寫的位置。Segment 既然是鏈表中的節點,下面看一下插入與刪除的方法:

// 在當前Segment後面插入一個Segment
public Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }
// 從鏈表中刪除當前Segment,並返回其後繼節點
public @Nullable Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

插入與刪除的代碼其實就是數據結構中鏈表的操做。

Buffer

下面看看 Buffer 是如何使用 Segment 的。Buffer 中有兩個重要變量:

@Nullable Segment head;
long size;

一個是 head,表示這個 Buffer 保存的 Segment 鏈表的頭結點。還有一個 size,用於記錄 Buffer 當前的字節數。

在上面 Okio.source 中生成的匿名的 Sourceread 方法中,要讀取數據到 Buffer 中,首次是調用了 writableSegment,這個方法是獲取一個可寫的 Segment,代碼以下所示:

Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }

    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

獲取 Segment 的邏輯是先判斷 Buffer 是否有了 Segment 節點,沒有就先去 SegmentPool 中取一個,而且將首尾相連,造成循環鏈表。若是已經有了,找到末尾的 Segment,判斷其剩餘空間是否知足,不知足就再從 SegmentPool 中獲取一個新的 Segment 添加到末尾。最後,返回末尾的 Segment 用於寫入。

SegmentPool 用於保存廢棄的 Segment,其中有兩個方法,take 從中獲取,recycle 用於回收。

上面 Okio.buffer(Okio.source(in)) 最終獲得的是 RealBufferedSource,這個類中持有一個 Buffer 對象和一個 Source 對象,真正的讀取操做由這兩個對象合做完成。下面是 readString 的代碼:

@Override public String readString(long byteCount, Charset charset) throws IOException {
    require(byteCount);
    if (charset == null) throw new IllegalArgumentException("charset == null");
    return buffer.readString(byteCount, charset);
  }
@Override public void require(long byteCount) throws IOException {
    if (!request(byteCount)) throw new EOFException();
  }
// 從source中讀取數據到buffer中
@Override public boolean request(long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");
    while (buffer.size < byteCount) {
      if (source.read(buffer, Segment.SIZE) == -1) return false;
    }
    return true;
  }

首先是從 Source 中讀取數據到 Buffer 中,而後調用 buffer.readstring 方法獲得最終的字符串。下面是 readString 的代碼:

@Override public String readString(long byteCount, Charset charset) throws EOFException {
    checkOffsetAndCount(size, 0, byteCount);
    if (charset == null) throw new IllegalArgumentException("charset == null");
    if (byteCount > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
    }
    if (byteCount == 0) return "";

    Segment s = head;
    if (s.pos + byteCount > s.limit) {
      // 若是string跨多個Segment,委託給readByteArray去讀
      return new String(readByteArray(byteCount), charset);
    }
    // 將字節序列轉換成String
    String result = new String(s.data, s.pos, (int) byteCount, charset);
    s.pos += byteCount;
    size -= byteCount;

    // 若是pos==limit,回收這個Segment
    if (s.pos == s.limit) {
      head = s.pop();
      SegmentPool.recycle(s);
    }

    return result;
}

在上面的代碼中,即是從 BufferSegment 鏈表中讀取數據。若是 String 跨多個 Segment,那麼調用 readByteArray 循環讀取字節序列。最終將字節序列轉換爲 String 對象。若是 Segmentpos 等於 limit,說明這個 Segment 的數據已經所有讀取完畢,能夠回收,放入 SegmentPool

Okio 讀取數據的時候統一將輸入流當作是字節序列,讀入 Buffer 後在用到的時候再轉換,例如上面讀取 String 時將字節序列進行了轉換。其它還有不少類型,以下面是 readInt 的代碼:

@Override public int readInt() {
    if (size < 4) throw new IllegalStateException("size < 4: " + size);

    Segment segment = head;
    int pos = segment.pos;
    int limit = segment.limit;

    // If the int is split across multiple segments, delegate to readByte().
    if (limit - pos < 4) {
      return (readByte() & 0xff) << 24
          |  (readByte() & 0xff) << 16
          |  (readByte() & 0xff) <<  8
          |  (readByte() & 0xff);
    }

    byte[] data = segment.data;
    int i = (data[pos++] & 0xff) << 24
        |   (data[pos++] & 0xff) << 16
        |   (data[pos++] & 0xff) <<  8
        |   (data[pos++] & 0xff);
    size -= 4;

    if (pos == limit) {
      head = segment.pop();
      SegmentPool.recycle(segment);
    } else {
      segment.pos = pos;
    }

    return i;
}

Buffer 使用 Segment 鏈表保存數據,有個好處是在不一樣的 Buffer 之間移動數據只須要轉移其字節序列的擁有權,如 copyTo(Buffer out, long offset, long byteCount) 代碼所示:

public Buffer copyTo(Buffer out, long offset, long byteCount) {
    if (out == null) throw new IllegalArgumentException("out == null");
    checkOffsetAndCount(size, offset, byteCount);
    if (byteCount == 0) return this;

    out.size += byteCount;

    // Skip segments that we aren't copying from.
    Segment s = head;
    for (; offset >= (s.limit - s.pos); s = s.next) {
      offset -= (s.limit - s.pos);
    }

    // Copy one segment at a time.
    for (; byteCount > 0; s = s.next) {
      Segment copy = new Segment(s);
      copy.pos += offset;
      copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
      if (out.head == null) {
        out.head = copy.next = copy.prev = copy;
      } else {
        out.head.prev.push(copy);
      }
      byteCount -= copy.limit - copy.pos;
      offset = 0;
    }

    return this;
}

其中並無拷貝字節數據,只是鏈表的相關操做。

總結

Okio 讀取數據的流程基本就如本文所分析的,寫入操做與讀取是相似的。Okio 經過 SourceSink 標識輸入流與輸出流。在 Buffer 中使用 Segment 鏈表的方式保存字節數據,而且經過 Segment 擁有權的共享避免了數據的拷貝,經過 SegmentPool 避免了廢棄數據的GC,使得 Okio 成爲一個高效的 I/O 庫。Okio 還有一個優勢是超時機制,具體內容可進入下一篇:Okio 源碼解析(二):超時機制

若是個人文章對您有幫助,不妨點個贊支持一下(^_^)

相關文章
相關標籤/搜索