Okio 源碼解析:輕量而高效的 I/O 庫

你好,我是 N0tExpectErr0r,一名熱愛技術的 Android 開發java

個人我的博客:blog.N0tExpectErr0r.cnnode

從前面的 OkHttp 源碼解析中咱們能夠知道,OkHttp 中的 I/O 都不是經過咱們平時所使用的 Input/Output Stream 來實現,而是使用了 Okio 這個第三方庫,那它與尋常的 IOStream 有什麼區別呢?讓咱們來分析一下它的源碼。數組

Okio 中有兩個很是重要的接口——Sink 以及 Source,它們都繼承了 Closeable,其中 Sink 對應了咱們原來所使用的 OutputStream,而 Source 則對應了咱們原來所使用的 InputStream緩存

Okio 的入口就是Okio 類,它是一個工廠類,能夠經過它內部的一些 static 方法來建立 SinkSource 等對象。安全

Sink

Sink 實際上只是一個接口,讓咱們看看 Sink 中有哪些方法:bash

public interface Sink extends Closeable, Flushable {
  void write(Buffer source, long byteCount) throws IOException;

  @Override void flush() throws IOException;

  Timeout timeout();

  @Override void close() throws IOException;
}
複製代碼

能夠看到,它主要包含了 writeflushtimeoutclose 這幾個方法,咱們能夠經過 Okio.sink 方法基於 OutputStream 獲取一個 Sinkless

private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");
    return new Sink() {
        @Override public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);
            while (byteCount > 0) {
                timeout.throwIfReached();
                Segment head = source.head;
                int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
                out.write(head.data, head.pos, toCopy);
                head.pos += toCopy;
                byteCount -= toCopy;
                source.size -= toCopy;
                if (head.pos == head.limit) {
                    source.head = head.pop();
                    SegmentPool.recycle(head);
                }
            }
        }
        @Override public void flush() throws IOException {
            out.flush();
        }
        @Override public void close() throws IOException {
            out.close();
        }
        @Override public Timeout timeout() {
            return timeout;
        }
        @Override public String toString() {
            return "sink(" + out + ")";
        }
    };
}
複製代碼

這裏構建並實現了一個 Sink 的匿名內部類並返回,主要實現了它的 write 方法,剩餘方法都是簡單地轉調到 OutputStream 的對應方法。異步

write 方法中,首先進行了一些狀態檢驗,這裏貌似在 Timeout 類中實現了對超時的處理,咱們稍後再分析。以後從 Buffer 中獲取了一個 Segment,並從中取出數據,計算出寫入的量後將其寫入 Sink 所對應的 OutputStreamide

Segment 採用了一種相似鏈表的形式進行鏈接,看來 Buffer 中維護了一個 Segment 鏈表,表明了數據的其中一段。這裏將 Buffer 中的數據分段取出並寫入了 OutputStream 中。oop

最後,經過 SegmentPool.recycle 方法對當前 Segment 進行回收。

從上面的代碼中咱們能夠獲取到以下信息:

  1. Buffer 其實就是內存中的一段數據的抽象,其中經過 Segment 以鏈表的形式保存用於存儲數據。
  2. Segment 存儲數據採用了分段的存儲方式,所以獲取數據時須要分段從 Segment 中獲取數據。
  3. 有一個 SegmentPool 池用於實現 Segment 的複用。
  4. Segment 的使用有點相似鏈表。

Source

SourceSink 同樣,也僅僅是一個接口:

public interface Source extends Closeable {
  long read(Buffer sink, long byteCount) throws IOException;

  Timeout timeout();

  @Override void close() throws IOException;
}
複製代碼

Okio 中能夠經過 source 方法根據 InputStream 建立一個 Source

private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) {
        throw new IllegalArgumentException("in == null");
    } else if (timeout == null) {
        throw new IllegalArgumentException("timeout == null");
    } else {
        return new Source() {
            public long read(Buffer sink, long byteCount) throws IOException {
                if (byteCount < 0L) {
                    throw new IllegalArgumentException("byteCount < 0: " + byteCount);
                } else if (byteCount == 0L) {
                    return 0L;
                } else {
                    try {
                        timeout.throwIfReached();
                        Segment tail = sink.writableSegment(1);
                        int maxToCopy = (int)Math.min(byteCount, (long)(8192 - tail.limit));
                        int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
                        if (bytesRead == -1) {
                            return -1L;
                        } else {
                            tail.limit += bytesRead;
                            sink.size += (long)bytesRead;
                            return (long)bytesRead;
                        }
                    } catch (AssertionError var7) {
                        if (Okio.isAndroidGetsocknameError(var7)) {
                            throw new IOException(var7);
                        } else {
                            throw var7;
                        }
                    }
                }
            }
            public void close() throws IOException {
                in.close();
            }
            public Timeout timeout() {
                return timeout;
            }
            public String toString() {
                return "source(" + in + ")";
            }
        };
    }
}
複製代碼

這裏構建並實現了 Source 的一個匿名內部類並返回,應該就是 Source 的默認實現了。

它除了 read 方法其餘都只是簡單地調用了 InputStream 的對應方法,咱們重點看 read 方法:

首先它進行了一些相關的狀態檢測,以後經過 sink.writeableSegment 獲取到了一個能夠寫入的 Segment。以後從 InputStream 中讀取數據向 Segment 中寫入,讀取的大小被限制爲了 8192 個字節。

Buffer

BufferSinkSource 中都擔任了一個十分重要的地位,它對應了咱們內存中存儲的數據,對這些數據進行了抽象。下面讓咱們對 Buffer 進行分析:

Buffer 雖然是咱們內存中數據的抽象,但數據實際上並非存儲在 Buffer 中的,它在內部維護了一個 Segment 的循環鏈表,Segment 纔是真正存儲數據的地方。它經過 Segment 將數據分紅了幾段,經過鏈表進行鏈接。在 Buffer 內部封裝了許多 I/O 操做,都是在對 Segment 中的數據進行處理。

爲何要使用 Segment 對數據進行分段存儲而不直接存儲整個數據呢?因爲數據是分段存放的,這些段中的某一部分可能與另外一個 Buffer 中的數據剛好是相同的,此時就體現出了 Segment 的靈活性,咱們不須要將數據拷貝到另外一個 Buffer 中,只須要將其 Segment 指向這個重複段的 Segment 便可。同時,對於一些如將數據從 Source 轉移到 Sink 中這種狀況,也不須要進行拷貝,只須要將鏈表指向咱們的 Segment 便可,極大地提升了效率,同時節省了內存空間。

Segment

咱們首先看一下存儲數據的 Segment,它表明了數據中的一段,是一個雙向的循環鏈表,主要有如下的參數:

final class Segment {
    // Segment 存儲數據的大小
    static final int SIZE = 8192;
    // 進行數據共享的最小字節數
    static final int SHARE_MINIMUM = 1024;
    // 存儲數據的字節數組
    final byte[] data;
    // 用戶讀取數據的下一個起始位置
    int pos;
    // 能夠被寫入的下一個起始位置
    int limit;
    // 數據是否已被共享
    boolean shared;
    // 該字節數組是否屬於該Segment
    boolean owner;
    // 鏈表指針
    Segment next;
    // 鏈表指針
    Segment prev;
    // ...
}
複製代碼

能夠看到,其中 pos 表明了下一次讀取的起始位置,而 limit 表明了下一次寫入的起始位置,咱們能夠根據它們兩個值將整個 Segment 的空間分爲如圖的三段:

image-20190921122845126

其中已讀區域的數據咱們之後都不會再用到,已寫入區域的數據正在等待讀取,而空閒區域尚未填入數據,能夠進行寫入。

共享機制

同時,Segment 還支持了對數據的共享,經過 sharedowner 字段分別代表了數據是否已被共享以及其是否屬於當前 Segment。同時它提供了兩種拷貝方式: sharedCopy 以及 unsharedCopy

unsharedCopy 返回了一個新的 Segment,並將 data 數組經過 clone 方法拷貝到了新 Segment 中:

/** Returns a new segment that its own private copy of the underlying byte array. */
final Segment unsharedCopy() {
    return new Segment(data.clone(), pos, limit, false, true);
}
複製代碼

sharedCopy 一樣返回了一個新的 Segment,但其 data 數組是與新 Segment 進行共享的:

/**
 * Returns a new segment that shares the underlying byte array with this. Adjusting pos and limit
 * are safe but writes are forbidden. This also marks the current segment as shared, which
 * prevents it from being pooled.
 */
final Segment sharedCopy() {
    shared = true;
    return new Segment(data, pos, limit, true, false);
}
複製代碼

同時經過註釋咱們能夠看到,當數據共享後,爲了保證安全性,禁止了寫入操做。同時將被拷貝的 Segment 也標記爲了 shared,從而防止其被回收。

這樣的設計一樣是爲了減小拷貝,從而提升 I/O 的效率。

合併與分割

Segment 還支持了與前一個 Segment 的合併以及對自身的分割操做,從而使得使用者可以更靈活地操做。

合併操做會在當前 Segment 與它的前一個節點都沒有超過其大小的一半時,將兩者的數據進行合併,並將當前 Segment 進行回收,從而增大內存的利用效率:

/**
 * Call this when the tail and its predecessor may both be less than half
 * full. This will copy data so that segments can be recycled.
 */
public final void compact() {
    if (prev == this) throw new IllegalStateException();
    // 上一個節點的數據不是能夠寫入的(是共享數據),取消合併
    if (!prev.owner) return;
    // 計算當前節點與前一個節點的剩餘空間
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    // 沒有足夠的寫入空間時,不進行合併å
    if (byteCount > availableByteCount) return;
    // 進行合併,將當前節點數據寫入前一節點
    writeTo(prev, byteCount);
    // 從鏈表中刪除當前節點,並進行回收
    pop();
    SegmentPool.recycle(this);
}
複製代碼

而分割操做則會將 Segment 中的數據分割爲 [pos, pos+byteCount)[pos+byteCount, limit) 的兩段:

public final Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;
    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
    	// 若是拷貝量大於1024字節,經過共享的形式
      prefix = sharedCopy();
    } else {
    	// 拷貝量低於1024字節,經過arrayCopy進行拷貝
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }
    // 對limit及pos進行修改
    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
}
複製代碼

這裏首先對不一樣數據段的數據進行了處理,若是數據段大於了 1024 字節,則將數據經過共享交給了分割的前一個節點,兩端 Segment 公用同一個 data 數組,不然經過拷貝的形式構建一個新的 Segment

爲何這裏須要對數據大小的不一樣採用不一樣的處理方式呢?咱們能夠看到上面的註釋,裏面給出了答案:首先,爲了不拷貝數據帶來的性能開銷,加入了共享 Segment 的功能。可是因爲共享的數據是隻讀的,若是有不少很短的數據段的話,使用的表現並不會很好,所以只有當拷貝的數據量比較大時,纔會進行 Segment 的共享。

以後,將兩者的 poslimit 都進行了設置。因爲 pos 以前的部分及 limit 以後的部分都不會影響到咱們正常的讀取和寫入,所以咱們能夠不用關心它們目前的狀態,不必再對它們進行一些如填充零之類的操做。

SegmentPool

同時,Okio 還使用了 SegmentPool 來實現一個對象池,從而避免 Segment 頻繁地建立及銷燬所帶來的性能開銷。

SegmentPool 的實現十分簡單,它內部維護了一個單鏈表,用於存儲被回收存在池中的 Segment,其最大容量被限制在了 64 k。

當須要 Segment 時,能夠經過 take 方法來獲取一個被回收的對象:

static Segment take() {
    synchronized (SegmentPool.class) {
        if (next != null) {
          Segment result = next;
          next = result.next;
          result.next = null;
          byteCount -= Segment.SIZE;
          return result;
        }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock. } 複製代碼

它會在單鏈表中找到一個空閒的 Segment 並初始化後返回。若當前鏈表中沒有對象,則會建立一個新的 Segment

Segment 使用完畢時,首先能夠經過 Segmentpop 操做將其從鏈表中移除,以後能夠調用 SegmentPool.recycle 方法對其進行回收:

static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
    	if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
    	byteCount += Segment.SIZE;
    	segment.next = next;
    	segment.pos = segment.limit = 0;
    	next = segment;
    }
}
複製代碼

回收 Segment 時,不會對只讀的 Segment 進行回收,若 Segment 個數超過了上限,則不會對該 Segment 進行回收。

數據轉移

Okio 與 java.io 有個很大的不一樣,體如今 Buffer 的數據轉移上,咱們能夠經過其 copyTo 方法來完成數據的轉移。之因此叫轉移,由於它相對於複製來講,是有很大的數據提高的。例如咱們能夠看到兩個 Buffer 之間的數據轉移是如何進行的:

public final Buffer copyTo(Buffer out, long offset, long byteCount) {
    if (out == null) throw new IllegalArgumentException("out == null");
    checkOffsetAndCount(size, offset, byteCount);
    if (byteCount == 0) return this;
    out.size += byteCount;

    // 跳過不進行拷貝的 Segment
    Segment s = head;
    for (; offset >= (s.limit - s.pos); s = s.next) {
    	offset -= (s.limit - s.pos);
    }
    for (; byteCount > 0; s = s.next) {
        // 經過 sharedCopy 將數據拷貝到 copy 中	
        Segment copy = s.sharedCopy();
        copy.pos += offset;
        copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
        // 插入 Segment
        if (out.head == null) {
          out.head = copy.next = copy.prev = copy;
        } else {
          out.head.prev.push(copy);
        }
        byteCount -= copy.limit - copy.pos;
        offset = 0;
    }
    return this;
}
複製代碼

從上面的代碼中能夠看出,實際上這個過程是經過了 Segment 共享實現的,所以不須要進行拷貝,極大地提升了數據轉移的效率。

BufferedSource

咱們能夠經過 Okio.buffer 方法對一個普通的 Source 進行包裝,獲取一個具備緩衝能力的 BufferSource,它是一個接口,定義了一系列讀取的方法:

public interface BufferedSource extends Source, ReadableByteChannel {
    @Deprecated
    Buffer buffer();

    Buffer getBuffer();

    boolean exhausted() throws IOException;

    void require(long byteCount) throws IOException;

    boolean request(long byteCount) throws IOException;

    byte readByte() throws IOException;

    short readShort() throws IOException;

    short readShortLe() throws IOException;
    
    // ...一系列讀取方法

    long indexOf(byte b, long fromIndex) throws IOException;

    long indexOf(byte b, long fromIndex, long toIndex) throws IOException;

    long indexOf(ByteString bytes) throws IOException;

    long indexOf(ByteString bytes, long fromIndex) throws IOException;

    long indexOfElement(ByteString targetBytes) throws IOException;

    long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;

    boolean rangeEquals(long offset, ByteString bytes) throws IOException;

    boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
        throws IOException;

    BufferedSource peek();

    InputStream inputStream();
}
複製代碼

它主要有兩個實現類:BufferRealBufferedSource。其中 RealBufferedSource 顯然是咱們經過 buffer 方法包裝後獲得的類,而 Buffer 實際上對 BufferSource 也進行了實現,經過一系列 read 方法能夠從 Segment 中讀取處對應的數據。而咱們的 RealBufferedSource 則是 Source 的一個包裝類,而且其維護了一個 Buffer,從而提升 Input 的效率。咱們先分析其思路,再來討論爲何這樣能提升 Input 的效率。

咱們能夠首先看到 RealBufferedSource 的讀取方法,這裏以 readByteArray 方法舉例:

@Override public byte[] readByteArray(long byteCount) throws IOException {
    require(byteCount);
    return buffer.readByteArray(byteCount);
}
複製代碼

這裏首先調用了 require 方法,以後再從 buffer 中將數據讀出,看來在 require 中將數據先讀取到了 buffer 中。

咱們看到 require 方法:

@Override public void require(long byteCount) throws IOException {
    if (!request(byteCount)) throw new EOFException();
}
複製代碼

它實際上轉調到了 request 方法:

@Override public boolean request(long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");
    while (buffer.size < byteCount) {
    	if (source.read(buffer, Segment.SIZE) == -1) return false;
    }
    return true;
}
複製代碼

request 方法中,不斷地向 buffer 中讀取,每次讀取 Segment.SIZE 也就是 8192 個字節。也就是它讀取的量是 byteCount 以 8192 字節向上取整。爲何它不恰好讀取 byteCount 個字節,要讀滿 8192 個字節呢?

這就是一種預取思想,由於 I/O 操做每每是很是頻繁的,若是進行了一次讀取,那就頗有可能還會進行下一次讀取,所以咱們預先把它下一次可能讀取的部分一塊兒讀取出來,這樣下次讀取時,就不須要再對系統進行請求以獲取數據了,能夠直接從咱們的 buffer 中拿到。這就是爲何說加入 buffer 提升了咱們 I/O 的效率。

可能還有人會問,爲何這樣能提升 I/O 效率呢,不都是讀了同樣的量麼?這個就涉及到一些操做系統的知識了。在現代的操做系統中,咱們的程序每每運行在用戶態,而用戶態其實是沒有進行 I/O 的權限的,所以每每都是向操做系統發起請求,切換到內核態,再進行 I/O,完成後再次回到用戶態。這樣的用戶態及內核態的切換其實是很是耗時的,而且這個過程當中也伴隨着拷貝。所以採用上面的 buffer 能夠有效地減小咱們的這種系統 I/O 調用,加快咱們的效率。

BufferedSink

咱們一樣能夠經過 Okio.buffer 方法對一個普通的 Sink 進行包裝,從而獲取一個帶有 buffer 緩衝能力的 BufferedSinkBufferedSink 也是一個接口 ,內部定義了一系列寫入的方法:

public interface BufferedSink extends Sink, WritableByteChannel {
    Buffer buffer();

    BufferedSink write(ByteString byteString) throws IOException;

    BufferedSink write(byte[] source) throws IOException;

    BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;

    long writeAll(Source source) throws IOException;

    BufferedSink write(Source source, long byteCount) throws IOException;
    
    // ...一些對 write 的封裝

    @Override void flush() throws IOException;

    BufferedSink emit() throws IOException;

    BufferedSink emitCompleteSegments() throws IOException;

    OutputStream outputStream();
}
複製代碼

BufferedSink 一樣有兩個實現類:BufferRealBufferedSink,咱們能夠先看到 RealBufferedSink,它是一個 Sink 的包裝類,而且內部維護了一個 Buffer

write

咱們先看看其寫入方法:

@Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, offset, byteCount);
    return emitCompleteSegments();
}
複製代碼

這裏拿了一個簡單的寫入 byte[] 的方法進行了舉例,它首先將數據寫入了 buffer 中,以後調用了 emitCompleteSegments 方法。能夠看到這裏並無對 sink真正進行寫入,那寫入到底是在哪裏進行的呢?咱們看看 emitCompleteSegments 方法中作了什麼:

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}
複製代碼

這裏首先調用 buffer.completeSegmentByteCount 方法獲取到了 buffer 中已寫入但未被讀取的部分字節數(只包括已經被寫滿了的 Segment 中的),以後調用 sink.write 將其寫入到了 sink 中。

這裏其實很奇怪,按道理來講 buffer 的做用是經過緩存來進行一些優化,但這個方法將數據寫入 buffer 後,數據又當即被寫入到了 sink 中。這樣相比直接寫入到 sink 中,反而會帶來性能的損耗啊。這裏爲何要這樣作呢?

我看到這裏時對這段也比較奇怪,但考慮到 Okio 的總體設計來講,應該是把 Buffer 當作了一個數據統一的中轉站,將讀寫的優化統一放在了 Buffer 中進行,所以考慮到總體的一致性,將 RealBufferedSink 也採用了經過 Buffer 中轉的方式編寫,應該算是一種妥協吧。而且採用 Buffer 還有好處就是,一份數據既能夠用於讀也能夠用於寫。

flush

RealBufferedSink 還支持了 flush 操做,經過 flush 方法能夠將緩衝區的全部數據寫入 sink 中:

@Override public void flush() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    if (buffer.size > 0) {
      sink.write(buffer, buffer.size);
    }
    sink.flush();
}
複製代碼

emit

RealBufferedSink 還具備 emit 功能,分別是 emitCompleteSegments 方法及 emit 方法,前者是將全部已填滿的 Segment 中已寫入未讀取的數據寫入 sink,後者則是將 buffer 中全部已寫入未讀取數據寫入 sink(相似 flush):

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}

@Override public BufferedSink emit() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.size();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}
複製代碼

Timeout 超時機制

Okio 中經過 Timeout 類實現了 SinkSource 的超時機制,在 Sink 的寫入與 Source 的讀取時對超時進行判斷,若是超時則中斷寫入等操做。其中對於包裝了普通 InputStream / OutputStream 的使用了普通的 Timeout,而對於對 Socket 進行了包裝的則使用 AsyncTimeout

Timeout

咱們先對普通的 Timeout 進行研究,Timeout 中主要有兩個值,timeoutdeadline ,分別表明了 wait 的最大等待時間與完成某個工做的超時時間。

deadline

對於 deadline,咱們能夠經過 deadline 方法進行設定:

/** Set a deadline of now plus {@code duration} time. */
public final Timeout deadline(long duration, TimeUnit unit) {
    if (duration <= 0) throw new IllegalArgumentException("duration <= 0: " + duration);
    if (unit == null) throw new IllegalArgumentException("unit == null");
    return deadlineNanoTime(System.nanoTime() + unit.toNanos(duration));
}
複製代碼

以後,在每一個須要檢查超時的地方須要調用該 TimeoutthrowIfReached 方法(如 Sinkwrite 方法):

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      Thread.currentThread().interrupt(); // Retain interrupted status.
      throw new InterruptedIOException("interrupted");
    }
    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
}
複製代碼

這裏很簡單,就是進行時間的校驗,若達到了設定的時間,則拋出異常從而中斷後續操做。

timeout

同時,Timeout 還實現了對 monitor 進行 wait 的超時機制,經過 waitUntilNotified 方法能夠等待 monitornotify,若等待的過程超過了 Timeout 所設定的時間或當前線程被中斷,則會拋出異常,從而避免一直進行等待。而且,該方法須要在 synchronized 代碼塊中調用,以保證線程安全

在咱們構造了一個 Timeout 後,可使用 timeout 方法對其 wait 超時時間進行設定:

public Timeout timeout(long timeout, TimeUnit unit) {
    if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
    if (unit == null) throw new IllegalArgumentException("unit == null");
    this.timeoutNanos = unit.toNanos(timeout);
    return this;
}
複製代碼

這裏主要是將 timeoutNanos 設置爲了對應的值。接着咱們看到 waitUntilNotified 方法:

/**
 * Waits on {@code monitor} until it is notified. Throws {@link InterruptedIOException} if either
 * the thread is interrupted or if this timeout elapses before {@code monitor} is notified. The
 * caller must be synchronized on {@code monitor}.
 */
public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
    try {
        boolean hasDeadline = hasDeadline();
        long timeoutNanos = timeoutNanos();
        // 沒有 timeout 的設定的話,直接調用 monitor 的 wait 方法
        if (!hasDeadline && timeoutNanos == 0L) {
            monitor.wait(); // There is no timeout: wait forever.
            return;
        }
        // 計算咱們要 wait 的時間
        long waitNanos;
        long start = System.nanoTime();
        // 下面主要就是等待 timeout 與 deadline 中最小的那個
        if (hasDeadline && timeoutNanos != 0) {
            long deadlineNanos = deadlineNanoTime() - start;
            waitNanos = Math.min(timeoutNanos, deadlineNanos);
        } else if (hasDeadline) {
            waitNanos = deadlineNanoTime() - start;
        } else {
            waitNanos = timeoutNanos;
        }
        // wait 相應時間
        long elapsedNanos = 0L;
        if (waitNanos > 0L) {
            long waitMillis = waitNanos / 1000000L;
            monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
            elapsedNanos = System.nanoTime() - start;
        }
        // 若是尚未 notify,則拋出異常
        if (elapsedNanos >= waitNanos) {
            throw new InterruptedIOException("timeout");
        }
    } catch (InterruptedException e) {
    	// 線程若是在這個過程當中被 interrupt,則拋出異常
    	Thread.currentThread().interrupt();
    	throw new InterruptedIOException("interrupted");
    }
}
複製代碼

AsyncTimeout

AsyncTimeoutTimeout 的子類,接下來咱們看看 AsyncTimeout 是如何對 Socket 中的超時進行處理的。

首先能夠看到 AsyncTimeout 中保存了一個 head 及一個 next 引用,顯然這裏是有一個鏈表存儲的 AsyncTimeout 隊列的:

// AsyncTimeout 隊列的頭部
static @Nullable AsyncTimeout head;
// 當前節點是否在隊列中
private boolean inQueue;
// 下一個節點
private @Nullable AsyncTimeout next;
複製代碼

這裏感受與 MessageQueue 有點類似,猜想 AsyncTimeout 會根據超時的時間按序存儲在隊列中。

而且從 AsyncTimeout 的 JavaDoc 中能夠看到,它須要使用者在異步的事件開始時調用 enter 方法,結束時調用 exit 方法。同時它在背後開闢了一個線程對超時進行定時檢查。

enter & exit

讓咱們先看到 enter 方法:

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline
    // 時間到了就不加入隊列了
    if (timeoutNanos == 0 && !hasDeadline) {
    	return;
    }
    inQueue = true;
    // 開啓線程對超時進行檢查
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}
複製代碼

上面主要是將其 inQueue 設置爲了 true,以後調用 scheduleTimeout 方法對超時進行定時檢查。咱們暫時先不關注 scheduleTimeout 的具體實現。

接着咱們看到 exit 方法:

/** Returns true if the timeout occurred. */
public final boolean exit() {
    if (!inQueue) return false;
    inQueue = false;
    return cancelScheduledTimeout(this);
}
複製代碼

這裏也很是簡單,就是將 inQueue 設置爲了 false,並調用 cancelScheduledTimeout 方法中止前面的定時校驗線程。

scheduleTimeout

咱們接下來看看這個定時校驗的具體實現,咱們先看到 scheduleTimeout 方法:

private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // 若是隊列中尚未節點,構造一個頭節點並啓動 Watchdog
    if (head == null) {
    	head = new AsyncTimeout();
    	new Watchdog().start();
    }
    long now = System.nanoTime();
    // 計算具體超時時間,主要是取 timeout 與 deadline 的最小值
    if (timeoutNanos != 0 && hasDeadline) {
    	// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
    	// Math.min() is undefined for absolute values, but meaningful for relative ones.
    	node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
    	node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
     	node.timeoutAt = node.deadlineNanoTime();
    } else {
    	throw new AssertionError();
    }
    long remainingNanos = node.remainingNanos(now);
    // 按剩餘時間從小到大插入到隊列中
    for (AsyncTimeout prev = head; true; prev = prev.next) {
        if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
            node.next = prev.next;
            prev.next = node;
            if (prev == head) {
                // 插入在隊列頭部,進行 notify 
                AsyncTimeout.class.notify();
            }
            break;
        }
    }
}
複製代碼

上面的邏輯主要分爲如下幾步:

  1. 若隊列中尚未節點,構造一個頭節點而且啓動 WatchdogWatchdog 是一個 Thread 的子類,也就是咱們的定時掃描線程。
  2. 計算該 Timeout 的超時時間,取了 timeoutdeadline 的最小值
  3. 將該 timeout 按剩餘時間從小到大的順序插入隊列中
  4. 若插入的位置是隊列的頭部,則進行 notify(這裏還沒法瞭解到意圖,咱們能夠日後看看)

cancelScheduledTimeout

接着咱們看看 cancelScheduledTimeout 作了些什麼:

private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
    // Remove the node from the linked list.
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
        if (prev.next == node) {
            prev.next = node.next;
            node.next = null;
            return false;
        }
    }
    // The node wasn t found in the linked list: it must have timed out!
    return true;
}
複製代碼

這裏很簡單,就是將該 AsyncTimeout 從隊列中移除,若返回 true,則表明超時已經發生,若返回 false,則表明超時還未發生,該 Timeout 被移除。這個返回值一樣反映到了咱們的 exit 方法返回值中。

Watchdog

接着咱們看看 Watchdog 到底是如何對超時進行檢測的:

private static final class Watchdog extends Thread {
    public void run() {
    	while (true) {
    	    try {
    	    	AsyncTimeout timedOut;
    	    	synchronized (AsyncTimeout.class) {
    	    	    timedOut = awaitTimeout();
    	    	    // 找不到要 interrupt 的節點,繼續尋找
    	    	    if (timedOut == null) continue;
    	    	    // 隊列已空,中止線程
    	    	    if (timedOut == head) {
    	    	    	head = null;
    	    	    	return;
    	    	    }
    	    	}
    	    	// 調用 timeout 方法通知超時
    	    	timedOut.timedOut();
    	    } catch (InterruptedException ignored) {
    	    }
    	}
    }
}
複製代碼

Wachdog 中不斷調用 awaitTimeout 方法嘗試獲取一個能夠中止的 Timeout,以後調用了其 timeOut 方法通知外部已超時。

awaitTimeout

咱們能夠看看 awaitTimeout 作了什麼:

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
    AsyncTimeout node = head.next;
    // 隊列爲空,wait 直到有新的節點加入
    if (node == null) {
    	long startNanos = System.nanoTime();
    	AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
    	return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
    	    ? head  // The idle timeout elapsed.
    	    : null; // The situation has changed.
    }
    long waitNanos = node.remainingNanos(System.nanoTime());
    // 計算該節點須要 wait 的時間
    if (waitNanos > 0) {
    	// wait 對應的時間
    	long waitMillis = waitNanos / 1000000L;
    	waitNanos -= (waitMillis * 1000000L);
    	AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
    	return null;
    }
    // wait 事後已超時,將其移出隊列
    head.next = node.next;
    node.next = null;
    return node;
}
複製代碼

這裏主要有如下幾步:

  1. 若是隊列爲空,wait 直到有新的節點加入隊列
  2. 計算節點須要 wait 的時間並 wait 對應時間
  3. 時間到後,說明該節點超時,將其移出隊列

經過這裏的代碼,咱們就知道爲何前面在鏈表頭部加入節點時須要進行一次 notify 了,主要有兩個目的:

  1. 若隊列中沒有元素,能夠經過 notify通知此處有新元素加入隊列。
  2. 因爲插入在頭部,說明其比後面的節點的須要等待時間更少,所以須要中止前一次 wait 來計算該新的 Timeout 所須要的等待時間,並對其進行超時處理。

這裏的處理和 Android 中 MessageQueue 的設計仍是有殊途同歸之妙的,咱們能夠學習一波。

sink & source

AsyncTimeout 還實現了 sinksource 方法來實現了支持 AsyncTimeout 超時機制的 SinkSource,主要是經過在其各類操做先後分別調用 enterexit。下面以 Sink 爲例:

public final Sink sink(final Sink sink) {
  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0L) {
        // Count how many bytes to write. This loop guarantees we split on a segment boundary.
        long toWrite = 0L;
        for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
          int segmentSize = s.limit - s.pos;
          toWrite += segmentSize;
          if (toWrite >= byteCount) {
            toWrite = byteCount;
            break;
          }
        }
        // Emit one write. Only this section is subject to the timeout.
        boolean throwOnTimeout = false;
        enter();
        try {
          sink.write(source, toWrite);
          byteCount -= toWrite;
          throwOnTimeout = true;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }
    }
    
    @Override public void flush() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.flush();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }
    
    @Override public void close() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.close();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }
    
    @Override public Timeout timeout() {
      return AsyncTimeout.this;
    }
    
    @Override public String toString() {
      return "AsyncTimeout.sink(" + sink + ")";
    }
}	
複製代碼

比較簡單,這裏就不作太多解釋了。

總結

Okio 是一套基於 java.io 進行了一系列優化的十分優秀的 I/O 庫,它經過引入了 Segment 機制大大下降了數據遷移的成本,減小了拷貝的次數,而且對 java.io 繁瑣的體系進行了簡化,使得整個庫更易於使用。在 Okio 中還實現了不少有其它功能的 SourceSink,感興趣的讀者能夠自行翻閱一下源碼。同時各位能夠去回顧一下前面的 OkHttp 源碼解析中,OkHttp 是如何使用 Okio 進行 Socket 的數據寫入及讀取的。

相關文章
相關標籤/搜索