Okio 框架源碼學習

Retrofit,OkHttp,Okio Square 安卓平臺網絡層三板斧源碼學習
基於 okio 1.13.0 版本 okio github 地址

簡介

Okio 主要是替代 java.io 和 java.nio 那一套複雜的 api 調用框架,讓數據訪問、存儲和處理更加便捷。java

Okio 是 OkHttp 的基石,而 OkHttp 是 Retrofit 的基石。這三個框架合稱『Square 安卓平臺網絡層三板斧』node

使用方式

參考 OkioTestgit

……
@Test public void readWriteFile() throws Exception {
    File file = temporaryFolder.newFile();

    BufferedSink sink = Okio.buffer(Okio.sink(file));
    sink.writeUtf8("Hello, java.io file!");
    sink.close();
    assertTrue(file.exists());
    assertEquals(20, file.length());

    BufferedSource source = Okio.buffer(Okio.source(file));
    assertEquals("Hello, java.io file!", source.readUtf8());
    source.close();
}

……
複製代碼

經過 OkioTest 能夠大體明白 Okio 主要有 『讀』、『寫』兩大類操做。能夠操做的對象爲:github

1. 文件
2. 文件路徑描述類 Path
3. Socket
4. OutputStream
5. InputStream
複製代碼

Okio 經過 sink(xxx) 去寫一個對象,經過 source(xxx)去讀一個對象。api

一覽 Okio 讀寫框架

經過 Okio.buffer() 得到用來讀寫的 BufferedSource、BufferedSink緩存

BufferedSink sink = Okio.buffer(Okio.sink(file));
BufferedSource source = Okio.buffer(Okio.source(file));
複製代碼

進一步查看 buffer() 方法網絡

public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
}

public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
}
複製代碼
RealBufferedSink

看下 RealBufferedSink 類框架

final class RealBufferedSink implements BufferedSink {
    public final Buffer buffer = new Buffer();
    public final Sink sink;
    boolean closed;

    RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
    }
    ……
}
複製代碼

RealBufferedSink 實現了 BufferedSink 接口,BufferedSink 實現了 Sink 接口。異步

而 Sink 實現了 Closeable, Flushable 接口。socket

1. Flushable 接口只定義了一個方法 flush() ,用來實現把數據從緩存區寫入到底層輸入流。
2. Closeable 接口定義 close() ,用來關閉流。
3. Sink 接口又定義了一個 write(Buffer source, long byteCount) 和 timeout() 用來寫入數據和設置超時。
4. BufferedSink 接口則定義了一堆 wirteXXX(……) 用來操做不一樣類型的數據寫入。
複製代碼

在看 RealBufferedSink 的成員變量

public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
複製代碼

這裏出現了一個 Buffer 對象,一個從構造函數裏面傳入的 Sink 對象,以及一個用來記錄流是否關閉的 boolean 標誌。

RealBufferedSink 的各類 wirteXXX(……)大都以下

@Override public BufferedSink writeXXX(……) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.writeXXX(……);
  return emitCompleteSegments();
}
複製代碼

可見寫入數據的方法,都是有 buffer 對象實現。而在 emitXXX() 和 flush() 方法中則調用了 sink.write(buffer, byteCount) 和 sink.flush()

RealBufferedSource

RealBufferedSource 和 RealBufferedSink 相似

final class RealBufferedSource implements BufferedSource {
    public final Buffer buffer = new Buffer();
    public final Source source;
    boolean closed;

    RealBufferedSource(Source source) {
        if (source == null) throw new NullPointerException("source == null");
        this.source = source;
    }
}
複製代碼

RealBufferedSource 實現了 BufferedSource 接口,BufferedSource 實現了 Source 接口。

Source 接口一樣也實現了 Closeable 接口。

1. Source 集成了 Closeable 接口,表示 Source 提供了一個 close 方法關閉讀取數據的流。
2. Source 定義了一個 read(Buffer sink, long byteCount) 用來讀取數據,一個 timeout() 方法用來設置讀取超時。
3. BufferedSource 定義了不少 readXXX(……) 用來讀取數據。
複製代碼

RealBufferedSource 中的 readXXX(……) 方法和 RealBufferedSink 中的 writeXXX(……) 相似,都是經過成員變量 buffer 和 構造對象時傳入的 Source 對象配合起來讀取數據。

總結一下整個讀寫框架的結構以下:

okio_01.png

對全部讀寫對象的統一處理

不管是 File 、Path、InputStream、OutputStream 、Socket ,在 Okio 框架中只要一個簡單的 Okio.sink(……) 方法便可得到對應的輸入流(RealBufferedSink)和輸出流(RealBufferedSource)

並且 Okio 還給輸入/輸出流的都提供一個額外參數:Timeout,用來設置讀寫的超時設置。

全部的 sink 方法,都會調用到

private static Sink sink(final OutputStream out, final Timeout timeout) {
  if (out == null) throw new IllegalArgumentException("out == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");

  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0) {
        timeout.throwIfReached();
        Segment head = source.head;
        int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
        out.write(head.data, head.pos, toCopy);

        head.pos += toCopy;
        byteCount -= toCopy;
        source.size -= toCopy;

        if (head.pos == head.limit) {
          source.head = head.pop();
          SegmentPool.recycle(head);
        }
      }
    }

    @Override public void flush() throws IOException {
      out.flush();
    }

    @Override public void close() throws IOException {
      out.close();
    }

    @Override public Timeout timeout() {
      return timeout;
    }

    @Override public String toString() {
      return "sink(" + out + ")";
    }
  };
}
複製代碼

Okio.sink() 會建立一個匿名內部類的實例,實現了 write 方法,用來寫入數據到 OutputStream(File 、Path、Socket 都會被轉成成 OutputStream(),每次寫入數據的時候,都會檢測是否超時。(超時機制後面後講)

Okio.Source() 相似會建立一個實現 Source 接口的匿名內部類實例。實現 read 方法 ,負責從 InputStream 中讀取數據。

Okio 在讀寫數據的時候,裏面都會用用一個 Segment 對象。Segment 是 Okio 定義的一個***鏈表結構***的數據片斷,每一個 Segment 能夠最多存放 8K 的字節。

萬能的 Buffer

寫數據的時候 Okio 會先把數據寫到 buffer 中

BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();
複製代碼

Okio.buffer() 返回的是 RealBufferedSink

@Override public BufferedSink writeUtf8(String string) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.writeUtf8(string);
  return emitCompleteSegments();
}
複製代碼

查看 writeUtf8

@Override public Buffer writeUtf8(String string) {
  return writeUtf8(string, 0, string.length());
}
複製代碼

而後把 String 變成一個 Segment 鏈表

@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
    ……

    // Transcode a UTF-16 Java String to UTF-8 bytes.
    for (int i = beginIndex; i < endIndex;) {
      int c = string.charAt(i);

      if (c < 0x80) {
        Segment tail = writableSegment(1);
        byte[] data = tail.data;
        ……
        while (i < runLimit) {
          c = string.charAt(i);
          if (c >= 0x80) break;
          data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
        }    
        ……

      } else if (c < 0x800) {
        // Emit a 11-bit character with 2 bytes.
        writeByte(c >>  6        | 0xc0); // 110xxxxx
        writeByte(c       & 0x3f | 0x80); // 10xxxxxx
        i++;

      } ……
    }

    return this;
  }
複製代碼

經過 writableSegment 是否是要開闢新的 Segment 到隊列尾部

Segment writableSegment(int minimumCapacity) {
  if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

  if (head == null) {
    head = SegmentPool.take(); // Acquire a first segment.
    return head.next = head.prev = head;
  }

  Segment tail = head.prev;
  if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
  }
  return tail;
}
複製代碼

在看 emitCompleteSegments()

@Override
public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}
複製代碼

buffer.completeSegmentByteCount() 用來計算 Segment 的緩存的字節長度

public long completeSegmentByteCount() {
    long result = size;
    if (result == 0) return 0;

    // Omit the tail if it's still writable.
    Segment tail = head.prev;
    if (tail.limit < Segment.SIZE && tail.owner) {
        result -= tail.limit - tail.pos;
    }

    return result;
}
複製代碼

sink.write(buffer, byteCount) 就是以前傳入的集成的 Sink 匿名類。

總結一下整個流程

okio_02.png

讀數據的時候 Buffer 起到的做用相似,直接貼一下流程圖

okio_03.png

Okio 超時機制

Okio 能夠給他 OutputStream 、InputStream 增長一個超市設置。讀寫文件時會設置一個默認的 TimeOut ,這個方法是個空實現。

在讀寫 Socket 的時候,Okio 給咱們展現一個如何設置一個異步的超時機制,用來在 Socket 讀寫超時時關閉流。

public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
}
複製代碼

先看 timeout(socket)

private static AsyncTimeout timeout(final Socket socket) {
    return new AsyncTimeout() {
        @Override
        protected IOException newTimeoutException(@Nullable IOException cause) {
            ……
        }

        @Override
        protected void timedOut() {
            try {
                socket.close();
            }……
        }
    };
}
複製代碼

這裏看出會返回一個 AsyncTimeout 的匿名對象,主要在 timeOut() 中關閉 Socket。

sink(socket.getOutputStream(), timeout) 方法在上面已經看過了主要看其中的一句代碼

private static Sink sink(final OutputStream out, final Timeout timeout) {
    ……
    return new Sink() {
        @Override
        public void write(Buffer source, long byteCount) throws IOException {
            ……
            while (byteCount > 0) {
                timeout.throwIfReached();
                ……
            }
        }
        ……
    };
}
複製代碼

在看一下 throwIfReached 方法

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
        throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
        throw new InterruptedIOException("deadline reached");
    }
}
複製代碼

若是 hasDeadline 是 true,而且 deadlineNanoTime 大於 System.nanoTime() 來判斷是否達超時。

在看 timeout.sink(sink)

public final Sink sink(final Sink sink) {
    return new Sink() {
        @Override
        public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);

            while (byteCount > 0L) {
                // Count how many bytes to write. This loop guarantees we split on a segment boundary.
                long toWrite = 0L;
                for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
                    int segmentSize = s.limit - s.pos;
                    toWrite += segmentSize;
                    if (toWrite >= byteCount) {
                        toWrite = byteCount;
                        break;
                    }
                }

                // Emit one write. Only this section is subject to the timeout.
                boolean throwOnTimeout = false;
                enter();
                try {
                    sink.write(source, toWrite);
                    byteCount -= toWrite;
                    throwOnTimeout = true;
                } catch (IOException e) {
                    throw exit(e);
                } finally {
                    exit(throwOnTimeout);
                }
            }
        }

        @Override
        public void flush() throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
                sink.flush();
                throwOnTimeout = true;
            } catch (IOException e) {
                throw exit(e);
            } finally {
                exit(throwOnTimeout);
            }
        }

        @Override
        public void close() throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
                sink.close();
                throwOnTimeout = true;
            } catch (IOException e) {
                throw exit(e);
            } finally {
                exit(throwOnTimeout);
            }
        }

        @Override
        public Timeout timeout() {
            return AsyncTimeout.this;
        }

        @Override
        public String toString() {
            return "AsyncTimeout.sink(" + sink + ")";
        }
    };
}
複製代碼

能夠看出 timeout.sink(sink) 從新包裝了 Sink 給 Sink 的每一個方法都增長一個 enter() 方法

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
        return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}
複製代碼

這裏會發現若是知足了條件,會執行 scheduleTimeout 方法。可是默認狀況下,條件不會被知足。

查看一下 SocketTimeoutTest

@Test
public void readWithoutTimeout() throws Exception {
    Socket socket = socket(ONE_MB, 0);
    BufferedSource source = Okio.buffer(Okio.source(socket));
    source.timeout().timeout(5000, TimeUnit.MILLISECONDS);
    source.require(ONE_MB);
    socket.close();
}
複製代碼

這裏能夠看到,須要調用 source.timeout().timeout(5000, TimeUnit.MILLISECONDS)

public Timeout timeout(long timeout, TimeUnit unit) {
    if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
    if (unit == null) throw new IllegalArgumentException("unit == null");
    this.timeoutNanos = unit.toNanos(timeout);
    return this;
}
複製代碼

這裏能夠看到 timeoutNanos 在這裏賦值了。因此設置 timeout(5000, TimeUnit.MILLISECONDS) 後會出發 scheduleTimeout(this, timeoutNanos, hasDeadline)

private static synchronized void scheduleTimeout(
        AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
    if (head == null) {
        head = new AsyncTimeout();
        new Watchdog().start();
    }
    ……
    // Insert the node in sorted order.
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
        if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
            node.next = prev.next;
            prev.next = node;
            if (prev == head) {
                AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
            }
            break;
        }
    }
}
複製代碼

這裏用到一個同步鎖、啓動一個 Watchdog 線程。而且根據 timeout 的超時時間,把 AsyncTimeout 添加到一個任務隊列中。

private static final class Watchdog extends Thread {
    Watchdog() {
        super("Okio Watchdog");
        setDaemon(true);
    }

    public void run() {
        while (true) {
            try {
                AsyncTimeout timedOut;
                synchronized (AsyncTimeout.class) {
                    timedOut = awaitTimeout();

                    // Didn't find a node to interrupt. Try again.
                    if (timedOut == null) continue;

                    // The queue is completely empty. Let this thread exit and let another watchdog thread
                    // get created on the next call to scheduleTimeout().
                    if (timedOut == head) {
                        head = null;
                        return;
                    }
                }

                // Close the timed out node.
                timedOut.timedOut();
            } catch (InterruptedException ignored) {
            }
        }
    }
}
複製代碼

Watchdog 線程會一直同步遍歷任務隊列執行 awaitTimeout()

static @Nullable
AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    AsyncTimeout node = head.next;

    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
    if (node == null) {
        long startNanos = System.nanoTime();
        AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
        return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
                ? head  // The idle timeout elapsed.
                : null; // The situation has changed.
    }

    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    if (waitNanos > 0) {
        // Waiting is made complicated by the fact that we work in nanoseconds,
        // but the API wants (millis, nanos) in two arguments.
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
        return null;
    }

    // The head of the queue has timed out. Remove it.
    head.next = node.next;
    node.next = null;
    return node;
}
複製代碼

} 這裏會根據隊列頭部的 AsyncTimeout 節點,計算出剩餘時間,而後執行 AsyncTimeout.class.wait(waitMillis, (int) waitNanos) 方法阻塞。

注意這個的 wait(timeout) 會被 AsyncTimeout.class.notify() 喚醒。
複製代碼

若是任務隊列爲空會執行 AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS) ,等待一分鐘。而後再判斷是否有新的任務。

參考資料

拆輪子系列:拆 Okio

Okio源碼分析

okio github 地址

相關文章
相關標籤/搜索